[pulsar-client-go] branch master updated: Fix nack backoff policy logic (#974)
This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new 48b7d01 Fix nack backoff policy logic (#974) 48b7d01 is described below commit 48b7d0195327a81f638a3c51df2010d7eb244bbe Author: xiaolong ran AuthorDate: Fri Mar 3 11:54:38 2023 +0800 Fix nack backoff policy logic (#974) Signed-off-by: xiaolongran Motivation Currently, the NackBackoffPolicy does not take effect, because in NackBackoffPolicy, we need to use Msg object for Nack, and MsgId cannot be used for Nack, otherwise the Msg redeliverCount field cannot be obtained for backoff retry. --- pulsar/consumer_impl.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index d16f719..7a86574 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -587,7 +587,7 @@ func (c *consumer) Nack(msg Message) { } if mid.consumer != nil { - mid.consumer.NackID(msg.ID()) + mid.NackByMsg(msg) return } c.consumers[mid.partitionIdx].NackMsg(msg)
[pulsar-client-go] branch master updated: add messageId and topic as props of DLQ message (#907)
This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new cf031b8 add messageId and topic as props of DLQ message (#907) cf031b8 is described below commit cf031b8951aec1356f2e064f1bf679d24c81ccd0 Author: Garule Prabhudas AuthorDate: Tue Jan 10 17:46:33 2023 +0530 add messageId and topic as props of DLQ message (#907) Co-authored-by: Prabhudas Garule --- pulsar/consumer_test.go | 6 ++ pulsar/dlq_router.go| 12 +++- pulsar/message.go | 3 +++ pulsar/reader_test.go | 8 pulsar/retry_router.go | 1 + 5 files changed, 29 insertions(+), 1 deletion(-) diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index 11e72a7..2f1a056 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -1505,6 +1505,12 @@ func DLQWithProducerOptions(t *testing.T, prodOpt *ProducerOptions) { expectMsg := fmt.Sprintf("hello-%d", expectedMsgIdx) assert.Equal(t, []byte(expectMsg), msg.Payload()) + + // check original messageId + assert.NotEmpty(t, msg.Properties()[PropertyOriginMessageID]) + + // check original topic + assert.NotEmpty(t, msg.Properties()[SysPropertyRealTopic]) } // No more messages on the DLQ diff --git a/pulsar/dlq_router.go b/pulsar/dlq_router.go index 000faaa..5ecd8f8 100644 --- a/pulsar/dlq_router.go +++ b/pulsar/dlq_router.go @@ -92,11 +92,21 @@ func (r *dlqRouter) run() { producer := r.getProducer(cm.Consumer.(*consumer).options.Schema) msg := cm.Message.(*message) msgID := msg.ID() + + // properties associated with original message + properties := msg.Properties() + + // include orinal message id in string format in properties + properties[PropertyOriginMessageID] = msgID.String() + + // include original topic name of the message in properties + properties[SysPropertyRealTopic] = msg.Topic() + producer.SendAsync(context.Background(), &ProducerMessage{ Payload: 
msg.Payload(), Key: msg.Key(), OrderingKey: msg.OrderingKey(), - Properties: msg.Properties(), + Properties: properties, EventTime: msg.EventTime(), ReplicationClusters: msg.replicationClusters, }, func(MessageID, *ProducerMessage, error) { diff --git a/pulsar/message.go b/pulsar/message.go index 7f2e07f..76d0176 100644 --- a/pulsar/message.go +++ b/pulsar/message.go @@ -154,6 +154,9 @@ type MessageID interface { // PartitionIdx returns the message partitionIdx PartitionIdx() int32 + + // String returns message id in string format + String() string } // DeserializeMessageID reconstruct a MessageID object from its serialized representation diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go index 07e7ed3..53bd459 100644 --- a/pulsar/reader_test.go +++ b/pulsar/reader_test.go @@ -430,6 +430,14 @@ func (id *myMessageID) PartitionIdx() int32 { return id.PartitionIdx() } +func (id *myMessageID) String() string { + mid, err := DeserializeMessageID(id.data) + if err != nil { + return "" + } + return fmt.Sprintf("%d:%d:%d", mid.LedgerID(), mid.EntryID(), mid.PartitionIdx()) +} + func TestReaderOnSpecificMessageWithCustomMessageID(t *testing.T) { client, err := NewClient(ClientOptions{ URL: lookupURL, diff --git a/pulsar/retry_router.go b/pulsar/retry_router.go index 7b5f6b8..75792ad 100644 --- a/pulsar/retry_router.go +++ b/pulsar/retry_router.go @@ -35,6 +35,7 @@ const ( SysPropertyRetryTopic = "RETRY_TOPIC" SysPropertyReconsumeTimes = "RECONSUMETIMES" SysPropertyOriginMessageID = "ORIGIN_MESSAGE_IDY_TIME" + PropertyOriginMessageID= "ORIGIN_MESSAGE_ID" ) type RetryMessage struct {
[pulsar-client-go] branch master updated: fix: fix 923 (#924)
This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new 1d3499a fix: fix 923 (#924) 1d3499a is described below commit 1d3499a18d526b4b1aef0bdbbc54ac812b8ae0c0 Author: Jiaqi Shen <18863662...@163.com> AuthorDate: Fri Dec 23 15:13:04 2022 +0800 fix: fix 923 (#924) Remove the outdated interface description of SeekByTime. More details here #923. --- pulsar/consumer.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/pulsar/consumer.go b/pulsar/consumer.go index 0a515e0..0e89fc0 100644 --- a/pulsar/consumer.go +++ b/pulsar/consumer.go @@ -267,9 +267,6 @@ type Consumer interface { // SeekByTime resets the subscription associated with this consumer to a specific message publish time. // - // Note: this operation can only be done on non-partitioned topics. For these, one can rather perform the seek() on - // the individual partitions. - // // @param time //the message publish time when to reposition the subscription //
[pulsar-client-go] branch master updated: feat: add BackoffPolicy to `reader` and improve test case (#889)
This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new edfb785 feat: add BackoffPolicy to `reader` and improve test case (#889) edfb785 is described below commit edfb785961ca1f27aabea790f5642e1eaca7bd44 Author: labuladong AuthorDate: Fri Nov 11 11:04:50 2022 +0800 feat: add BackoffPolicy to `reader` and improve test case (#889) * feat: add BackoffPolicy to reader and improve test case. * improve comments * fix code style --- pulsar/consumer_test.go | 41 +++ pulsar/producer_test.go | 40 ++ pulsar/reader.go| 6 pulsar/reader_impl.go | 1 + pulsar/reader_test.go | 75 + 5 files changed, 163 insertions(+) diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index f574378..e9b3013 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -3262,3 +3262,44 @@ func TestAvailablePermitsLeak(t *testing.T) { assert.NotEqual(t, true, errors.Is(err, context.DeadlineExceeded), "This means the resource is exhausted. 
consumer.Receive() will block forever.") } + +func TestConsumerWithBackoffPolicy(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: serviceURL, + }) + assert.NoError(t, err) + defer client.Close() + + topicName := newTopicName() + + backoff := newTestBackoffPolicy(1*time.Second, 4*time.Second) + _consumer, err := client.Subscribe(ConsumerOptions{ + Topic:topicName, + SubscriptionName: "sub-1", + Type: Shared, + BackoffPolicy:backoff, + }) + assert.Nil(t, err) + defer _consumer.Close() + + partitionConsumerImp := _consumer.(*consumer).consumers[0] + // 1 s + startTime := time.Now() + partitionConsumerImp.reconnectToBroker() + assert.True(t, backoff.IsExpectedIntervalFrom(startTime)) + + // 2 s + startTime = time.Now() + partitionConsumerImp.reconnectToBroker() + assert.True(t, backoff.IsExpectedIntervalFrom(startTime)) + + // 4 s + startTime = time.Now() + partitionConsumerImp.reconnectToBroker() + assert.True(t, backoff.IsExpectedIntervalFrom(startTime)) + + // 4 s + startTime = time.Now() + partitionConsumerImp.reconnectToBroker() + assert.True(t, backoff.IsExpectedIntervalFrom(startTime)) +} diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go index f193ffd..2338074 100644 --- a/pulsar/producer_test.go +++ b/pulsar/producer_test.go @@ -1072,6 +1072,46 @@ func TestSendTimeout(t *testing.T) { makeHTTPCall(t, http.MethodDelete, quotaURL, "") } +func TestProducerWithBackoffPolicy(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: serviceURL, + }) + assert.NoError(t, err) + defer client.Close() + + topicName := newTopicName() + + backoff := newTestBackoffPolicy(1*time.Second, 4*time.Second) + _producer, err := client.CreateProducer(ProducerOptions{ + Topic: topicName, + SendTimeout: 2 * time.Second, + BackoffPolicy: backoff, + }) + assert.Nil(t, err) + defer _producer.Close() + + partitionProducerImp := _producer.(*producer).producers[0].(*partitionProducer) + // 1 s + startTime := time.Now() + 
partitionProducerImp.reconnectToBroker() + assert.True(t, backoff.IsExpectedIntervalFrom(startTime)) + + // 2 s + startTime = time.Now() + partitionProducerImp.reconnectToBroker() + assert.True(t, backoff.IsExpectedIntervalFrom(startTime)) + + // 4 s + startTime = time.Now() + partitionProducerImp.reconnectToBroker() + assert.True(t, backoff.IsExpectedIntervalFrom(startTime)) + + // 4 s + startTime = time.Now() + partitionProducerImp.reconnectToBroker() + assert.True(t, backoff.IsExpectedIntervalFrom(startTime)) +} + func TestSendContextExpired(t *testing.T) { quotaURL := adminURL + "/admin/v2/namespaces/public/default/backlogQuota" quotaFmt := `{"limit": "%d", "policy": "producer_request_hold"}` diff --git a/pulsar/reader.go b/pulsar/reader.go index 3539037..e4679ab 100644 --- a/pulsar/reader.go +++ b/pulsar/reader.go @@ -20,6 +20,8 @@ package pulsar import ( "context" "time" + + "github.com/apache/pulsar-client-go/pulsar/internal" ) // ReaderMessage packages Reader and Message as a struct to use. @@ -86,6 +88,10 @@ type ReaderOptions struc
[pulsar-client-go] branch master updated: [Issue 833] Fix the availablePermits leak that could cause consumer stuck. (#835)
This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new a013ff0 [Issue 833] Fix the availablePermits leak that could cause consumer stuck. (#835) a013ff0 is described below commit a013ff0b7353fab87a7eb7599377bb06b46eb7b7 Author: Jiaqi Shen <18863662...@163.com> AuthorDate: Thu Oct 13 16:06:01 2022 +0800 [Issue 833] Fix the availablePermits leak that could cause consumer stuck. (#835) * fix: fix for issue833 * fix: fix for issue833 by modify dispatcher() --- pulsar/consumer_partition.go | 55 + pulsar/consumer_test.go | 82 2 files changed, 122 insertions(+), 15 deletions(-) diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index 7ddff5e..5b61e7d 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -82,6 +82,15 @@ const ( noMessageEntry = -1 ) +type permitsReq int32 + +const ( + // reset the availablePermits of pc + permitsReset permitsReq = iota + // increase the availablePermits + permitsInc +) + type partitionConsumerOpts struct { topic string consumerName string @@ -128,7 +137,8 @@ type partitionConsumer struct { messageCh chan ConsumerMessage // the number of message slots available - availablePermits int32 + availablePermits int32 + availablePermitsCh chan permitsReq // the size of the queue channel for buffering messages queueSize int32 @@ -224,6 +234,7 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon dlq: dlq, metrics: metrics, schemaInfoCache: newSchemaInfoCache(client, options.topic), + availablePermitsCh: make(chan permitsReq, 10), } pc.setConsumerState(consumerInit) pc.log = client.log.SubLogger(log.Fields{ @@ -932,7 +943,7 @@ func (pc *partitionConsumer) dispatcher() { messages = nil // reset available permits - pc.availablePermits = 0 + pc.availablePermitsCh <- permitsReset 
initialPermits := uint32(pc.queueSize) pc.log.Debugf("dispatcher requesting initial permits=%d", initialPermits) @@ -955,19 +966,14 @@ func (pc *partitionConsumer) dispatcher() { messages[0] = nil messages = messages[1:] - // TODO implement a better flow controller - // send more permits if needed - pc.availablePermits++ - flowThreshold := int32(math.Max(float64(pc.queueSize/2), 1)) - if pc.availablePermits >= flowThreshold { - availablePermits := pc.availablePermits - requestedPermits := availablePermits - pc.availablePermits = 0 + pc.availablePermitsCh <- permitsInc - pc.log.Debugf("requesting more permits=%d available=%d", requestedPermits, availablePermits) - if err := pc.internalFlow(uint32(requestedPermits)); err != nil { - pc.log.WithError(err).Error("unable to send permits") - } + case pr := <-pc.availablePermitsCh: + switch pr { + case permitsInc: + pc.increasePermitsAndRequestMoreIfNeed() + case permitsReset: + pc.availablePermits = 0 } case clearQueueCb := <-pc.clearQueueCh: @@ -998,7 +1004,7 @@ func (pc *partitionConsumer) dispatcher() { messages = nil // reset available permits - pc.availablePermits = 0 + pc.availablePermitsCh <- permitsReset initialPermits := uint32(pc.queueSize) pc.log.Debugf("dispatcher requesting initial permits=%d", initialPermits) @@ -1438,6 +1444,25 @@ func (pc *partitionConsumer) discardCorruptedMessage(msgID *pb.MessageIdData, if err != nil { pc.log.Error("Connection was closed when request ack cmd") } + pc.availablePermitsCh <- permitsInc +} + +func (pc *partitionConsumer) increasePermitsAndRequestMoreIfNeed() { + // TODO implement a better flow con
[pulsar-client-go] annotated tag v0.9.0 updated (dd63a4c -> 61f494f)
This is an automated email from the ASF dual-hosted git repository. rxl pushed a change to annotated tag v0.9.0 in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git *** WARNING: tag v0.9.0 was modified! *** from dd63a4c (commit) to 61f494f (tag) tagging dd63a4cc3c151cf9fdc6070ae4922966185e7c20 (commit) replaces v0.9.0-candidate-1 by xiaolongran on Sun Oct 9 10:29:51 2022 +0800 - Log - Release v0.9.0 -BEGIN PGP SIGNATURE- iQJDBAABCAAtFiEE4nvv6z2Aqlp0sXEpAHfMO00BOOYFAmNCMh8PHHJ4bEBhcGFj aGUub3JnAAoJEAB3zDtNATjm794P/0IpCFuxnlZEVVrC5IpPtzibcw7a0hwnW7t5 jWGpDP9eFLOVZNID6hUtKEfJSe20/Cqzc4h46By5zFmCXg6flrRt9WnKgTcN/3MJ IFp2yrdSfjU17ZjFCl3wxcNP5Atlw+zwv3IEnuyGyyk88g++PoRyYwy5wysXQxjV cNvnlWJJX+J80JEGMoCQVh3RhAb2jXikmpTsycBoPFeoS1HNUqigBnB8dNLKILaf GJkT02GXuDCv+LHyQiSHnkDiTtf/MjWSLLbzo5xwr+MHFJrDYoVXjJKZ4zr9OdVo hTvlr7ZvUFAg4+MLd9Ubt5RqsREqxOkOmQq4MioTotUQDjAMj4cEIY0PlD7vcJg7 Y4TrijL99VF1tEkNEKTLyJWiaKJZWLAQUuW/v73RVTytlHpWJa2t5Szq6DZf4RU1 Tu0ANMj22Q5XIXeM2iSAiFa009q1zQPxuj7C30eJufe/2t/ebijtMvtSWIKGvQJ1 mpxJnw4YCjE/1seiToaRRrzB+c/b8OYXch6xc4vIP+Ipp/uPUp2zUR5OkYe1u8Xn ptI+G91xFsOTqgipg28ud9bqkvUkHNF0eleS8tLTgIVmae/KqjTJbQY3QlWWRGg1 FprowQYEr9+BEUDOxVOKhKtrpi6Gv8AuBrJJIDcs+B/V2GzWVA9h1lxFA55MkKj2 TUe8FYJj =ZcW4 -END PGP SIGNATURE- --- No new revisions were added by this update. Summary of changes:
svn commit: r57071 - in /dev/pulsar/pulsar-client-go-0.9.0-candidate-2: ./ apache-pulsar-client-go-0.9.0-candidate-2-src.tar.gz apache-pulsar-client-go-0.9.0-candidate-2-src.tar.gz.asc apache-pulsar-c
Author: rxl Date: Thu Sep 29 06:29:56 2022 New Revision: 57071 Log: Staging artifacts and signature for Pulsar Client Go release 0.9.0-candidate-2 Added: dev/pulsar/pulsar-client-go-0.9.0-candidate-2/ dev/pulsar/pulsar-client-go-0.9.0-candidate-2/apache-pulsar-client-go-0.9.0-candidate-2-src.tar.gz (with props) dev/pulsar/pulsar-client-go-0.9.0-candidate-2/apache-pulsar-client-go-0.9.0-candidate-2-src.tar.gz.asc dev/pulsar/pulsar-client-go-0.9.0-candidate-2/apache-pulsar-client-go-0.9.0-candidate-2-src.tar.gz.sha512 Added: dev/pulsar/pulsar-client-go-0.9.0-candidate-2/apache-pulsar-client-go-0.9.0-candidate-2-src.tar.gz == Binary file - no diff available. Propchange: dev/pulsar/pulsar-client-go-0.9.0-candidate-2/apache-pulsar-client-go-0.9.0-candidate-2-src.tar.gz -- svn:mime-type = application/octet-stream Added: dev/pulsar/pulsar-client-go-0.9.0-candidate-2/apache-pulsar-client-go-0.9.0-candidate-2-src.tar.gz.asc == --- dev/pulsar/pulsar-client-go-0.9.0-candidate-2/apache-pulsar-client-go-0.9.0-candidate-2-src.tar.gz.asc (added) +++ dev/pulsar/pulsar-client-go-0.9.0-candidate-2/apache-pulsar-client-go-0.9.0-candidate-2-src.tar.gz.asc Thu Sep 29 06:29:56 2022 @@ -0,0 +1,16 @@ +-BEGIN PGP SIGNATURE- + +iQIzBAABCAAdFiEEciSp+kqNwHcTrdvU+q+T3I6QoRgFAmM1ObIACgkQ+q+T3I6Q +oRhypQ//aTnXDBDkTB/ucCx73YqjhbRLTyZMS8w3vdaUFtXA+keYaRGAKRYifzQa +awIgQjKA7b9vStZiSi411sRhoxEIHUoatTBDefmPYESgpJlPOZZ4Bk8CyGStwPz0 +w50VqH3SdZyBmO/4JpJFUMzWPY9mUkGwqXRu01DqZez8+xe3P2e6Xn9V6pHYdIzm +zWAWcrbW7WPZxrGvwy5/LKIJrY43OHPacIlZ6DFGxY7cgNJbZfqT3cPBq2FbIyGE +lsEle4BapEDbrY6gahUzfmHVsKe9BoRmY/ycvPi6+9nTnGI6KxoR3TC9uxv//mOS +dmQBsd+YH+o+/hLoXvu9Gdpe8xTUFRYwSIC0p9b/06rtbJ2bq6wBdsd3VtUSdEXg +UKYVptI6uvXoiY9c8AqE/vBEJaMflfXQS4cQ5EwvCeIEmZ6D2bNwgv8zkCzuryq7 +PyThip6Warl4jqv5zb/sKqG74hRHIt+S+DdLkB6M7aYggf+YLCRr8Ry7sRoPcEgD +pr1Nt416n+iQ2ErykzN/Z7FqjM9tABwo5pbJxRY4M+4AQ/3YQe1Zk8dSOzZWD45X +bb5r9X464acMVDd6V6DrH9xb6QNjgSGyQQgAX5aB5CB+xUwhvC9LMbaIzzZx+D35 
+O0EV8SScc7jbOHNTx/OHXZ/RYzlJGpn7QzBoR0vQx0hk0ts13aY= +=ICJv +-END PGP SIGNATURE- Added: dev/pulsar/pulsar-client-go-0.9.0-candidate-2/apache-pulsar-client-go-0.9.0-candidate-2-src.tar.gz.sha512 == --- dev/pulsar/pulsar-client-go-0.9.0-candidate-2/apache-pulsar-client-go-0.9.0-candidate-2-src.tar.gz.sha512 (added) +++ dev/pulsar/pulsar-client-go-0.9.0-candidate-2/apache-pulsar-client-go-0.9.0-candidate-2-src.tar.gz.sha512 Thu Sep 29 06:29:56 2022 @@ -0,0 +1 @@ +9731d6a0615288e77feb4b73fedbbdf6d275ebefeee3cee5fc4e849f38789863f0532c7e8b93eb1e601bd98f9bb21d50a714fcf87fac9987a745a052bbe23ca3 apache-pulsar-client-go-0.9.0-candidate-2-src.tar.gz
[pulsar-client-go] annotated tag v0.9.0-candidate-2 updated (dd63a4c -> 6b9884d)
This is an automated email from the ASF dual-hosted git repository. rxl pushed a change to annotated tag v0.9.0-candidate-2 in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git *** WARNING: tag v0.9.0-candidate-2 was modified! *** from dd63a4c (commit) to 6b9884d (tag) tagging dd63a4cc3c151cf9fdc6070ae4922966185e7c20 (commit) replaces v0.9.0-candidate-1 by xiaolongran on Thu Sep 29 14:12:31 2022 +0800 - Log - Release v0.9.0-candidate-2 -BEGIN PGP SIGNATURE- iQJDBAABCAAtFiEE4nvv6z2Aqlp0sXEpAHfMO00BOOYFAmM1N08PHHJ4bEBhcGFj aGUub3JnAAoJEAB3zDtNATjmaigP/AyeWAYtPyXjQ/Uf2utHfs68Ahz5QzWP6V6B vdtlKOxldF0KGR+vkSNMlb8LphazeWBAcQu4ZN2XqnwUOMfwNdNPA9nSyE+mVzSD mUFbXhizui2DmVVgqqaTFPbqSTwWYyELBTO77yBzbHQcH0U1sJnULNJlL1dfU5Dm DAk9XKEn8DCXcartKFTl8UiQ2FGp5MtncnnjvpinboElZSqmUnfXDYjCK79Dwmn4 XwpQAtPbm4fcubDJXTieWt1lDUG7KsvUNqEYuZI8VTX83+3N8NFLsy1im+iLL9aH hYxOiYbJjbdTaBgC9xLnkvRnuGzE0PxNxwOeu/S2yZI44cnXOErcX6H/DcioHB58 PsGEza5luPNFHfyi+/BiHu70YgXDmBSdybmOVXfwM+aAH6VkIF7CcdmKJUYynQud HfFU9iQhgSx0y7u5drwqTzW4/OLXNyVKGdiJVCX8z4tOQf/kHwH1Q4sH3bnjMJvB r+ILR6W038R6rN3UCUjMqDus0UCA4Ai7XaAO6ZFObSCdZms0REcG6IAfSOQLM/S+ zKr3hWQlZTGDQtje/QCtA13jbnAPcvvp50nluehiI6Vd3gvNWThOmlYPKk6dsLSt jPTPw2xjeLnbkwU9K+/05wBrLBUOyF7khGRcUf3RIjHKVkfvYI5bwA0fPEDkr3pr MuehDjL3 =T9lz -END PGP SIGNATURE- --- No new revisions were added by this update. Summary of changes:
[pulsar-client-go] branch branch-0.9.0 updated (ea6eccf -> dd63a4c)
This is an automated email from the ASF dual-hosted git repository. rxl pushed a change to branch branch-0.9.0 in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git from ea6eccf feat: support multiple schema version for producer and consumer (#611) add 6a8e7f3 [issue #752] replace go-rate rate limiter with a buffered channel implementation (#799) add bd19458 [issue 814] consumer and producer reconnect failure metrics counter (#815) add c247619 style: fix to follow lint error (#824) add 934c867 [oauth2] Remove oauth2 go.mod and go.sum (#802) add 91b807d chore: rename test_helper to follow test code naming convention (#822) add 3d63718 Bump github.com/stretchr/testify to update gopkg.in/yaml.v3 (#813) add a09460e [client] Add MetricsRegisterer to ClientOptions (#826) add da24461 Add golang 1.19 in tests matrix (#832) add 2d5f6fc ci: add makefile (#800) add f8dc88e Make keepalive interval configurable (#838) add 68e4317 [issue #807] dlq topic producer options (#809) add fb8e801 [log-format] remove redundant "[]" pair in the head and tail of log content (#831) add 649d992 Update proto file latest (#841) add b06e198 [bugfix] Fix wrong check eventime by default (#843) add edd5c71 NackBackoffPolicy.Next return time.Duration (#834) add e78dc3c Introduce doneCh for ack error (#777) add 6a8847f Parameterize the reconnection option (#853) add 9ecebb1 add 0.9.0 release changelog (#804) add dd63a4c Embed Go SDK version to 0.9.0 (#854) No new revisions were added by this update. 
Summary of changes: .github/workflows/project.yml | 7 +- CHANGELOG.md |48 + Dockerfile |23 +- docker-ci.sh => Makefile |31 +- README.md |35 +- VERSION| 2 +- go.mod |10 +- go.sum |46 +- integration-tests/{ => conf}/.htpasswd | 0 integration-tests/{ => conf}/client.conf | 0 integration-tests/{ => conf}/standalone.conf | 0 oauth2/authorization_tokenretriever.go | 1 - oauth2/authorization_tokenretriever_test.go|10 +- oauth2/go.mod |12 - oauth2/go.sum | 117 - oauth2/oidc_endpoint_provider_test.go | 4 +- perf/perf-producer.go |23 +- pulsar/client.go | 8 + pulsar/client_impl.go |21 +- pulsar/consumer.go | 9 + pulsar/consumer_impl.go| 7 + pulsar/consumer_multitopic.go | 4 + pulsar/consumer_partition.go |57 +- pulsar/consumer_partition_test.go |11 +- pulsar/consumer_regex.go | 4 + pulsar/consumer_test.go|63 +- pulsar/dlq_router.go |18 +- pulsar/{test_helper.go => helper_for_test.go} | 0 pulsar/impl_message.go |11 + pulsar/internal/backoff.go |26 +- pulsar/internal/backoff_test.go| 8 +- pulsar/internal/commands.go| 1 - pulsar/internal/connection.go |18 +- pulsar/internal/connection_pool.go | 8 +- pulsar/internal/http_client.go | 2 +- pulsar/internal/lookup_service_test.go |75 +- pulsar/internal/metrics.go | 172 +- pulsar/internal/pulsar_proto/PulsarApi.pb.go | 11285 +++ pulsar/internal/pulsar_proto/PulsarApi.proto | 156 +- pulsar/internal/rpc_client.go | 2 +- pulsar/log/wrapper_logrus.go |16 +- .../namespace_name_test.go => message_test.go} |10 +- pulsar/negative_acks_tracker.go| 4 +- pulsar/negative_acks_tracker_test.go |14 +- pulsar/negative_backoff_policy.go |19 +- pulsar/negative_backoff_policy_test.go | 5 +- pulsar/producer.go | 6 + pulsar/producer_partition.go |27 +- pulsar/retry_router.go |16 +- .../pulsar-test-service-start.sh | 0 .../pulsar-test-service-stop.sh| 0 run-ci.sh => scripts/run-ci.sh | 6 +- stable.txt
[pulsar-client-go] branch master updated: Embed Go SDK version to 0.9.0 (#854)
This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new dd63a4c Embed Go SDK version to 0.9.0 (#854) dd63a4c is described below commit dd63a4cc3c151cf9fdc6070ae4922966185e7c20 Author: xiaolong ran AuthorDate: Thu Sep 29 14:01:36 2022 +0800 Embed Go SDK version to 0.9.0 (#854) Signed-off-by: xiaolongran Signed-off-by: xiaolongran --- pulsar/internal/connection.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go index 8ddbc04..4196396 100644 --- a/pulsar/internal/connection.go +++ b/pulsar/internal/connection.go @@ -39,8 +39,8 @@ import ( ) const ( - // TODO: Find a better way to embed the version in the library code - PulsarVersion = "0.1" + // PulsarVersion FIXME: Before each release, please modify the current Version value. + PulsarVersion = "0.9.0" ClientVersionString = "Pulsar Go " + PulsarVersion PulsarProtocolVersion = int32(pb.ProtocolVersion_v18)
[pulsar-client-go] branch master updated: add 0.9.0 release changelog (#804)
This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new 9ecebb1 add 0.9.0 release changelog (#804) 9ecebb1 is described below commit 9ecebb1cf92587d58bbd1ea526a92314e0dc3d91 Author: Rui Fu AuthorDate: Thu Sep 29 11:24:59 2022 +0800 add 0.9.0 release changelog (#804) --- CHANGELOG.md | 48 VERSION | 2 +- stable.txt | 2 +- 3 files changed, 50 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2e0c97d..e68f59b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,54 @@ All notable changes to this project will be documented in this file. +[0.9.0] 2022-07-07 + +## Feature +* Add TableView support, see [PR-743](https://github.com/apache/pulsar-client-go/pull/743) +* Support ack response for Go SDK, see [PR-776](https://github.com/apache/pulsar-client-go/pull/776) +* Add basic authentication, see [PR-778](https://github.com/apache/pulsar-client-go/pull/778) +* Support multiple schema version for producer and consumer, see [PR-611](https://github.com/apache/pulsar-client-go/pull/611) +* Add schema support to Reader, see [PR-741](https://github.com/apache/pulsar-client-go/pull/741) + +## Improve +* Add consumer seek by time on partitioned topic, see [PR-782](https://github.com/apache/pulsar-client-go/pull/782) +* Fix using closed connection in consumer, see [PR-785](https://github.com/apache/pulsar-client-go/pull/785) +* Add go 1.18 to the test matrix, see [PR-790](https://github.com/apache/pulsar-client-go/pull/790) +* Schema creation and validation functions without panic, see [PR-794](https://github.com/apache/pulsar-client-go/pull/794) +* Fix ack request not set requestId when enable AckWithResponse option, see [PR-780](https://github.com/apache/pulsar-client-go/pull/780) +* Fix nil pointer dereference in TopicNameWithoutPartitionPart, see 
[PR-734](https://github.com/apache/pulsar-client-go/pull/734) +* Add error response for Ack func, see [PR-775](https://github.com/apache/pulsar-client-go/pull/775) +* Fix sequenceID is not equal to cause the connection to be closed incorrectly, see [PR-774](https://github.com/apache/pulsar-client-go/pull/774) +* Add consumer state check when request commands, see [PR-772](https://github.com/apache/pulsar-client-go/pull/772) +* Fix panic caused by flushing current batch with an incorrect internal function, see [PR-750](https://github.com/apache/pulsar-client-go/pull/750) +* Fix deadlock in Producer Send when message fails to encode, see [PR-762](https://github.com/apache/pulsar-client-go/pull/762) +* Add `-c/--max-connections` parameter to pulsar-perf-go and set it to 1 by default, see [PR-765](https://github.com/apache/pulsar-client-go/pull/765) +* Fix producer unable register when cnx closed, see [PR-761](https://github.com/apache/pulsar-client-go/pull/761) +* Fix annotation typo in `consumer.go`, see [PR-758](https://github.com/apache/pulsar-client-go/pull/758) +* Dlq producer on topic with schema, see [PR-723](https://github.com/apache/pulsar-client-go/pull/723) +* Add service not ready check, see [PR-757](https://github.com/apache/pulsar-client-go/pull/757) +* Fix ack timeout cause reconnect, see [PR-756](https://github.com/apache/pulsar-client-go/pull/756) +* Cleanup topics after unit tests, see [PR-755](https://github.com/apache/pulsar-client-go/pull/755) +* Allow config reader subscription name, see [PR-754](https://github.com/apache/pulsar-client-go/pull/754) +* Exposing broker metadata, see [PR-745](https://github.com/apache/pulsar-client-go/pull/745) +* Make go version consistent, see [PR-751](https://github.com/apache/pulsar-client-go/pull/751) +* Temporarily point ci to pulsar 2.8.2, see [PR-747](https://github.com/apache/pulsar-client-go/pull/747) +* Upgrade klauspost/compress to v1.14.4, see 
[PR-740](https://github.com/apache/pulsar-client-go/pull/740) +* Stop ticker when create producer failed, see [PR-730](https://github.com/apache/pulsar-client-go/pull/730) + +## New Contributors +* @NaraLuwan made their first contribution in https://github.com/apache/pulsar-client-go/pull/730 +* @shubham1172 made their first contribution in https://github.com/apache/pulsar-client-go/pull/735 +* @nicoloboschi made their first contribution in https://github.com/apache/pulsar-client-go/pull/738 +* @ZiyaoWei made their first contribution in https://github.com/apache/pulsar-client-go/pull/741 +* @nodece made their first contribution in https://github.com/apache/pulsar-client-go/pull/757 +* @lhotari made their first contribution in https://github.com/apache/pulsar-client-go/pull/765 +* @samuelhewitt made their first contribution in https://github.com/apache/pulsar-client-go/pull/762 +* @shileiyu made their first contribution in https
[pulsar-client-go] branch master updated (3d63718 -> a09460e)
This is an automated email from the ASF dual-hosted git repository. rxl pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git from 3d63718 Bump github.com/stretchr/testify to update gopkg.in/yaml.v3 (#813) add a09460e [client] Add MetricsRegisterer to ClientOptions (#826) No new revisions were added by this update. Summary of changes: pulsar/client.go | 5 +++ pulsar/client_impl.go | 11 - pulsar/consumer_partition_test.go | 11 +++-- pulsar/internal/lookup_service_test.go | 75 ++ pulsar/internal/metrics.go | 72 5 files changed, 107 insertions(+), 67 deletions(-)
[pulsar-client-go] branch master updated: Fix ack request not set requestId when enable AckWithResponse option (#780)
This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new 15136a7 Fix ack request not set requestId when enable AckWithResponse option (#780) 15136a7 is described below commit 15136a706f1293615507fcbfb85163be2932bbb0 Author: liushengzhong0927 <103550934+liushengzhong0...@users.noreply.github.com> AuthorDate: Sat May 28 01:41:00 2022 +0800 Fix ack request not set requestId when enable AckWithResponse option (#780) Fixes #779 Motivation Fix bug in #779 Modifications Set requestId in the ack request command when the AckWithResponse option is enabled. --- pulsar/consumer_partition.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index 06ccfd5..b7c2c8e 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -534,6 +534,7 @@ func (pc *partitionConsumer) internalAck(req *ackRequest) { } if pc.options.ackWithResponse { + cmdAck.RequestId = proto.Uint64(reqID) _, err := pc.client.rpcClient.RequestOnCnx(pc._getConn(), reqID, pb.BaseCommand_ACK, cmdAck) if err != nil { pc.log.WithError(err).Error("Ack with response error")
[pulsar-client-go] branch master updated: Support ack response for Go SDK (#776)
This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new c41616b Support ack response for Go SDK (#776) c41616b is described below commit c41616b2f5125603fe252a90ca004bc0bd3d76d8 Author: xiaolong ran AuthorDate: Tue May 24 14:32:05 2022 +0800 Support ack response for Go SDK (#776) * Support ack response for Go SDK Signed-off-by: xiaolongran * add test case for this change Signed-off-by: xiaolongran --- pulsar/consumer.go| 7 +++ pulsar/consumer_impl.go | 1 + pulsar/consumer_partition.go | 11 +++ pulsar/consumer_test.go | 35 +++ pulsar/internal/connection.go | 23 +++ 5 files changed, 77 insertions(+) diff --git a/pulsar/consumer.go b/pulsar/consumer.go index dfe27c5..2df1637 100644 --- a/pulsar/consumer.go +++ b/pulsar/consumer.go @@ -182,6 +182,13 @@ type ConsumerOptions struct { // > Notice: the NackBackoffPolicy will not work with `consumer.NackID(MessageID)` // > because we are not able to get the redeliveryCount from the message ID. NackBackoffPolicy NackBackoffPolicy + + // AckWithResponse is a return value added to Ack Command, and its purpose is to confirm whether Ack Command + // is executed correctly on the Broker side. When set to true, the error information returned by the Ack + // method contains the return value of the Ack Command processed by the Broker side; when set to false, the + // error information of the Ack method only contains errors that may occur in the Go SDK's own processing. 
+ // Default: false + AckWithResponse bool } // Consumer is an interface that abstracts behavior of Pulsar's consumer diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index e36f040..e887538 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -361,6 +361,7 @@ func (c *consumer) internalTopicSubscribeToPartitions() error { keySharedPolicy: c.options.KeySharedPolicy, schema: c.options.Schema, decryption: c.options.Decryption, + ackWithResponse: c.options.AckWithResponse, } cons, err := newPartitionConsumer(c, c.client, opts, c.messageCh, c.dlq, c.metrics) ch <- ConsumerError{ diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index 30ffcb2..06ccfd5 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -104,6 +104,7 @@ type partitionConsumerOpts struct { keySharedPolicy*KeySharedPolicy schema Schema decryption *MessageDecryptionInfo + ackWithResponsebool } type partitionConsumer struct { @@ -525,12 +526,22 @@ func (pc *partitionConsumer) internalAck(req *ackRequest) { EntryId: proto.Uint64(uint64(msgID.entryID)), } + reqID := pc.client.rpcClient.NewRequestID() cmdAck := &pb.CommandAck{ ConsumerId: proto.Uint64(pc.consumerID), MessageId: messageIDs, AckType:pb.CommandAck_Individual.Enum(), } + if pc.options.ackWithResponse { + _, err := pc.client.rpcClient.RequestOnCnx(pc._getConn(), reqID, pb.BaseCommand_ACK, cmdAck) + if err != nil { + pc.log.WithError(err).Error("Ack with response error") + req.err = err + } + return + } + err := pc.client.rpcClient.RequestOnCnxNoWait(pc._getConn(), pb.BaseCommand_ACK, cmdAck) if err != nil { pc.log.Error("Connection was closed when request ack cmd") diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index cadd8e4..0366884 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -1321,6 +1321,41 @@ func TestRLQ(t *testing.T) { assert.Nil(t, checkMsg) } +func TestAckWithResponse(t *testing.T) { + now := time.Now().Unix() + topic01 
:= fmt.Sprintf("persistent://public/default/topic-%d-01", now) + ctx := context.Background() + + client, err := NewClient(ClientOptions{URL: lookupURL}) + assert.Nil(t, err) + defer client.Close() + + consumer, err := client.Subscribe(ConsumerOptions{ + Topic: topic01, + SubscriptionName:"my-sub", + Type:Shared, +
[pulsar-client-go] branch master updated: fix nil pointer dereference in TopicNameWithoutPartitionPart (#734)
This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new a8270eb fix nil pointer dereference in TopicNameWithoutPartitionPart (#734) a8270eb is described below commit a8270ebb7549e6206f82ffec0ea5c75d4f2f1ded Author: Jeremy AuthorDate: Tue May 24 12:09:59 2022 +0800 fix nil pointer dereference in TopicNameWithoutPartitionPart (#734) Signed-off-by: hantmac add error check Signed-off-by: hantmac --- pulsar/consumer_partition_test.go | 4 +--- pulsar/internal/metrics.go | 5 - pulsar/internal/topic_name.go | 3 +++ pulsar/internal/topic_name_test.go | 12 4 files changed, 16 insertions(+), 8 deletions(-) diff --git a/pulsar/consumer_partition_test.go b/pulsar/consumer_partition_test.go index e7fcd5d..2255e52 100644 --- a/pulsar/consumer_partition_test.go +++ b/pulsar/consumer_partition_test.go @@ -21,11 +21,9 @@ import ( "sync" "testing" + "github.com/apache/pulsar-client-go/pulsar/internal" "github.com/apache/pulsar-client-go/pulsar/internal/crypto" - "github.com/stretchr/testify/assert" - - "github.com/apache/pulsar-client-go/pulsar/internal" ) func TestSingleMessageIDNoAckTracker(t *testing.T) { diff --git a/pulsar/internal/metrics.go b/pulsar/internal/metrics.go index ec1a96e..1cab470 100644 --- a/pulsar/internal/metrics.go +++ b/pulsar/internal/metrics.go @@ -482,7 +482,10 @@ func NewMetricsProvider(metricsCardinality int, userDefinedLabels map[string]str func (mp *Metrics) GetLeveledMetrics(t string) *LeveledMetrics { labels := make(map[string]string, 3) - tn, _ := ParseTopicName(t) + tn, err := ParseTopicName(t) + if err != nil { + return nil + } topic := TopicNameWithoutPartitionPart(tn) switch mp.metricsLevel { case 4: diff --git a/pulsar/internal/topic_name.go b/pulsar/internal/topic_name.go index 86a1ebe..481e41f 100644 --- a/pulsar/internal/topic_name.go +++ 
b/pulsar/internal/topic_name.go @@ -107,6 +107,9 @@ func ParseTopicName(topic string) (*TopicName, error) { } func TopicNameWithoutPartitionPart(tn *TopicName) string { + if tn == nil { + return "" + } if tn.Partition < 0 { return tn.Name } diff --git a/pulsar/internal/topic_name_test.go b/pulsar/internal/topic_name_test.go index f08fcd0..ab6537b 100644 --- a/pulsar/internal/topic_name_test.go +++ b/pulsar/internal/topic_name_test.go @@ -104,20 +104,24 @@ func TestParseTopicNameErrors(t *testing.T) { func TestTopicNameWithoutPartitionPart(t *testing.T) { tests := []struct { - tn TopicName + tn *TopicName expected string }{ { - tn: TopicName{Name: "persistent://public/default/my-topic", Partition: -1}, + tn: &TopicName{Name: "persistent://public/default/my-topic", Partition: -1}, expected: "persistent://public/default/my-topic", }, { - tn: TopicName{Name: "persistent://public/default/my-topic-partition-0", Partition: 0}, + tn: &TopicName{Name: "persistent://public/default/my-topic-partition-0", Partition: 0}, expected: "persistent://public/default/my-topic", }, + { + tn: nil, + expected: "", + }, } for _, test := range tests { - assert.Equal(t, test.expected, TopicNameWithoutPartitionPart(&test.tn)) + assert.Equal(t, test.expected, TopicNameWithoutPartitionPart(test.tn)) } }
[pulsar-client-go] branch master updated: Revert "Fix stuck when reconnect broker (#703)" (#767)
This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new 67e2075 Revert "Fix stuck when reconnect broker (#703)" (#767) 67e2075 is described below commit 67e2075c618ac967008fa713ef3e172d64e068e5 Author: Lari Hotari AuthorDate: Mon May 23 06:41:42 2022 +0300 Revert "Fix stuck when reconnect broker (#703)" (#767) This reverts commit 1a8432cfd3aa231f8eb3c97171a47eab98a8f20a. --- pulsar/internal/connection.go | 4 pulsar/internal/connection_pool.go | 2 +- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go index 11c9d49..da9a901 100644 --- a/pulsar/internal/connection.go +++ b/pulsar/internal/connection.go @@ -823,8 +823,6 @@ func (c *connection) handleCloseConsumer(closeConsumer *pb.CommandCloseConsumer) consumerID := closeConsumer.GetConsumerId() c.log.Infof("Broker notification of Closed consumer: %d", consumerID) - c.changeState(connectionClosed) - if consumer, ok := c.consumerHandler(consumerID); ok { consumer.ConnectionClosed() c.DeleteConsumeHandler(consumerID) @@ -837,8 +835,6 @@ func (c *connection) handleCloseProducer(closeProducer *pb.CommandCloseProducer) c.log.Infof("Broker notification of Closed producer: %d", closeProducer.GetProducerId()) producerID := closeProducer.GetProducerId() - c.changeState(connectionClosed) - producer, ok := c.deletePendingProducers(producerID) // did we find a producer? 
if ok { diff --git a/pulsar/internal/connection_pool.go b/pulsar/internal/connection_pool.go index 6491abd..5ec457e 100644 --- a/pulsar/internal/connection_pool.go +++ b/pulsar/internal/connection_pool.go @@ -82,7 +82,7 @@ func (p *connectionPool) GetConnection(logicalAddr *url.URL, physicalAddr *url.U // current connection is closed, we need to remove the connection object from the current // connection pool and create a new connection. if conn.closed() { - p.log.Infof("Removed connection from pool key=%s logical_addr=%+v physical_addr=%+v", + p.log.Debugf("Removed connection from pool key=%s logical_addr=%+v physical_addr=%+v", key, conn.logicalAddr, conn.physicalAddr) delete(p.connections, key) conn.Close()
[pulsar-client-go] branch master updated: Add error response for Ack func (#775)
This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new 5108332 Add error response for Ack func (#775) 5108332 is described below commit 5108332c9dd4cb454c26804304bffb82eeffc713 Author: xiaolong ran AuthorDate: Mon May 23 11:15:27 2022 +0800 Add error response for Ack func (#775) * Add error response for Ack func Signed-off-by: xiaolongran * when connection closed we need to reconnect by using new cnx Signed-off-by: xiaolongran * fix comments Signed-off-by: xiaolongran --- pulsar/consumer.go | 4 ++-- pulsar/consumer_impl.go | 20 ++-- pulsar/consumer_multitopic.go| 17 + pulsar/consumer_partition.go | 17 +++-- pulsar/consumer_regex.go | 17 + pulsar/impl_message.go | 9 ++--- pulsar/internal/connection_pool.go | 4 +++- .../pulsartracing/consumer_interceptor_test.go | 8 ++-- 8 files changed, 56 insertions(+), 40 deletions(-) diff --git a/pulsar/consumer.go b/pulsar/consumer.go index c67509b..dfe27c5 100644 --- a/pulsar/consumer.go +++ b/pulsar/consumer.go @@ -200,10 +200,10 @@ type Consumer interface { Chan() <-chan ConsumerMessage // Ack the consumption of a single message - Ack(Message) + Ack(Message) error // AckID the consumption of a single message, identified by its MessageID - AckID(MessageID) + AckID(MessageID) error // ReconsumeLater mark a message for redelivery after custom delay ReconsumeLater(msg Message, delay time.Duration) diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index 2bd3ed5..e36f040 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -19,6 +19,7 @@ package pulsar import ( "context" + "errors" "fmt" "math/rand" "strconv" @@ -34,7 +35,7 @@ import ( const defaultNackRedeliveryDelay = 1 * time.Minute type acker interface { - AckID(id trackingMessageID) + AckID(id trackingMessageID) error NackID(id trackingMessageID) 
NackMsg(msg Message) } @@ -438,29 +439,28 @@ func (c *consumer) Receive(ctx context.Context) (message Message, err error) { } } -// Messages +// Chan return the message chan to users func (c *consumer) Chan() <-chan ConsumerMessage { return c.messageCh } // Ack the consumption of a single message -func (c *consumer) Ack(msg Message) { - c.AckID(msg.ID()) +func (c *consumer) Ack(msg Message) error { + return c.AckID(msg.ID()) } -// Ack the consumption of a single message, identified by its MessageID -func (c *consumer) AckID(msgID MessageID) { +// AckID the consumption of a single message, identified by its MessageID +func (c *consumer) AckID(msgID MessageID) error { mid, ok := c.messageID(msgID) if !ok { - return + return errors.New("failed to convert trackingMessageID") } if mid.consumer != nil { - mid.Ack() - return + return mid.Ack() } - c.consumers[mid.partitionIdx].AckID(mid) + return c.consumers[mid.partitionIdx].AckID(mid) } // ReconsumeLater mark a message for redelivery after custom delay diff --git a/pulsar/consumer_multitopic.go b/pulsar/consumer_multitopic.go index c1cb3d8..1d75a24 100644 --- a/pulsar/consumer_multitopic.go +++ b/pulsar/consumer_multitopic.go @@ -19,6 +19,7 @@ package pulsar import ( "context" + "errors" "fmt" "sync" "time" @@ -112,30 +113,30 @@ func (c *multiTopicConsumer) Receive(ctx context.Context) (message Message, err } } -// Messages +// Chan return the message chan to users func (c *multiTopicConsumer) Chan() <-chan ConsumerMessage { return c.messageCh } // Ack the consumption of a single message -func (c *multiTopicConsumer) Ack(msg Message) { - c.AckID(msg.ID()) +func (c *multiTopicConsumer) Ack(msg Message) error { + return c.AckID(msg.ID()) } -// Ack the consumption of a single message, identified by its MessageID -func (c *multiTopicConsumer) AckID(msgID MessageID) { +// AckID the consumption of a single message, identified by its MessageID +func (c *multiTopicConsumer) AckID(msgID MessageID) error { mid, ok := 
toTrackingMessageID(msgID) if !ok { c.log.Warnf("invalid message id type %T", msgID) -
[pulsar-client-go] branch master updated: Fix sequenceID is not equal to cause the connection to be closed incorrectly (#774)
This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new d9b1083 Fix sequenceID is not equal to cause the connection to be closed incorrectly (#774) d9b1083 is described below commit d9b108351798569fd1c5b1c7a27a3da96fc2decd Author: xiaolong ran AuthorDate: Wed May 11 23:09:10 2022 +0800 Fix sequenceID is not equal to cause the connection to be closed incorrectly (#774) Signed-off-by: xiaolongran ### Motivation When processing the sendReceipt command, if the sequenceID returned by the broker is greater than the sequenceID in the current pendingQueue, we need to close the current connection to fix the inconsistency between the broker and the client state. When the sequenceID returned by the broker is smaller than the sequenceID in the current pendingQueue, we do not need to close the current connection, and expect to increment the value of the returned sequenceID when the broker retries next time. The current code processing logic is just the opposite, resulting in the failure to recover after the first situation occurs, and a phenomenon similar to the following occurs: --- pulsar/producer_partition.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 87d1d69..5a2694c 100644 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -785,15 +785,15 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt) } if pi.sequenceID < response.GetSequenceId() { - // Ignoring the ack since it's referring to a message that has already timed out. 
+ // Force connection closing so that messages can be re-transmitted in a new connection p.log.Warnf("Received ack for %v on sequenceId %v - expected: %v, closing connection", response.GetMessageId(), response.GetSequenceId(), pi.sequenceID) + p._getConn().Close() return } else if pi.sequenceID > response.GetSequenceId() { - // Force connection closing so that messages can be re-transmitted in a new connection + // Ignoring the ack since it's referring to a message that has already timed out. p.log.Warnf("Received ack for %v on sequenceId %v - expected: %v, closing connection", response.GetMessageId(), response.GetSequenceId(), pi.sequenceID) - p._getConn().Close() return } else { // The ack was indeed for the expected item in the queue, we can remove it and trigger the callback
[pulsar-client-go] branch master updated: Add consumer state check when request commands (#772)
This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new 1e37f2f Add consumer state check when request commands (#772) 1e37f2f is described below commit 1e37f2fc93f37f39f1c1cd0fa2cba79375902f7c Author: xiaolong ran AuthorDate: Fri May 6 22:15:44 2022 +0800 Add consumer state check when request commands (#772) * Add consumer state check when request commands Signed-off-by: xiaolongran * fix a little Signed-off-by: xiaolongran --- pulsar/consumer_partition.go | 68 1 file changed, 62 insertions(+), 6 deletions(-) diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index b06474d..db2994f 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -18,6 +18,7 @@ package pulsar import ( + "errors" "fmt" "math" "strings" @@ -278,6 +279,10 @@ func (pc *partitionConsumer) internalUnsubscribe(unsub *unsubscribeRequest) { } func (pc *partitionConsumer) getLastMessageID() (trackingMessageID, error) { + if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing { + pc.log.WithField("state", state).Error("Failed to redeliver closing or closed consumer") + return trackingMessageID{}, errors.New("failed to redeliver closing or closed consumer") + } req := &getLastMsgIDRequest{doneCh: make(chan struct{})} pc.eventsCh <- req @@ -292,6 +297,11 @@ func (pc *partitionConsumer) internalGetLastMessageID(req *getLastMsgIDRequest) } func (pc *partitionConsumer) requestGetLastMessageID() (trackingMessageID, error) { + if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing { + pc.log.WithField("state", state).Error("Failed to getLastMessageID closing or closed consumer") + return trackingMessageID{}, errors.New("failed to getLastMessageID closing or closed consumer") + } + requestID := 
pc.client.rpcClient.NewRequestID() cmdGetLastMessageID := &pb.CommandGetLastMessageId{ RequestId: proto.Uint64(requestID), @@ -308,6 +318,10 @@ func (pc *partitionConsumer) requestGetLastMessageID() (trackingMessageID, error } func (pc *partitionConsumer) AckID(msgID trackingMessageID) { + if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing { + pc.log.WithField("state", state).Error("Failed to ack by closing or closed consumer") + return + } if !msgID.Undefined() && msgID.ack() { pc.metrics.AcksCounter.Inc() pc.metrics.ProcessingTime.Observe(float64(time.Now().UnixNano()-msgID.receivedTime.UnixNano()) / 1.0e9) @@ -331,6 +345,10 @@ func (pc *partitionConsumer) NackMsg(msg Message) { } func (pc *partitionConsumer) Redeliver(msgIds []messageID) { + if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing { + pc.log.WithField("state", state).Error("Failed to redeliver closing or closed consumer") + return + } pc.eventsCh <- &redeliveryRequest{msgIds} iMsgIds := make([]MessageID, len(msgIds)) @@ -341,6 +359,10 @@ func (pc *partitionConsumer) Redeliver(msgIds []messageID) { } func (pc *partitionConsumer) internalRedeliver(req *redeliveryRequest) { + if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing { + pc.log.WithField("state", state).Error("Failed to redeliver closing or closed consumer") + return + } msgIds := req.msgIds pc.log.Debug("Request redelivery after negative ack for messages", msgIds) @@ -352,11 +374,14 @@ func (pc *partitionConsumer) internalRedeliver(req *redeliveryRequest) { } } - pc.client.rpcClient.RequestOnCnxNoWait(pc._getConn(), + err := pc.client.rpcClient.RequestOnCnxNoWait(pc._getConn(), pb.BaseCommand_REDELIVER_UNACKNOWLEDGED_MESSAGES, &pb.CommandRedeliverUnacknowledgedMessages{ ConsumerId: proto.Uint64(pc.consumerID), MessageIds: msgIDDataList, }) + if err != nil { + pc.log.Error("Connection was closed when request redeliver cmd") + } } func (pc 
*partitionConsumer) getConsumerState() consumerState { @@ -381,6 +406,10 @@ func (pc *partitionConsumer) Close() { } func (pc *partitionConsumer)
[pulsar-client-go] branch master updated: Calling internalFlushCurrentBatch will be forwarded to internalFlushCurrentBatches if the associated batch builder contains multi batches (KeyBasedBatchBuilder)
This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new ee379ac Calling internalFlushCurrentBatch will be forwaded to internalFlushCurrentBatches if the associated batch builder contains multi batches(KeyBasedBatchBuilder). (#750) ee379ac is described below commit ee379ac240b2560ea43b81b5495a382c75cb2a9d Author: Karma AuthorDate: Fri May 6 21:37:07 2022 +0800 Calling internalFlushCurrentBatch will be forwaded to internalFlushCurrentBatches if the associated batch builder contains multi batches(KeyBasedBatchBuilder). (#750) Doing transparent forwarding could centralized the branching logic at only one place. Co-authored-by: Karma Shi --- pulsar/producer_partition.go | 33 + 1 file changed, 13 insertions(+), 20 deletions(-) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 43ae68f..87d1d69 100644 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -386,11 +386,7 @@ func (p *partitionProducer) runEventsLoop() { return } case <-p.batchFlushTicker.C: - if p.batchBuilder.IsMultiBatches() { - p.internalFlushCurrentBatches() - } else { - p.internalFlushCurrentBatch() - } + p.internalFlushCurrentBatch() } } } @@ -485,11 +481,8 @@ func (p *partitionProducer) internalSend(request *sendRequest) { msg.ReplicationClusters, deliverAt) if !added { // The current batch is full.. 
flush it and retry - if p.batchBuilder.IsMultiBatches() { - p.internalFlushCurrentBatches() - } else { - p.internalFlushCurrentBatch() - } + + p.internalFlushCurrentBatch() // after flushing try again to add the current payload if ok := p.batchBuilder.Add(smm, p.sequenceIDGenerator, payload, request, @@ -504,11 +497,9 @@ func (p *partitionProducer) internalSend(request *sendRequest) { } if !sendAsBatch || request.flushImmediately { - if p.batchBuilder.IsMultiBatches() { - p.internalFlushCurrentBatches() - } else { - p.internalFlushCurrentBatch() - } + + p.internalFlushCurrentBatch() + } } @@ -522,6 +513,11 @@ type pendingItem struct { } func (p *partitionProducer) internalFlushCurrentBatch() { + if p.batchBuilder.IsMultiBatches() { + p.internalFlushCurrentBatches() + return + } + batchData, sequenceID, callbacks, err := p.batchBuilder.Flush() if batchData == nil { return @@ -683,11 +679,8 @@ func (p *partitionProducer) internalFlushCurrentBatches() { } func (p *partitionProducer) internalFlush(fr *flushRequest) { - if p.batchBuilder.IsMultiBatches() { - p.internalFlushCurrentBatches() - } else { - p.internalFlushCurrentBatch() - } + + p.internalFlushCurrentBatch() pi, ok := p.pendingQueue.PeekLast().(*pendingItem) if !ok {
[pulsar-client-go] branch master updated: Fix producer unable register when cnx closed (#761)
This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new e3f625a Fix producer unable register when cnx closed (#761) e3f625a is described below commit e3f625ae8da938f5d147b3a2cadced69c07b Author: xiaolong ran AuthorDate: Wed Apr 20 21:33:07 2022 +0800 Fix producer unable register when cnx closed (#761) * Fix producer unable register when cnx closed Signed-off-by: xiaolongran * fix code style Signed-off-by: xiaolongran --- pulsar/internal/connection.go | 10 ++ pulsar/producer_partition.go | 7 +-- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go index 6055252..11c9d49 100644 --- a/pulsar/internal/connection.go +++ b/pulsar/internal/connection.go @@ -53,7 +53,8 @@ type TLSOptions struct { } var ( - errConnectionClosed = errors.New("connection closed") + errConnectionClosed = errors.New("connection closed") + errUnableRegisterListener = errors.New("unable register listener when con closed") ) // ConnectionListener is a user of a connection (eg. 
a producer or @@ -72,7 +73,7 @@ type Connection interface { SendRequest(requestID uint64, req *pb.BaseCommand, callback func(*pb.BaseCommand, error)) SendRequestNoWait(req *pb.BaseCommand) error WriteData(data Buffer) - RegisterListener(id uint64, listener ConnectionListener) + RegisterListener(id uint64, listener ConnectionListener) error UnregisterListener(id uint64) AddConsumeHandler(id uint64, handler ConsumerHandler) DeleteConsumeHandler(id uint64) @@ -847,17 +848,18 @@ func (c *connection) handleCloseProducer(closeProducer *pb.CommandCloseProducer) } } -func (c *connection) RegisterListener(id uint64, listener ConnectionListener) { +func (c *connection) RegisterListener(id uint64, listener ConnectionListener) error { // do not add if connection is closed if c.closed() { c.log.Warnf("Connection closed unable register listener id=%+v", id) - return + return errUnableRegisterListener } c.listenersLock.Lock() defer c.listenersLock.Unlock() c.listeners[id] = listener + return nil } func (c *connection) UnregisterListener(id uint64) { diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index d37d60e..bc775e9 100644 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -262,11 +262,14 @@ func (p *partitionProducer) grabCnx() error { p.sequenceIDGenerator = &nextSequenceID } p._setConn(res.Cnx) - p._getConn().RegisterListener(p.producerID, p) + err = p._getConn().RegisterListener(p.producerID, p) + if err != nil { + return err + } p.log.WithFields(log.Fields{ "cnx": res.Cnx.ID(), "epoch": atomic.LoadUint64(&p.epoch), - }).Debug("Connected producer") + }).Info("Connected producer") pendingItems := p.pendingQueue.ReadableSlice() viewSize := len(pendingItems)
[pulsar] branch master updated: Fix logger level from warn to debug when unacked threshold is limited (#14950)
This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new f9cfc3e7144 Fix logger lever from warn to debug when uancked threshold is limited (#14950) f9cfc3e7144 is described below commit f9cfc3e7144c75cf53672f6a66f38cac82a5104a Author: xiaolong ran AuthorDate: Thu Apr 14 21:49:15 2022 +0800 Fix logger lever from warn to debug when uancked threshold is limited (#14950) * Add rate logger for repo Signed-off-by: xiaolongran * fix log info Signed-off-by: xiaolongran * fix comments Signed-off-by: xiaolongran --- .../service/persistent/PersistentDispatcherMultipleConsumers.java | 6 -- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index 0210ec699f8..fbfb09b24a5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -260,8 +260,10 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul topic.getBrokerService().executor().execute(() -> readMoreEntries()); } } else if (BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.get(this) == TRUE) { -log.warn("[{}] Dispatcher read is blocked due to unackMessages {} reached to max {}", name, -totalUnackedMessages, topic.getMaxUnackedMessagesOnSubscription()); +if (log.isDebugEnabled()) { +log.debug("[{}] Dispatcher read is blocked due to unackMessages {} reached to max {}", name, +totalUnackedMessages, topic.getMaxUnackedMessagesOnSubscription()); +} } else if (!havePendingRead) { if (log.isDebugEnabled()) 
{ log.debug("[{}] Schedule read of {} messages for {} consumers", name, messagesToRead,
[pulsar-client-go] branch master updated (0b0b312 -> 5ced071)
This is an automated email from the ASF dual-hosted git repository. rxl pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git from 0b0b312 Dlq producer on topic with schema (#723) add 5ced071 fix annotation typo (#758) No new revisions were added by this update. Summary of changes: pulsar/consumer.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-)
[pulsar-client-go] branch master updated: Dlq producer on topic with schema (#723)
This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new 0b0b312 Dlq producer on topic with schema (#723) 0b0b312 is described below commit 0b0b312ceea6f77efb9058bfdac8156b959a9189 Author: Garule Prabhudas AuthorDate: Wed Apr 13 09:06:30 2022 +0530 Dlq producer on topic with schema (#723) * add schema from consumer while creating dlq producer * write test case for dlq topic with schema * since payload and schema are mutually exclusive - use payload from schema only if payload is nil * edit test Co-authored-by: PGarule --- pulsar/dlq_router.go | 6 +++--- pulsar/producer_partition.go | 12 +++- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/pulsar/dlq_router.go b/pulsar/dlq_router.go index c858946..b28600f 100644 --- a/pulsar/dlq_router.go +++ b/pulsar/dlq_router.go @@ -89,8 +89,7 @@ func (r *dlqRouter) run() { select { case cm := <-r.messageCh: r.log.WithField("msgID", cm.ID()).Debug("Got message for DLQ") - producer := r.getProducer() - + producer := r.getProducer(cm.Consumer.(*consumer).options.Schema) msg := cm.Message.(*message) msgID := msg.ID() producer.SendAsync(context.Background(), &ProducerMessage{ @@ -127,7 +126,7 @@ func (r *dlqRouter) close() { } } -func (r *dlqRouter) getProducer() Producer { +func (r *dlqRouter) getProducer(schema Schema) Producer { if r.producer != nil { // Producer was already initialized return r.producer @@ -140,6 +139,7 @@ func (r *dlqRouter) getProducer() Producer { Topic: r.policy.DeadLetterTopic, CompressionType: LZ4, BatchingMaxPublishDelay: 100 * time.Millisecond, + Schema: schema, }) if err != nil { diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 525b89c..d37d60e 100644 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -405,18 +405,20 @@ func (p *partitionProducer) 
internalSend(request *sendRequest) { msg := request.msg + // read payload from message payload := msg.Payload - var schemaPayload []byte + var err error - if p.options.Schema != nil { + + // payload and schema are mutually exclusive + // try to get payload from schema value only if payload is not set + if payload == nil && p.options.Schema != nil { + var schemaPayload []byte schemaPayload, err = p.options.Schema.Encode(msg.Value) if err != nil { p.log.WithError(err).Errorf("Schema encode message failed %s", msg.Value) return } - } - - if payload == nil { payload = schemaPayload }
[pulsar-client-go] branch master updated: Fix ack timeout cause reconnect (#756)
This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new 85d7661 Fix ack timeout cause reconnect (#756) 85d7661 is described below commit 85d76615dea35e24e4699d6dc54be216aeb89499 Author: xiaolong ran AuthorDate: Wed Apr 6 11:36:02 2022 +0800 Fix ack timeout cause reconnect (#756) * Fix ack timeout cause reconnect Signed-off-by: xiaolongran * fix some logic Signed-off-by: xiaolongran --- pulsar/producer_partition.go | 16 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index d031e9a..525b89c 100644 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -780,22 +780,22 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt) if !ok { // if we receive a receipt although the pending queue is empty, the state of the broker and the producer differs. - // At that point, it is better to close the connection to the broker to reconnect to a broker hopping it solves - // the state discrepancy. - p.log.Warnf("Received ack for %v although the pending queue is empty, closing connection", response.GetMessageId()) - p._getConn().Close() + p.log.Warnf("Got ack %v for timed out msg", response.GetMessageId()) return } if pi.sequenceID < response.GetSequenceId() { - // if we receive a receipt that is not the one expected, the state of the broker and the producer differs. - // At that point, it is better to close the connection to the broker to reconnect to a broker hopping it solves - // the state discrepancy. + // Ignoring the ack since it's referring to a message that has already timed out. 
+ p.log.Warnf("Received ack for %v on sequenceId %v - expected: %v, closing connection", response.GetMessageId(), + response.GetSequenceId(), pi.sequenceID) + return + } else if pi.sequenceID > response.GetSequenceId() { + // Force connection closing so that messages can be re-transmitted in a new connection p.log.Warnf("Received ack for %v on sequenceId %v - expected: %v, closing connection", response.GetMessageId(), response.GetSequenceId(), pi.sequenceID) p._getConn().Close() return - } else if pi.sequenceID == response.GetSequenceId() { + } else { // The ack was indeed for the expected item in the queue, we can remove it and trigger the callback p.pendingQueue.Poll()
[pulsar-client-go] branch master updated: allow config reader subscription name (#754)
This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new 925da1a allow config reader subscription name (#754) 925da1a is described below commit 925da1a039a9551ab410727d7766c7e0a13ad69d Author: ZhangJian He AuthorDate: Tue Mar 29 14:22:48 2022 +0800 allow config reader subscription name (#754) ### Motivation Allow configuring the reader's subscription name, following the Java client's feature https://github.com/apache/pulsar/pull/8801 ### Modifications Add the `SubscriptionName` param to `ReaderOptions` ### Verifying this change Add a test for setting the subscription name --- pulsar/reader.go | 4 pulsar/reader_impl.go | 9 ++--- pulsar/reader_test.go | 22 ++ 3 files changed, 32 insertions(+), 3 deletions(-) diff --git a/pulsar/reader.go b/pulsar/reader.go index f1cb575..3539037 100644 --- a/pulsar/reader.go +++ b/pulsar/reader.go @@ -68,6 +68,10 @@ type ReaderOptions struct { // SubscriptionRolePrefix sets the subscription role prefix. The default prefix is "reader". SubscriptionRolePrefix string + // SubscriptionName sets the subscription name. + // If subscriptionRolePrefix is set at the same time, this configuration will prevail + SubscriptionName string + // ReadCompacted, if enabled, the reader will read messages from the compacted topic rather than reading the // full message backlog of the topic. 
This means that, if the topic has been compacted, the reader will only // see the latest value for each key in the topic, up until the point in the topic message backlog that has diff --git a/pulsar/reader_impl.go b/pulsar/reader_impl.go index 596884a..dd552b6 100644 --- a/pulsar/reader_impl.go +++ b/pulsar/reader_impl.go @@ -66,11 +66,14 @@ func newReader(client *client, options ReaderOptions) (Reader, error) { } } - subscriptionName := options.SubscriptionRolePrefix + subscriptionName := options.SubscriptionName if subscriptionName == "" { - subscriptionName = "reader" + subscriptionName = options.SubscriptionRolePrefix + if subscriptionName == "" { + subscriptionName = "reader" + } + subscriptionName += "-" + generateRandomName() } - subscriptionName += "-" + generateRandomName() receiverQueueSize := options.ReceiverQueueSize if receiverQueueSize <= 0 { diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go index aa12078..3c85871 100644 --- a/pulsar/reader_test.go +++ b/pulsar/reader_test.go @@ -24,6 +24,7 @@ import ( "time" "github.com/apache/pulsar-client-go/pulsar/crypto" + "github.com/google/uuid" "github.com/stretchr/testify/assert" ) @@ -48,6 +49,27 @@ func TestReaderConfigErrors(t *testing.T) { assert.NotNil(t, err) } +func TestReaderConfigSubscribeName(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: lookupURL, + }) + if err != nil { + t.Fatal(err) + } + defer client.Close() + + consumer, err := client.CreateReader(ReaderOptions{ + StartMessageID: EarliestMessageID(), + Topic:uuid.New().String(), + SubscriptionName: uuid.New().String(), + }) + if err != nil { + t.Fatal(err) + } + defer consumer.Close() + assert.NotNil(t, consumer) +} + func TestReader(t *testing.T) { client, err := NewClient(ClientOptions{ URL: lookupURL,
[pulsar-client-go] branch master updated (5c04811 -> 95fdb3f)
This is an automated email from the ASF dual-hosted git repository. rxl pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git. from 5c04811 Temporarily point ci to pulsar 2.8.2 (#747) add 95fdb3f [build] make go version consistent (#751) No new revisions were added by this update. Summary of changes: Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-)
[pulsar-client-go] branch master updated (95fdb3f -> b58f115)
This is an automated email from the ASF dual-hosted git repository. rxl pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git. from 95fdb3f [build] make go version consistent (#751) add b58f115 [PIP 90] go client retrieve broker metadata (#745) No new revisions were added by this update. Summary of changes: pulsar/consumer_partition.go | 24 -- pulsar/impl_message.go | 10 + pulsar/internal/buffer.go | 6 ++ pulsar/internal/commands.go| 20 -- pulsar/internal/commands_test.go | 18 pulsar/internal/connection.go | 5 +++-- .../pulsartracing/message_carrier_util_test.go | 8 pulsar/message.go | 8 pulsar/negative_acks_tracker_test.go | 16 +++ 9 files changed, 109 insertions(+), 6 deletions(-)
[pulsar-client-go] branch master updated (b58f115 -> 4f50a67)
This is an automated email from the ASF dual-hosted git repository. rxl pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git. from b58f115 [PIP 90] go client retrieve broker metadata (#745) add 4f50a67 Add schema support to Reader (#741) No new revisions were added by this update. Summary of changes: pulsar/reader.go | 3 +++ pulsar/reader_impl.go | 1 + pulsar/reader_test.go | 43 +++ 3 files changed, 47 insertions(+)
[pulsar-client-go] branch master updated: Add schema support to Reader (#741)
This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new 4f50a67 Add schema support to Reader (#741) 4f50a67 is described below commit 4f50a678d9030828933e03c1a79ae310c38893ad Author: Ziyao Wei AuthorDate: Tue Mar 22 02:26:37 2022 -0400 Add schema support to Reader (#741) Add schema support to Reader --- pulsar/reader.go | 3 +++ pulsar/reader_impl.go | 1 + pulsar/reader_test.go | 43 +++ 3 files changed, 47 insertions(+) diff --git a/pulsar/reader.go b/pulsar/reader.go index c45b8ff..f1cb575 100644 --- a/pulsar/reader.go +++ b/pulsar/reader.go @@ -79,6 +79,9 @@ type ReaderOptions struct { // Decryption represents the encryption related fields required by the reader to decrypt a message. Decryption *MessageDecryptionInfo + + // Schema represents the schema implementation. + Schema Schema } // Reader can be used to scan through all the messages currently available in a topic. 
diff --git a/pulsar/reader_impl.go b/pulsar/reader_impl.go index 0fed80c..596884a 100644 --- a/pulsar/reader_impl.go +++ b/pulsar/reader_impl.go @@ -102,6 +102,7 @@ func newReader(client *client, options ReaderOptions) (Reader, error) { nackRedeliveryDelay:defaultNackRedeliveryDelay, replicateSubscriptionState: false, decryption: options.Decryption, + schema: options.Schema, } reader := &reader{ diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go index bdafea0..aa12078 100644 --- a/pulsar/reader_test.go +++ b/pulsar/reader_test.go @@ -710,3 +710,46 @@ func TestProducerReaderRSAEncryption(t *testing.T) { assert.Equal(t, []byte(expectMsg), msg.Payload()) } } + +func TestReaderWithSchema(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: lookupURL, + }) + + assert.Nil(t, err) + defer client.Close() + + topic := newTopicName() + schema := NewStringSchema(nil) + + // create producer + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topic, + Schema: schema, + }) + assert.Nil(t, err) + defer producer.Close() + + value := "hello pulsar" + _, err = producer.Send(context.Background(), &ProducerMessage{ + Value: value, + }) + assert.Nil(t, err) + + // create reader + reader, err := client.CreateReader(ReaderOptions{ + Topic: topic, + StartMessageID: EarliestMessageID(), + Schema: schema, + }) + assert.Nil(t, err) + defer reader.Close() + + msg, err := reader.Next(context.Background()) + assert.NoError(t, err) + + var res *string + err = msg.GetSchemaValue(&res) + assert.Nil(t, err) + assert.Equal(t, *res, value) +}
[pulsar-client-go] branch master updated: [PIP 90] go client retrieve broker metadata (#745)
This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new b58f115 [PIP 90] go client retrieve broker metadata (#745) b58f115 is described below commit b58f1157e4aadfc662c3d5fc19f2de55cc9ff411 Author: ZhangJian He AuthorDate: Tue Mar 22 12:15:14 2022 +0800 [PIP 90] go client retrieve broker metadata (#745) [PIP 90] go client retrieve broker metadata --- pulsar/consumer_partition.go | 24 -- pulsar/impl_message.go | 10 + pulsar/internal/buffer.go | 6 ++ pulsar/internal/commands.go| 20 -- pulsar/internal/commands_test.go | 18 pulsar/internal/connection.go | 5 +++-- .../pulsartracing/message_carrier_util_test.go | 8 pulsar/message.go | 8 pulsar/negative_acks_tracker_test.go | 16 +++ 9 files changed, 109 insertions(+), 6 deletions(-) diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index 04a39c5..b06474d 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -498,12 +498,17 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header pbMsgID := response.GetMessageId() reader := internal.NewMessageReader(headersAndPayload) + brokerMetadata, err := reader.ReadBrokerMetadata() + if err != nil { + // todo optimize use more appropriate error codes + pc.discardCorruptedMessage(pbMsgID, pb.CommandAck_BatchDeSerializeError) + return err + } msgMeta, err := reader.ReadMessageMetadata() if err != nil { pc.discardCorruptedMessage(pbMsgID, pb.CommandAck_ChecksumMismatch) return err } - decryptedPayload, err := pc.decryptor.Decrypt(headersAndPayload.ReadableSlice(), pbMsgID, msgMeta) // error decrypting the payload if err != nil { @@ -597,7 +602,18 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header pc.AckID(msgID) continue } - + var messageIndex *uint64 + var brokerPublishTime *time.Time + if 
brokerMetadata != nil { + if brokerMetadata.Index != nil { + aux := brokerMetadata.GetIndex() - uint64(numMsgs) + uint64(i) + 1 + messageIndex = &aux + } + if brokerMetadata.BrokerTimestamp != nil { + aux := timeFromUnixTimestampMillis(*brokerMetadata.BrokerTimestamp) + brokerPublishTime = &aux + } + } // set the consumer so we know how to ack the message id msgID.consumer = pc var msg *message @@ -616,6 +632,8 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header replicatedFrom: msgMeta.GetReplicatedFrom(), redeliveryCount: response.GetRedeliveryCount(), orderingKey: string(smm.OrderingKey), + index: messageIndex, + brokerPublishTime: brokerPublishTime, } } else { msg = &message{ @@ -631,6 +649,8 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header replicationClusters: msgMeta.GetReplicateTo(), replicatedFrom: msgMeta.GetReplicatedFrom(), redeliveryCount: response.GetRedeliveryCount(), + index: messageIndex, + brokerPublishTime: brokerPublishTime, } } diff --git a/pulsar/impl_message.go b/pulsar/impl_message.go index a9809aa..3216676 100644 --- a/pulsar/impl_message.go +++ b/pulsar/impl_message.go @@ -241,6 +241,8 @@ type message struct { redeliveryCount uint32 schema Schema encryptionContext *EncryptionContext + index *uint64 + brokerPublishTime *time.Time } func (msg *message) Topic() string { @@ -299,6 +301,14 @@ func (msg *message) GetEncryptionContext() *EncryptionContext { return msg.encryptionContext } +func (msg *message) Index() *uint64 { + retu
[pulsar-client-go] branch master updated: [build] make go version consistent (#751)
This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new 95fdb3f [build] make go version consistent (#751) 95fdb3f is described below commit 95fdb3fc9ed1c4881ee874ca47e945ca15475af3 Author: ZhangJian He AuthorDate: Tue Mar 22 12:12:13 2022 +0800 [build] make go version consistent (#751) make go version consistent --- Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Dockerfile b/Dockerfile index a21a219..e12cc80 100644 --- a/Dockerfile +++ b/Dockerfile @@ -17,7 +17,7 @@ # under the License. # -ARG GO_VERSION=golang:1.13 +ARG GO_VERSION=golang:1.15 FROM apachepulsar/pulsar:2.8.2 as pulsar FROM $GO_VERSION as go
[pulsar] branch master updated (c4e4ddd -> 601fbdd)
This is an automated email from the ASF dual-hosted git repository. rxl pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git. from c4e4ddd [Transaction] Fix transaction buffer recover BrokerMetadataException close topic (#14709) add 601fbdd Process maxRedeliverCount is 0 of DeadLeddterPolicy (#14706) No new revisions were added by this update. Summary of changes: .../org/apache/pulsar/client/impl/ConsumerBuilderImpl.java | 2 ++ .../org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java | 10 ++ .../apache/pulsar/websocket/AbstractWebSocketHandlerTest.java | 3 ++- 3 files changed, 14 insertions(+), 1 deletion(-)
[pulsar-client-go] branch master updated: Upgrade beefsack/go-rate (#735)
This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new 128d8ea Upgrade beefsack/go-rate (#735) 128d8ea is described below commit 128d8ea3e24035a49fc81a516b01bc57b654a64a Author: Shubham Sharma AuthorDate: Tue Feb 22 14:58:10 2022 +0530 Upgrade beefsack/go-rate (#735) Fixes #725 ### Motivation This PR will enable pulsar-client-go to be a compliant library. One of its dependencies was not compliant with OSS licensing requirements earlier, and this PR upgrades to the latest version of it, which has a MIT license. ### Modifications Ran the following commands ```bash go get github.com/beefsack/go-rate@latest go mod tidy ``` ### Verifying this change - [x] Make sure that the change passes the CI checks. --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index c788991..e57ac12 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/AthenZ/athenz v1.10.39 github.com/DataDog/zstd v1.5.0 github.com/apache/pulsar-client-go/oauth2 v0.0.0-20220120090717-25e59572242e - github.com/beefsack/go-rate v0.0.0-20180408011153-efa7637bb9b6 + github.com/beefsack/go-rate v0.0.0-20220214233405-116f4ca011a0 github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b github.com/davecgh/go-spew v1.1.1 github.com/gogo/protobuf v1.3.2 diff --git a/go.sum b/go.sum index 0e0fe1c..28bc3be 100644 --- a/go.sum +++ b/go.sum @@ -60,8 +60,8 @@ github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hC github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY= github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8= github.com/aws/aws-sdk-go v1.32.6/go.mod 
h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0= -github.com/beefsack/go-rate v0.0.0-20180408011153-efa7637bb9b6 h1:KXlsf+qt/X5ttPGEjR0tPH1xaWWoKBEg9Q1THAj2h3I= -github.com/beefsack/go-rate v0.0.0-20180408011153-efa7637bb9b6/go.mod h1:6YNgTHLutezwnBvyneBbwvB8C82y3dcoOj5EQJIdGXA= +github.com/beefsack/go-rate v0.0.0-20220214233405-116f4ca011a0 h1:0b2vaepXIfMsG++IsjHiI2p4bxALD1Y2nQKGMR5zDQM= +github.com/beefsack/go-rate v0.0.0-20220214233405-116f4ca011a0/go.mod h1:6YNgTHLutezwnBvyneBbwvB8C82y3dcoOj5EQJIdGXA= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
[pulsar-client-go] annotated tag v0.8.0 updated (5daa17b -> 3df4ede)
This is an automated email from the ASF dual-hosted git repository. rxl pushed a change to annotated tag v0.8.0 in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git. *** WARNING: tag v0.8.0 was modified! *** from 5daa17b (commit) to 3df4ede (tag) tagging 5daa17b02bffe7d0844c76de2306de93fe9e3acf (commit) replaces v0.7.0 by xiaolongran on Mon Feb 21 10:42:02 2022 +0800 - Log - Release v0.8.0 -BEGIN PGP SIGNATURE- iQJDBAABCAAtFiEE4nvv6z2Aqlp0sXEpAHfMO00BOOYFAmIS+/oPHHJ4bEBhcGFj aGUub3JnAAoJEAB3zDtNATjmGD4P/2ZT37Rfxzg3XJ0wzw0VwTm/2Utf1tChRkt5 JD6CvHIt22fQgTx08Bh2q+kiEGAdrAkYe20XzSB9dVvu5jv10YnUnletqkBk9QYK Szx24cLvQjmXHJ8rUAYFyGgEeh1k3/nh1k9mrGqxAvsnegs06xtGBylHsnaVdZM7 gyBEChqyxfPdA5iZgvTsjss3dPx/puJfErpq1NQ2ng4AHImhzq9zfELUuGNtp3ce oj0HegIlajTgLp/pSMuCCCiYy7Lx63cN3zS2Y1lkQHgrjrgbzB0MjCPMHdU2JWWo 4/SVI0QHFZKOgvyZuYtSgE5zImazBdZrktunasc+w9v3u8hsXLgdT/6WaR3u8Ton vPwwcVpHaVwnRvdL1SGOpiQyUWTNPAnrP1XNF6tXJRaK3Ne9ZN2W5zT9We+dVSGx m/Fnpa1pU6iRgnbBDL/2RpuzMPpxJw9ZTYmCaAmJDhvHjCJmumovhe77qznbFDYG 0oA7opjRFp1BOm2DQUo4GQbczE7r7fJK1moCIbAsDg8K1k7JsO0nJCT0Cc3EM3B+ SXos8XmX9qnGKEJpvCUcb5foI2Du1bUSwaNhlmctbDNoGrmrn4eZ8m2aW3fwLGQt QWv6Z0duT2Pkug8T2w0OfdkqC7JD/+Jt2+XkUt+htDOgr5F3agbhVBa96wfCuidP E6mVz98Z =y/wi -END PGP SIGNATURE- --- No new revisions were added by this update. Summary of changes:
[pulsar-client-go] branch master updated: Update version file to 0.8.0 (#728)
This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new 1df5596 Update version file to 0.8.0 (#728) 1df5596 is described below commit 1df5596aa7a351c6f43edb53f54b2a14b83b0073 Author: xiaolong ran AuthorDate: Mon Feb 21 10:40:24 2022 +0800 Update version file to 0.8.0 (#728) Signed-off-by: xiaolongran ## Motivation Update version file to 0.8.0 --- VERSION| 2 +- stable.txt | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/VERSION b/VERSION index 3129f0c..c12365b 100644 --- a/VERSION +++ b/VERSION @@ -1,3 +1,3 @@ // This version number refers to the currently released version number // Please fix the version when release. -v0.7.0 +v0.8.0 diff --git a/stable.txt b/stable.txt index eb53fa6..aff2506 100644 --- a/stable.txt +++ b/stable.txt @@ -1,3 +1,3 @@ // This version number refers to the current stable version, generally is `VERSION - 1`. // Please fix the version when release. -v0.7.0 +v0.8.0
[pulsar-client-go] branch master updated: hotfix: there was a ticker leak when producer creation failed (#730)
This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new 9615575 hotfix: there was a ticker leak when producer creation failed (#730) 9615575 is described below commit 961557532149e1d44f4c27066606d8323f172dbe Author: L AuthorDate: Mon Feb 21 10:37:50 2022 +0800 hotfix: there was a ticker leak when producer creation failed (#730) Co-authored-by: breatche Master Issue: [#729] ### Motivation I received an alarm indicating that the server load was too high. Using the pprof tool to locate the problem, I determined that the cause was a ticker leak that occurred when a large number of producer creations failed. Here are some screenshots to show it: - pprof heap: there are a number of SDK-started tickers, accounting for 90% ![image](https://user-images.githubusercontent.com/15198796/154658829-e7bfc501-31cf-4f5e-9468-7a690d6b9a23.png) - pprof cpu: CPU consumption is also here ![image](https://user-images.githubusercontent.com/15198796/154659229-3f2506e1-59fd-4ec0-b1f9-230fa4a48d96.png) ### Modifications Stop the ticker when producer creation fails --- pulsar/producer_partition.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 0e2e4c4..d031e9a 100644 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -145,6 +145,7 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions } err := p.grabCnx() if err != nil { + p.batchFlushTicker.Stop() logger.WithError(err).Error("Failed to create producer at newPartitionProducer") return nil, err }
[pulsar-client-go] branch master updated: add 0.8.0 changelog for repo (#727)
This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new 965045a add 0.8.0 changelog for repo (#727) 965045a is described below commit 965045aa0d2c077826f64d0b88a5d5e55d9d50c4 Author: xiaolong ran AuthorDate: Wed Feb 16 11:39:10 2022 +0800 add 0.8.0 changelog for repo (#727) Signed-off-by: xiaolongran Add 0.8.0 changelog for repo --- CHANGELOG.md | 47 +++ 1 file changed, 47 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2e43ddc..97f6541 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,53 @@ All notable changes to this project will be documented in this file. +[0.8.0] 2022-02-16 + +## What's Changed +* Update release docs with missing information by @cckellogg in https://github.com/apache/pulsar-client-go/pull/656 +* Update change log for 0.7.0 release by @cckellogg in https://github.com/apache/pulsar-client-go/pull/655 +* Update version to 0.7.0 by @cckellogg in https://github.com/apache/pulsar-client-go/pull/654 +* fix issue 650,different handle sequence value by @baomingyu in https://github.com/apache/pulsar-client-go/pull/651 +* Support nack backoff policy for SDK by @wolfstudy in https://github.com/apache/pulsar-client-go/pull/660 +* Remove unused dependency in `oauth2` module by @reugn in https://github.com/apache/pulsar-client-go/pull/661 +* [Issue 662] Fix race in connection.go waitUntilReady() by @bschofield in https://github.com/apache/pulsar-client-go/pull/663 +* Update dependencies by @reugn in https://github.com/apache/pulsar-client-go/pull/665 +* [Issue 652] Quick fixes to the documentation for the main building blocks of the library by @reugn in https://github.com/apache/pulsar-client-go/pull/667 +* Add subscription properties for ConsumerOptions by @wolfstudy in https://github.com/apache/pulsar-client-go/pull/671 +* Add new bug-resistant 
build constraints by @reugn in https://github.com/apache/pulsar-client-go/pull/670 +* Handle the parameters parsing error in NewProvider by @reugn in https://github.com/apache/pulsar-client-go/pull/673 +* Update email template of release docs by @izumo27 in https://github.com/apache/pulsar-client-go/pull/674 +* Add properties filed for batch container by @wolfstudy in https://github.com/apache/pulsar-client-go/pull/683 +* [Issue 513] Correct apparent logic error in batchContainer's hasSpace() method by @bschofield in https://github.com/apache/pulsar-client-go/pull/678 +* Upgrade DataDog/zstd to v1.5.0 by @dferstay in https://github.com/apache/pulsar-client-go/pull/690 +* fix:add order key to message by @leizhiyuan in https://github.com/apache/pulsar-client-go/pull/688 +* Set default go version to 1.13 in CI related files by @reugn in https://github.com/apache/pulsar-client-go/pull/692 +* [Partition Consumer] Provide lock-free access to compression providers by @dferstay in https://github.com/apache/pulsar-client-go/pull/689 +* Use a separate gorutine to handle the logic of reconnect by @wolfstudy in https://github.com/apache/pulsar-client-go/pull/691 +* [DefaultRouter] add a parallel bench test by @dferstay in https://github.com/apache/pulsar-client-go/pull/693 +* Revert "Use a separate gorutine to handle the logic of reconnect" by @wolfstudy in https://github.com/apache/pulsar-client-go/pull/700 +* Fix data race while accessing connection in partitionProducer by @wolfstudy in https://github.com/apache/pulsar-client-go/pull/701 +* Fix stuck when reconnect broker by @wolfstudy in https://github.com/apache/pulsar-client-go/pull/703 +* Fix slice bounds out of range for readSingleMessage by @wolfstudy in https://github.com/apache/pulsar-client-go/pull/709 +* Encryption failure test case fix by @GPrabhudas in https://github.com/apache/pulsar-client-go/pull/708 +* [DefaultRouter] fix unnecessary system clock reads due to races accessing router state by @dferstay in 
https://github.com/apache/pulsar-client-go/pull/694 +* Fix negative WaitGroup counter issue by @wolfstudy in https://github.com/apache/pulsar-client-go/pull/712 +* [issue 675] oauth2 use golang-jwt address CVE-2020-26160 by @zzzming in https://github.com/apache/pulsar-client-go/pull/713 +* readme: add note about how to build and test by @pgier in https://github.com/apache/pulsar-client-go/pull/714 +* Bump oauth2 package version to the latest in master by @iorvd in https://github.com/apache/pulsar-client-go/pull/715 +* Fix closed connection leak by @billowqiu in https://github.com/apache/pulsar-client-go/pull/716 +* [Bugfix] producer runEventsLoop for reconnect early exit by @billowqiu in https://github.com/apache/pulsar-client-go/pull/721 +* [issue 679][oauth2] Fix macos compiler warnings by @pgier in https://github.com/apache/pulsar-client-go/pull/71
svn commit: r52577 - in /dev/pulsar/pulsar-client-go-0.8.0-candidate-1: ./ apache-pulsar-client-go-0.8.0-src.tar.gz apache-pulsar-client-go-0.8.0-src.tar.gz.asc apache-pulsar-client-go-0.8.0-src.tar.g
Author: rxl Date: Wed Feb 16 03:23:50 2022 New Revision: 52577 Log: Staging artifacts and signature for Pulsar Client Go release 0.8.0-candidate-1 Added: dev/pulsar/pulsar-client-go-0.8.0-candidate-1/ dev/pulsar/pulsar-client-go-0.8.0-candidate-1/apache-pulsar-client-go-0.8.0-src.tar.gz (with props) dev/pulsar/pulsar-client-go-0.8.0-candidate-1/apache-pulsar-client-go-0.8.0-src.tar.gz.asc dev/pulsar/pulsar-client-go-0.8.0-candidate-1/apache-pulsar-client-go-0.8.0-src.tar.gz.sha512 Added: dev/pulsar/pulsar-client-go-0.8.0-candidate-1/apache-pulsar-client-go-0.8.0-src.tar.gz == Binary file - no diff available. Propchange: dev/pulsar/pulsar-client-go-0.8.0-candidate-1/apache-pulsar-client-go-0.8.0-src.tar.gz -- svn:mime-type = application/octet-stream Added: dev/pulsar/pulsar-client-go-0.8.0-candidate-1/apache-pulsar-client-go-0.8.0-src.tar.gz.asc == --- dev/pulsar/pulsar-client-go-0.8.0-candidate-1/apache-pulsar-client-go-0.8.0-src.tar.gz.asc (added) +++ dev/pulsar/pulsar-client-go-0.8.0-candidate-1/apache-pulsar-client-go-0.8.0-src.tar.gz.asc Wed Feb 16 03:23:50 2022 @@ -0,0 +1,17 @@ +-BEGIN PGP SIGNATURE- + +iQJEBAABCAAuFiEEciSp+kqNwHcTrdvU+q+T3I6QoRgFAmIMZswQHHlvbmdAYXBh +Y2hlLm9yZwAKCRD6r5PcjpChGCr1EACfxHU+Lis8n/SJUSfuwLpHZWGzytN0xllV +zJOlYi0zaUV34BcKk7w3+pi6QTKUR6VFippDxKDkwZgrN1nhC5/MaCIEe3C/HviA +KL3TBJdFIElJ1tI3n6TfmWmRRlXhmMJiTkjnVuLXRA00d3hf/wGL2lpq/cY+CWma +rWJKoEg0MLc+svxWVzNQRZF0BL54JlSei6D/9qmDLp6skF5UyO4m+moUV9S5kj07 +/jZbgELQWSF9utwo2vzdHUd16+K99kSggmyNaZuoIROn+BAY/S300BaPR0X7hVQl +lp65EhnSf0MTdovGjADTyn+EYlw6kyIlIIc4ICNg/7uhu6Q3pGIxD83YQY1fy9p7 +/rsyFwFkrnKNbqSIGQrGkNFcNbszRfgEVOiP/Mp8Pp1p0RWL404BD/EEOtZVQcFb +Y6sBSqY6gUnzH7VTPePtbBVp0RRUABnQ1JbcDZUOwLW5mU2qUUA287f1Ow1B90hF +/ktBs4TPHmKIuJ6wdZyb10FEbvbHAEZoHGYVrMibRkBbR6eSD9NGQQy4gnWy/i8g +JoV4379VIyncuJAQfdv4O5d/sC9jksupn431s+HxMk0dsq8XyGgf19ACSxX3Zt8u +VqL8F2rCEfk9IeV4GP3M9T3TdNK/RApdAHxif1bs1oy6pNetnFzQRG73RbYYtaza +znydc5uvGg== +=N0KU +-END PGP SIGNATURE- Added: 
dev/pulsar/pulsar-client-go-0.8.0-candidate-1/apache-pulsar-client-go-0.8.0-src.tar.gz.sha512 == --- dev/pulsar/pulsar-client-go-0.8.0-candidate-1/apache-pulsar-client-go-0.8.0-src.tar.gz.sha512 (added) +++ dev/pulsar/pulsar-client-go-0.8.0-candidate-1/apache-pulsar-client-go-0.8.0-src.tar.gz.sha512 Wed Feb 16 03:23:50 2022 @@ -0,0 +1 @@ +0015cc735af13139a564f0fd57c58af753579953e3c12f08cb62d8e9801cdc2964c60392ed9a98dd54845be7f7b1a353bd49cecb03360b4cb691904f8447769c apache-pulsar-client-go-0.8.0-src.tar.gz
[pulsar-client-go] annotated tag v0.8.0-candidate-1 updated (5daa17b -> 58d5097)
This is an automated email from the ASF dual-hosted git repository. rxl pushed a change to annotated tag v0.8.0-candidate-1 in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git. *** WARNING: tag v0.8.0-candidate-1 was modified! *** from 5daa17b (commit) to 58d5097 (tag) tagging 5daa17b02bffe7d0844c76de2306de93fe9e3acf (commit) replaces v0.7.0 by xiaolongran on Wed Feb 16 10:37:55 2022 +0800 - Log - Release v0.2.0 -BEGIN PGP SIGNATURE- iQJDBAABCAAtFiEE4nvv6z2Aqlp0sXEpAHfMO00BOOYFAmIMY4UPHHJ4bEBhcGFj aGUub3JnAAoJEAB3zDtNATjmJHoQAL3XJWlSSkRS8VOatPeRVt4XboPGE3khFXcx K5nywWB5XSIuiQcunO+QVl71g7EYu+f6PCUJWi8kQpl809/C+nazHjbYm04okAKd 5+mtznRjAGfIa/s4xfNnPso01AacBb6K5Q9U3pRGJbSp8QQLL/yTcYHL7OGgH80d n1uz8IQxEBHicPt9xnKMj6BhU3md4kfqvisQgqXODiSaOKpSoSE0hStHWYKK3aVh hIDCmaclOj161VF3i7+wSlKD0OknPvU5KwvUIM9IvSyfxq9Ftvywh3bylU3ZGC3S vupONpSB0XPVEgV4Wirn297FFVSUV1MNKJXW9yRiRqnqL2DjcFlaCEeO8aYxtwgP ySOTLBAS5Laec+vQISfvsASjMHhMwZaVtEtjUIrMh+XOWWiIY4ujlN33AM4Ni0Ir 1UPcvya+smmH42Q4TB9Qe0ldy6Eh6jxdQMsY2bha8SMYPtCUIc9LTIx5H3JG9mhr ouuhbQ5cU1+h9dLKGGpgkOOWoFk50JZgTgdktlV0mYPLbHIqZGLVkhwtxlqxSBJL uWMHw4gM3ACR93f8DgfatHLZa3P7HSGMQLdyaj1SK4wA46vTt6HFBqHhJlS+JFQe Z18VQV8uxdYn7Lwnzc9IuijMtxp+v+l5sjxzoKdTXiQuUlYjpahAGiNgMaqYuKDy Tl2PQh5b =fd4J -END PGP SIGNATURE- --- No new revisions were added by this update. Summary of changes:
[pulsar-client-go] branch branch-0.8.0 created (now 5daa17b)
This is an automated email from the ASF dual-hosted git repository. rxl pushed a change to branch branch-0.8.0 in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git. at 5daa17b Markdown error fix (#722) No new revisions were added by this update.
[pulsar-client-go] branch master updated: [optimize] Stop batchFlushTicker when Disable batching (#720)
This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new 8f8287f [optimize] Stop batchFlushTicker when Disable batching (#720) 8f8287f is described below commit 8f8287f30fc0cc90e9dea0003776491fef22eb33 Author: ZhangJian He AuthorDate: Thu Feb 10 11:43:18 2022 +0800 [optimize] Stop batchFlushTicker when Disable batching (#720) ### Motivation Disable the batchFlushTicker when batching is disabled, to reduce CPU cost ### Modifications Stop the batchFlushTicker when batching is disabled --- pulsar/producer_partition.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 913c33c..0e2e4c4 100644 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -126,6 +126,9 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions metrics: metrics, epoch:0, } + if p.options.DisableBatching { + p.batchFlushTicker.Stop() + } p.setProducerState(producerInit) if options.Schema != nil && options.Schema.GetSchemaInfo() != nil {
[pulsar-client-go] branch master updated: [Bugfix] producer runEventsLoop for reconnect early exit (#721)
This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new 16e8b81 [Bugfix] producer runEventsLoop for reconnect early exit (#721) 16e8b81 is described below commit 16e8b8114615146d645f2947afeb5f3cedbc85a8 Author: billowqiu AuthorDate: Wed Feb 9 15:04:26 2022 +0800 [Bugfix] producer runEventsLoop for reconnect early exit (#721) * Fix closed connection leak * Fix closed connection leak * bugfix: runEventsLoop for reconnect early exit * [optimize] add log when reconnect * [Bugfix]fix panic * [Bugfix]remove log conn ID * [optimize]Distinguish failed create producer log --- pulsar/consumer_partition.go | 3 +++ pulsar/internal/connection_pool.go | 6 +++--- pulsar/producer_partition.go | 21 + 3 files changed, 19 insertions(+), 11 deletions(-) diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index 9bd4a94..04a39c5 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -894,6 +894,7 @@ func (pc *partitionConsumer) runEventsLoop() { for { select { case <-pc.closeCh: + pc.log.Info("close consumer, exit reconnect") return case <-pc.connectClosedCh: pc.log.Debug("runEventsLoop will reconnect") @@ -992,6 +993,7 @@ func (pc *partitionConsumer) reconnectToBroker() { for maxRetry != 0 { if pc.getConsumerState() != consumerReady { // Consumer is already closing + pc.log.Info("consumer state not ready, exit reconnect") return } @@ -1005,6 +1007,7 @@ func (pc *partitionConsumer) reconnectToBroker() { pc.log.Info("Reconnected consumer to broker") return } + pc.log.WithError(err).Error("Failed to create consumer at reconnect") errMsg := err.Error() if strings.Contains(errMsg, errTopicNotFount) { // when topic is deleted, we should give up reconnection. 
diff --git a/pulsar/internal/connection_pool.go b/pulsar/internal/connection_pool.go index 4787ba1..db67c25 100644 --- a/pulsar/internal/connection_pool.go +++ b/pulsar/internal/connection_pool.go @@ -75,15 +75,15 @@ func (p *connectionPool) GetConnection(logicalAddr *url.URL, physicalAddr *url.U p.Lock() conn, ok := p.connections[key] if ok { - p.log.Infof("Found connection in pool key=%s logical_addr=%+v physical_addr=%+v", + p.log.Debugf("Found connection in pool key=%s logical_addr=%+v physical_addr=%+v", key, conn.logicalAddr, conn.physicalAddr) // remove stale/failed connection if conn.closed() { - delete(p.connections, key) - conn.Close() p.log.Infof("Removed connection from pool key=%s logical_addr=%+v physical_addr=%+v", key, conn.logicalAddr, conn.physicalAddr) + delete(p.connections, key) + conn.Close() conn = nil // set to nil so we create a new one } } diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 3f1e54b..913c33c 100644 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -142,7 +142,7 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions } err := p.grabCnx() if err != nil { - logger.WithError(err).Error("Failed to create producer") + logger.WithError(err).Error("Failed to create producer at newPartitionProducer") return nil, err } @@ -209,7 +209,7 @@ func (p *partitionProducer) grabCnx() error { } res, err := p.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, id, pb.BaseCommand_PRODUCER, cmdProducer) if err != nil { - p.log.WithError(err).Error("Failed to create producer") + p.log.WithError(err).Error("Failed to create producer at send PRODUCER request") return err } @@ -324,6 +324,7 @@ func (p *partitionProducer) reconnectToBroker() { for maxRetry != 0 { if p.getProducerState() != producerReady { // Producer is already closing + p.log.I
[pulsar-client-go] branch master updated: Fix closed connection leak (#716)
This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new edf81af Fix closed connection leak (#716) edf81af is described below commit edf81af225d1573848584a7e48f090a558a9c620 Author: billowqiu AuthorDate: Sat Jan 22 11:15:48 2022 +0800 Fix closed connection leak (#716) Fix closed connection leak --- pulsar/internal/connection_pool.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pulsar/internal/connection_pool.go b/pulsar/internal/connection_pool.go index cadeed1..4787ba1 100644 --- a/pulsar/internal/connection_pool.go +++ b/pulsar/internal/connection_pool.go @@ -81,6 +81,7 @@ func (p *connectionPool) GetConnection(logicalAddr *url.URL, physicalAddr *url.U // remove stale/failed connection if conn.closed() { delete(p.connections, key) + conn.Close() p.log.Infof("Removed connection from pool key=%s logical_addr=%+v physical_addr=%+v", key, conn.logicalAddr, conn.physicalAddr) conn = nil // set to nil so we create a new one
[pulsar-client-go] branch master updated: Bump oauth2 package version to the latest in master (#715)
This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new ddacb92 Bump oauth2 package version to the latest in master (#715) ddacb92 is described below commit ddacb9201e621c58488951772a74cecf78f11f54 Author: Oleksandr Prokopovych <93676586+io...@users.noreply.github.com> AuthorDate: Fri Jan 21 05:27:44 2022 +0200 Bump oauth2 package version to the latest in master (#715) * bump oauth2 package to the latest in master * bump oauth2 package to the latest in master --- go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go.mod b/go.mod index eacc490..c788991 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.15 require ( github.com/AthenZ/athenz v1.10.39 github.com/DataDog/zstd v1.5.0 - github.com/apache/pulsar-client-go/oauth2 v0.0.0-20201120111947-b8bd55bc02bd + github.com/apache/pulsar-client-go/oauth2 v0.0.0-20220120090717-25e59572242e github.com/beefsack/go-rate v0.0.0-20180408011153-efa7637bb9b6 github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b github.com/davecgh/go-spew v1.1.1
[pulsar-client-go] branch master updated: readme: add note about how to build and test (#714)
This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new f5676b9 readme: add note about how to build and test (#714) f5676b9 is described below commit f5676b940f6029cf0a34a612b1f843bf5bdac076 Author: Paul Gier AuthorDate: Thu Jan 20 03:07:49 2022 -0600 readme: add note about how to build and test (#714) readme: add note about how to build and test --- README.md | 10 ++ 1 file changed, 10 insertions(+) diff --git a/README.md b/README.md index 89c17f2..5ce74d1 100644 --- a/README.md +++ b/README.md @@ -134,6 +134,16 @@ for reader.HasNext() { } ``` +## Build and Test + +Build the sources: + +go build ./pulsar + +Run the unit tests: + +./docker-ci.sh + ## Contributing Contributions are welcomed and greatly appreciated. See [CONTRIBUTING.md](CONTRIBUTING.md) for details on submitting patches and the contribution workflow.
[pulsar-client-go] branch master updated: [issue 675] oauth2 use golang-jwt address CVE-2020-26160 (#713)
This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new 25e5957 [issue 675] oauth2 use golang-jwt address CVE-2020-26160 (#713) 25e5957 is described below commit 25e59572242edd2c4aac2a836773b6f124efb7fa Author: ming AuthorDate: Thu Jan 20 04:07:17 2022 -0500 [issue 675] oauth2 use golang-jwt address CVE-2020-26160 (#713) * oauth2 use golang-jwt address CVE-2020-26160 * set go 1.15 minimum version as required by golang-jwt --- .github/workflows/go.yml | 2 +- .github/workflows/project.yml | 2 +- README.md | 2 +- docker-ci.sh | 2 +- go.mod| 2 +- go.sum| 4 ++-- oauth2/auth.go| 2 +- oauth2/go.mod | 4 ++-- oauth2/go.sum | 4 ++-- 9 files changed, 12 insertions(+), 12 deletions(-) diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index bc64a50..a3c4c60 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -6,7 +6,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: -go-version: [1.13, 1.14, 1.15, 1.16, 1.17] +go-version: [1.15, 1.16, 1.17] steps: - name: clean docker cache run: | diff --git a/.github/workflows/project.yml b/.github/workflows/project.yml index 15ecb98..810979a 100644 --- a/.github/workflows/project.yml +++ b/.github/workflows/project.yml @@ -6,7 +6,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: -go-version: [1.13, 1.14, 1.15, 1.16, 1.17] +go-version: [1.15, 1.16, 1.17] steps: - name: Set up Go uses: actions/setup-go@v1 diff --git a/README.md b/README.md index 73bd982..89c17f2 100644 --- a/README.md +++ b/README.md @@ -36,7 +36,7 @@ CGo based library. 
## Requirements -- Go 1.13+ +- Go 1.15+ ## Status diff --git a/docker-ci.sh b/docker-ci.sh index 200cc50..37f97e7 100755 --- a/docker-ci.sh +++ b/docker-ci.sh @@ -25,7 +25,7 @@ cd ${SRC_DIR} IMAGE_NAME=pulsar-client-go-test:latest -GO_VERSION=${1:-1.13} +GO_VERSION=${1:-1.16} docker rmi --force ${IMAGE_NAME} || true docker rmi --force apachepulsar/pulsar:latest || true docker build -t ${IMAGE_NAME} --build-arg GO_VERSION="golang:${GO_VERSION}" . diff --git a/go.mod b/go.mod index 29407fa..eacc490 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/apache/pulsar-client-go -go 1.13 +go 1.15 require ( github.com/AthenZ/athenz v1.10.39 diff --git a/go.sum b/go.sum index ee4a2b3..b2fdce1 100644 --- a/go.sum +++ b/go.sum @@ -98,8 +98,6 @@ github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.m github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= -github.com/form3tech-oss/jwt-go v3.2.3+incompatible h1:7ZaBxOI7TMoYBfyA3cQHErNNyAWIKUMIwqxEtgHOs5c= -github.com/form3tech-oss/jwt-go v3.2.3+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= @@ -122,6 +120,8 @@ github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7a github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang-jwt/jwt v3.2.1+incompatible/go.mod 
h1:8pz2t5EyA70fFQQSrl6XZXzqecmYZeUEB8OUGHkxJ+I= +github.com/golang-jwt/jwt v3.2.2+incompatible h1:IfV12K8xAKAnZqdXVzCZ+TOjboZ2keLg81eXfW3O+oY= +github.com/golang-jwt/jwt v3.2.2+incompatible/go.mod h1:8pz2t5EyA70fFQQSrl6XZXzqecmYZeUEB8OUGHkxJ+I= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= diff --git a/oauth2/auth.go b/oauth2/auth.go index 0a3c73a..d44bd35 100644 --- a/oauth2/auth.go +++ b/oauth2/auth.go @@ -22,7 +22,7 @@ import ( "time" "github.com/apache/pulsar-client-go/oauth2/clock" - "github.com/form3tech-oss/jwt-go" + "github.com/golang-jwt/jwt" "golang.org/x/
[pulsar-client-go] branch master updated: Fix negative WaitGroup counter issue (#712)
This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new 90305e8 Fix negative WaitGroup counter issue (#712) 90305e8 is described below commit 90305e85a1631cca42cfa840675f52db9c773162 Author: xiaolong ran AuthorDate: Thu Jan 20 12:39:45 2022 +0800 Fix negative WaitGroup counter issue (#712) * Fix negative WaitGroup counter issue Signed-off-by: xiaolongran * use cas Signed-off-by: xiaolongran * Make sure the callback logic is atomic Signed-off-by: xiaolongran --- pulsar/consumer_partition.go | 6 ++--- pulsar/producer_partition.go | 61 +++- 2 files changed, 35 insertions(+), 32 deletions(-) diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index 7679a8c..9bd4a94 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -33,7 +33,7 @@ import ( pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto" "github.com/apache/pulsar-client-go/pulsar/log" - "go.uber.org/atomic" + uAtomic "go.uber.org/atomic" ) var ( @@ -110,10 +110,10 @@ type partitionConsumer struct { // this is needed for sending ConsumerMessage on the messageCh parentConsumer Consumer - state atomic.Int32 + state uAtomic.Int32 options*partitionConsumerOpts - conn atomic.Value + conn uAtomic.Value topicstring name string diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index e318b81..3f1e54b 100644 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -33,7 +33,7 @@ import ( pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto" "github.com/apache/pulsar-client-go/pulsar/log" - ua "go.uber.org/atomic" + uAtomic "go.uber.org/atomic" ) type producerState int32 @@ -60,12 +60,12 @@ var ( var errTopicNotFount = "TopicNotFound" type partitionProducer struct { - state ua.Int32 + state uAtomic.Int32 client *client 
topic string loglog.Logger - conn atomic.Value + conn uAtomic.Value options *ProducerOptions producerName string @@ -675,7 +675,7 @@ func (p *partitionProducer) internalFlush(fr *flushRequest) { pi, ok := p.pendingQueue.PeekLast().(*pendingItem) if !ok { - fr.waitGroup.Done() + close(fr.doneCh) return } @@ -688,7 +688,7 @@ func (p *partitionProducer) internalFlush(fr *flushRequest) { // The last item in the queue has been completed while we were // looking at it. It's safe at this point to assume that every // message enqueued before Flush() was called are now persisted - fr.waitGroup.Done() + close(fr.doneCh) return } @@ -696,7 +696,7 @@ func (p *partitionProducer) internalFlush(fr *flushRequest) { msg: nil, callback: func(id MessageID, message *ProducerMessage, e error) { fr.err = e - fr.waitGroup.Done() + close(fr.doneCh) }, } @@ -704,19 +704,23 @@ func (p *partitionProducer) internalFlush(fr *flushRequest) { } func (p *partitionProducer) Send(ctx context.Context, msg *ProducerMessage) (MessageID, error) { - wg := sync.WaitGroup{} - wg.Add(1) - var err error var msgID MessageID + // use atomic bool to avoid race + isDone := uAtomic.NewBool(false) + doneCh := make(chan struct{}) + p.internalSendAsync(ctx, msg, func(ID MessageID, message *ProducerMessage, e error) { - err = e - msgID = ID - wg.Done() + if isDone.CAS(false, true) { + err = e + msgID = ID + close(doneCh) + } }, true) - wg.Wait() + // wait for send request to finish + <-doneCh return msgID, err } @@ -828,7 +832,7 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt) } func (p *partitionProducer) internalClose(req *closeProducer) { - defer req.waitGroup.Done() + defer close(req.doneCh) if !p.casProducerState(producerReady, producerClosing) { return } @@ -863,14 +867,15 @@ func (p *partitionProducer) LastSequenceID() int64 { } func (p *partitionProducer) Flush
[pulsar-client-go] branch master updated: [DefaultRouter] fix unnecessary system clock reads due to races accessing router state (#694)
This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new a119bab [DefaultRouter] fix unnecessary system clock reads due to races accessing router state (#694) a119bab is described below commit a119bab0f8598601c0eb7f0fcd97da7ab06700c7 Author: dferstay AuthorDate: Mon Jan 17 00:05:25 2022 -0800 [DefaultRouter] fix unnecessary system clock reads due to races accessing router state (#694) Previously, we used atomic operations to read and update parts of the default router state. Unfortunately, the reads and updates could race under concurrent calls leading to unnecessary clock reads and an associated slowdown in performance. Now, we use atomic addition to increment the message count and batch size. This removes the race condition by ensuring that each go-routine will have a unique messageCount, and hence only one will perform the clock read. Furthermore, we use atomic compare-and-swap to ensure that partitions are not skipped if multiple go-routines attempt to increment the partition cursor. 
Signed-off-by: Daniel Ferstay Co-authored-by: Daniel Ferstay --- pulsar/default_router.go | 50 +-- pulsar/default_router_test.go | 13 +++ 2 files changed, 34 insertions(+), 29 deletions(-) diff --git a/pulsar/default_router.go b/pulsar/default_router.go index b5e24a6..6945ff1 100644 --- a/pulsar/default_router.go +++ b/pulsar/default_router.go @@ -18,7 +18,6 @@ package pulsar import ( - "math" "math/rand" "sync/atomic" "time" @@ -27,7 +26,7 @@ import ( type defaultRouter struct { currentPartitionCursor uint32 - lastChangeTimestamp int64 + lastBatchTimestamp int64 msgCounter uint32 cumulativeBatchSize uint32 } @@ -45,7 +44,7 @@ func NewDefaultRouter( disableBatching bool) func(*ProducerMessage, uint32) int { state := &defaultRouter{ currentPartitionCursor: rand.Uint32(), - lastChangeTimestamp:math.MinInt64, + lastBatchTimestamp: time.Now().UnixNano(), } readClockAfterNumMessages := uint32(maxBatchingMessages / 10) @@ -75,37 +74,38 @@ func NewDefaultRouter( // If there's no key, we do round-robin across partition, sticking with a given // partition for a certain amount of messages or volume buffered or the max delay to batch is reached so that // we ensure having a decent amount of batching of the messages. - // Note that it is possible that we skip more than one partition if multiple goroutines increment - // currentPartitionCursor at the same time. If that happens it shouldn't be a problem because we only want to - // spread the data on different partitions but not necessarily in a specific sequence. 
var now int64 size := uint32(len(message.Payload)) - previousMessageCount := atomic.LoadUint32(&state.msgCounter) - previousBatchingMaxSize := atomic.LoadUint32(&state.cumulativeBatchSize) - previousLastChange := atomic.LoadInt64(&state.lastChangeTimestamp) + partitionCursor := atomic.LoadUint32(&state.currentPartitionCursor) + messageCount := atomic.AddUint32(&state.msgCounter, 1) + batchSize := atomic.AddUint32(&state.cumulativeBatchSize, size) - messageCountReached := previousMessageCount >= uint32(maxBatchingMessages-1) - sizeReached := (size >= uint32(maxBatchingSize)-previousBatchingMaxSize) + // Note: use greater-than for the threshold check so that we don't route this message to a new partition + // before a batch is complete. + messageCountReached := messageCount > uint32(maxBatchingMessages) + sizeReached := batchSize > uint32(maxBatchingSize) durationReached := false - if readClockAfterNumMessages == 0 || previousMessageCount%readClockAfterNumMessages == 0 { + if readClockAfterNumMessages == 0 || messageCount%readClockAfterNumMessages == 0 { now = time.Now().UnixNano() - durationReached = now-previousLastChange >= maxBatchingDelay.Nanoseconds() + lastBatchTime := atomic.LoadInt64(&state.lastBatchTimestamp) + durationReached = now-lastBatchTime > maxBatchingDelay.Nanoseconds() } if messageCountReached || s
[pulsar-client-go] branch master updated: Encryption failure test case fix (#708)
This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new 468bfd6 Encryption failure test case fix (#708) 468bfd6 is described below commit 468bfd6bdcd45857e64fd499b98c7f30ec6f60f7 Author: Garule Prabhudas AuthorDate: Mon Jan 17 12:02:51 2022 +0530 Encryption failure test case fix (#708) * test case to detect race condition in creation of producer/consumer * fix for race condition in consumer/producer creation * refactor * restore test case Co-authored-by: PGarule --- pulsar/consumer_impl.go | 30 +++ pulsar/consumer_partition.go | 7 - pulsar/consumer_test.go | 70 +++- pulsar/producer_impl.go | 21 + pulsar/producer_partition.go | 21 - pulsar/reader_impl.go| 12 6 files changed, 132 insertions(+), 29 deletions(-) diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index 00e0b76..2bd3ed5 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -25,6 +25,7 @@ import ( "sync" "time" + "github.com/apache/pulsar-client-go/pulsar/crypto" "github.com/apache/pulsar-client-go/pulsar/internal" pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto" "github.com/apache/pulsar-client-go/pulsar/log" @@ -156,6 +157,10 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) { return nil, err } topic = tns[0].Name + err = addMessageCryptoIfMissing(client, &options, topic) + if err != nil { + return nil, err + } return newInternalConsumer(client, options, topic, messageCh, dlq, rlq, false) } @@ -168,6 +173,11 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) { } options.Topics = distinct(options.Topics) + err = addMessageCryptoIfMissing(client, &options, options.Topics) + if err != nil { + return nil, err + } + return newMultiTopicConsumer(client, options, options.Topics, messageCh, dlq, rlq) } @@ -181,6 
+191,12 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) { if err != nil { return nil, err } + + err = addMessageCryptoIfMissing(client, &options, tn.Name) + if err != nil { + return nil, err + } + return newRegexConsumer(client, options, tn, pattern, messageCh, dlq, rlq) } @@ -654,3 +670,17 @@ func (c *consumer) messageID(msgID MessageID) (trackingMessageID, bool) { return mid, true } + +func addMessageCryptoIfMissing(client *client, options *ConsumerOptions, topics interface{}) error { + // decryption is enabled, use default messagecrypto if not provided + if options.Decryption != nil && options.Decryption.MessageCrypto == nil { + messageCrypto, err := crypto.NewDefaultMessageCrypto("decrypt", + false, + client.log.SubLogger(log.Fields{"topic": topics})) + if err != nil { + return err + } + options.Decryption.MessageCrypto = messageCrypto + } + return nil +} diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index 1d95c42..7679a8c 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -186,13 +186,6 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon if pc.options.decryption == nil { decryptor = cryptointernal.NewNoopDecryptor() // default to noopDecryptor } else { - if options.decryption.MessageCrypto == nil { - messageCrypto, err := crypto.NewDefaultMessageCrypto("decrypt", false, pc.log) - if err != nil { - return nil, err - } - options.decryption.MessageCrypto = messageCrypto - } decryptor = cryptointernal.NewConsumerDecryptor( options.decryption.KeyReader, options.decryption.MessageCrypto, diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index 55823e4..cadd8e4 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -332,14 +332,77 @@ func TestPartiti
[pulsar-client-go] branch master updated (ff7a962 -> 2bcf7c7)
This is an automated email from the ASF dual-hosted git repository. rxl pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git. from ff7a962 Revert "Use a separate gorutine to handle the logic of reconnect" (#700) add 2bcf7c7 Fix data race while accessing connection in partitionProducer (#701) No new revisions were added by this update. Summary of changes: pulsar/producer_partition.go | 46 +--- 1 file changed, 31 insertions(+), 15 deletions(-)
[pulsar-client-go] branch master updated: Revert "Use a separate goroutine to handle the logic of reconnect" (#700)
This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new ff7a962 Revert "Use a separate gorutine to handle the logic of reconnect" (#700) ff7a962 is described below commit ff7a962be6b41da2c318e6fe70da00545bc0bdd4 Author: xiaolong ran AuthorDate: Thu Jan 6 18:43:05 2022 +0800 Revert "Use a separate gorutine to handle the logic of reconnect" (#700) * Revert "Use a separate gorutine to handle the logic of reconnect (#691)" This reverts commit 39e13aced4cc35a63f5d6164e63c7f1638388c37. * add closeCh for go rutine leak Signed-off-by: xiaolongran --- pulsar/producer_partition.go | 20 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index b517309..3d62758 100644 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -77,8 +77,9 @@ type partitionProducer struct { batchFlushTicker *time.Ticker // Channel where app is posting messages to be published - connectClosedCh chan connectionClosed eventsChan chan interface{} + closeCh chan struct{} + connectClosedCh chan connectionClosed publishSemaphore internal.Semaphore pendingQueue internal.BlockingQueue @@ -115,8 +116,9 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions log: logger, options: options, producerID: client.rpcClient.NewProducerID(), + eventsChan: make(chan interface{}, maxPendingMessages), connectClosedCh: make(chan connectionClosed, 10), - eventsChan: make(chan interface{}, maxPendingMessages+20), + closeCh: make(chan struct{}), batchFlushTicker: time.NewTicker(batchingMaxPublishDelay), publishSemaphore: internal.NewSemaphore(int32(maxPendingMessages)), pendingQueue: internal.NewBlockingQueue(maxPendingMessages), @@ -369,13 +371,13 @@ func (p *partitionProducer) reconnectToBroker() { } func (p 
*partitionProducer) runEventsLoop() { - go func() { - for { - for range p.connectClosedCh { - p.log.Info("runEventsLoop will reconnect in producer") - p.reconnectToBroker() - } + select { + case <-p.closeCh: + return + case <-p.connectClosedCh: + p.log.Info("runEventsLoop will reconnect in producer") + p.reconnectToBroker() } }() @@ -872,6 +874,8 @@ func (p *partitionProducer) internalClose(req *closeProducer) { p.setProducerState(producerClosed) p.cnx.UnregisterListener(p.producerID) p.batchFlushTicker.Stop() + + close(p.closeCh) } func (p *partitionProducer) LastSequenceID() int64 {
[pulsar-client-go] branch revert-691-xiaolong/split-eventsCh updated (accf726 -> 536d882)
This is an automated email from the ASF dual-hosted git repository. rxl pushed a change to branch revert-691-xiaolong/split-eventsCh in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git. from accf726 Revert "Use a separate goroutine to handle the logic of reconnect (#691)" add 536d882 add closeCh for goroutine leak No new revisions were added by this update. Summary of changes: pulsar/producer_partition.go | 16 ++-- 1 file changed, 14 insertions(+), 2 deletions(-)
[pulsar-client-go] branch revert-691-xiaolong/split-eventsCh created (now accf726)
This is an automated email from the ASF dual-hosted git repository. rxl pushed a change to branch revert-691-xiaolong/split-eventsCh in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git. at accf726 Revert "Use a separate goroutine to handle the logic of reconnect (#691)" This branch includes the following new commits: new accf726 Revert "Use a separate goroutine to handle the logic of reconnect (#691)" The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference.
[pulsar-client-go] 01/01: Revert "Use a separate goroutine to handle the logic of reconnect (#691)"
This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch revert-691-xiaolong/split-eventsCh in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git commit accf726bdf3972ac3b8b64c7fe1c0480959cc2de Author: xiaolong ran AuthorDate: Thu Jan 6 10:37:43 2022 +0800 Revert "Use a separate gorutine to handle the logic of reconnect (#691)" This reverts commit 39e13aced4cc35a63f5d6164e63c7f1638388c37. --- pulsar/producer_partition.go | 16 1 file changed, 4 insertions(+), 12 deletions(-) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index b517309..d67c0c0 100644 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -77,8 +77,8 @@ type partitionProducer struct { batchFlushTicker *time.Ticker // Channel where app is posting messages to be published - connectClosedCh chan connectionClosed eventsChan chan interface{} + connectClosedCh chan connectionClosed publishSemaphore internal.Semaphore pendingQueue internal.BlockingQueue @@ -115,8 +115,8 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions log: logger, options: options, producerID: client.rpcClient.NewProducerID(), + eventsChan: make(chan interface{}, maxPendingMessages), connectClosedCh: make(chan connectionClosed, 10), - eventsChan: make(chan interface{}, maxPendingMessages+20), batchFlushTicker: time.NewTicker(batchingMaxPublishDelay), publishSemaphore: internal.NewSemaphore(int32(maxPendingMessages)), pendingQueue: internal.NewBlockingQueue(maxPendingMessages), @@ -369,16 +369,6 @@ func (p *partitionProducer) reconnectToBroker() { } func (p *partitionProducer) runEventsLoop() { - - go func() { - for { - for range p.connectClosedCh { - p.log.Info("runEventsLoop will reconnect in producer") - p.reconnectToBroker() - } - } - }() - for { select { case i := <-p.eventsChan: @@ -391,6 +381,8 @@ func (p *partitionProducer) runEventsLoop() { p.internalClose(v) return } + case <-p.connectClosedCh: + 
p.reconnectToBroker() case <-p.batchFlushTicker.C: if p.batchBuilder.IsMultiBatches() { p.internalFlushCurrentBatches()
[pulsar-client-go] branch master updated (d5d4903 -> 39e13ac)
This is an automated email from the ASF dual-hosted git repository. rxl pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git. from d5d4903 Provide lock-free access to partition consumer compression providers (#689) add 39e13ac Use a separate goroutine to handle the logic of reconnect (#691) No new revisions were added by this update. Summary of changes: pulsar/producer_partition.go | 16 1 file changed, 12 insertions(+), 4 deletions(-)
[pulsar-client-go] branch master updated: Provide lock-free access to partition consumer compression providers (#689)
This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new d5d4903 Provide lock-free access to partition consumer compression providers (#689) d5d4903 is described below commit d5d49031c7da2aa7fa0b9338e63addc6a762281b Author: dferstay AuthorDate: Sat Dec 25 19:11:32 2021 -0800 Provide lock-free access to partition consumer compression providers (#689) The compression providers map in the partition consumer is a textbook case for using go's lock-free sync.Map: the set of map entries is stable and access is read-only. On machines with 4 cores or greater, read contention on the sync.RWMutex outweighs the cost of using a sync.Map. Below is an old article on the subject, but it still holds true today: https://medium.com/@deckarep/the-new-kid-in-town-gos-sync-map-de24a6bf7c2c Signed-off-by: Daniel Ferstay Co-authored-by: Daniel Ferstay --- pulsar/consumer_partition.go | 42 --- pulsar/consumer_partition_test.go | 9 - 2 files changed, 30 insertions(+), 21 deletions(-) diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index ecde0bf..d438b87 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -144,8 +144,7 @@ type partitionConsumer struct { log log.Logger - providersMutex sync.RWMutex - compressionProviders map[pb.CompressionType]compression.Provider + compressionProviders sync.Map //map[pb.CompressionType]compression.Provider metrics *internal.LeveledMetrics decryptorcryptointernal.Decryptor } @@ -171,7 +170,7 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon closeCh: make(chan struct{}), clearQueueCh: make(chan func(id trackingMessageID)), clearMessageQueuesCh: make(chan chan struct{}), - compressionProviders: make(map[pb.CompressionType]compression.Provider), + compressionProviders: sync.Map{}, dlq: 
dlq, metrics: metrics, } @@ -967,11 +966,15 @@ func (pc *partitionConsumer) internalClose(req *closeRequest) { pc.log.Info("Closed consumer") } - pc.providersMutex.Lock() - for _, provider := range pc.compressionProviders { - provider.Close() - } - pc.providersMutex.Unlock() + pc.compressionProviders.Range(func(_, v interface{}) bool { + if provider, ok := v.(compression.Provider); ok { + provider.Close() + } else { + err := fmt.Errorf("unexpected compression provider type: %T", v) + pc.log.WithError(err).Warn("Failed to close compression provider") + } + return true + }) pc.setConsumerState(consumerClosed) pc._getConn().DeleteConsumeHandler(pc.consumerID) @@ -1192,19 +1195,26 @@ func getPreviousMessage(mid trackingMessageID) trackingMessageID { } func (pc *partitionConsumer) Decompress(msgMeta *pb.MessageMetadata, payload internal.Buffer) (internal.Buffer, error) { - pc.providersMutex.RLock() - provider, ok := pc.compressionProviders[msgMeta.GetCompression()] - pc.providersMutex.RUnlock() + providerEntry, ok := pc.compressionProviders.Load(msgMeta.GetCompression()) if !ok { - var err error - if provider, err = pc.initializeCompressionProvider(msgMeta.GetCompression()); err != nil { + newProvider, err := pc.initializeCompressionProvider(msgMeta.GetCompression()) + if err != nil { pc.log.WithError(err).Error("Failed to decompress message.") return nil, err } - pc.providersMutex.Lock() - pc.compressionProviders[msgMeta.GetCompression()] = provider - pc.providersMutex.Unlock() + var loaded bool + providerEntry, loaded = pc.compressionProviders.LoadOrStore(msgMeta.GetCompression(), newProvider) + if loaded { + // another thread already loaded this provider, so close the one we just initialized + newProvider.Close() + } + } + provider, ok := providerEntry.(compression.Provider) + if !ok { + err := fmt.Errorf("unexpected compression provider type: %T", providerEntry) + pc.log.WithError(err).Error("Failed to decompress message.") +
[pulsar-client-go] branch master updated: set default go version to 1.13 in CI related files (#692)
This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new 310d480 set default go version to 1.13 in CI related files (#692) 310d480 is described below commit 310d480485cd3e945ad991aade915f3d6a353115 Author: Eugene R AuthorDate: Sun Dec 26 05:10:36 2021 +0200 set default go version to 1.13 in CI related files (#692) Set the default go version to 1.13 in the CI related files as per the minimum version requirements. --- Dockerfile | 2 +- docker-ci.sh | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Dockerfile b/Dockerfile index 6a2fb29..401a246 100644 --- a/Dockerfile +++ b/Dockerfile @@ -17,7 +17,7 @@ # under the License. # -ARG GO_VERSION=golang:1.12 +ARG GO_VERSION=golang:1.13 FROM apachepulsar/pulsar:latest as pulsar FROM $GO_VERSION as go diff --git a/docker-ci.sh b/docker-ci.sh index 74a57c1..200cc50 100755 --- a/docker-ci.sh +++ b/docker-ci.sh @@ -25,7 +25,7 @@ cd ${SRC_DIR} IMAGE_NAME=pulsar-client-go-test:latest -GO_VERSION=${1:-1.12} +GO_VERSION=${1:-1.13} docker rmi --force ${IMAGE_NAME} || true docker rmi --force apachepulsar/pulsar:latest || true docker build -t ${IMAGE_NAME} --build-arg GO_VERSION="golang:${GO_VERSION}" .
[pulsar-client-go] branch master updated: fix:add order key to message (#688)
This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new e03600f fix:add order key to message (#688) e03600f is described below commit e03600f28701c248d4841aa02715e4be1064a76c Author: Lei Zhiyuan AuthorDate: Fri Dec 24 17:34:57 2021 +0800 fix:add order key to message (#688) Co-authored-by: zhiyuanlei fix:add order key to message --- pulsar/consumer_partition.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index 0b4e9fe..ecde0bf 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -551,6 +551,7 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header replicatedFrom: msgMeta.GetReplicatedFrom(), redeliveryCount: response.GetRedeliveryCount(), encryptionContext: createEncryptionContext(msgMeta), + orderingKey: string(msgMeta.OrderingKey), }, } pc.queueCh <- messages @@ -622,6 +623,7 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header replicationClusters: msgMeta.GetReplicateTo(), replicatedFrom: msgMeta.GetReplicatedFrom(), redeliveryCount: response.GetRedeliveryCount(), + orderingKey: string(smm.OrderingKey), } } else { msg = &message{
[pulsar-client-go] branch master updated (2bbfb8e -> 067f805)
This is an automated email from the ASF dual-hosted git repository. rxl pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git. from 2bbfb8e [Issue 513] Correct apparent logic error in batchContainer's hasSpace() method (#678) add 067f805 Upgrade DataDog/zstd to v1.5.0 (#690) No new revisions were added by this update. Summary of changes: go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-)
[pulsar-client-go] branch master updated: [Issue 513] Correct apparent logic error in batchContainer's hasSpace() method (#678)
This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new 2bbfb8e [Issue 513] Correct apparent logic error in batchContainer's hasSpace() method (#678) 2bbfb8e is described below commit 2bbfb8e4a66c63843c935a50971474abfa25cdf7 Author: Ben Schofield AuthorDate: Wed Dec 22 06:22:18 2021 + [Issue 513] Correct apparent logic error in batchContainer's hasSpace() method (#678) * Correct apparent logic error in batchContainer's hasSpace() method. * Make the same change to keyBasedBatchContainer's hasSpace() method. * Fix comment length to pass style checks. * Allow TestMaxMessageSize() to run * Add TestMaxBatchSize() to validate that this limit is respected. Based on the results of the test, change the < in hasSpace() to be a <=. * Further correct logic in hasSpace() / IsFull() * Remember to make the change in both places! 
Co-authored-by: ben --- pulsar/internal/batch_builder.go | 7 ++--- pulsar/internal/key_based_batch_builder.go | 7 ++--- pulsar/producer_test.go| 42 -- 3 files changed, 48 insertions(+), 8 deletions(-) diff --git a/pulsar/internal/batch_builder.go b/pulsar/internal/batch_builder.go index 7e47304..9d18f26 100644 --- a/pulsar/internal/batch_builder.go +++ b/pulsar/internal/batch_builder.go @@ -150,14 +150,15 @@ func NewBatchBuilder( return &bc, nil } -// IsFull check if the size in the current batch exceeds the maximum size allowed by the batch +// IsFull checks if the size in the current batch meets or exceeds the maximum size allowed by the batch func (bc *batchContainer) IsFull() bool { - return bc.numMessages >= bc.maxMessages || bc.buffer.ReadableBytes() > uint32(bc.maxBatchSize) + return bc.numMessages >= bc.maxMessages || bc.buffer.ReadableBytes() >= uint32(bc.maxBatchSize) } +// hasSpace should return true if and only if the batch container can accommodate another message of length payload. func (bc *batchContainer) hasSpace(payload []byte) bool { msgSize := uint32(len(payload)) - return bc.numMessages+1 < bc.maxMessages || (bc.buffer.ReadableBytes()+msgSize) < uint32(bc.maxBatchSize) + return bc.numMessages+1 <= bc.maxMessages && bc.buffer.ReadableBytes()+msgSize <= uint32(bc.maxBatchSize) } // Add will add single message to batch. 
diff --git a/pulsar/internal/key_based_batch_builder.go b/pulsar/internal/key_based_batch_builder.go index 24d564b..d09138c 100644 --- a/pulsar/internal/key_based_batch_builder.go +++ b/pulsar/internal/key_based_batch_builder.go @@ -106,18 +106,19 @@ func NewKeyBasedBatchBuilder( return bb, nil } -// IsFull check if the size in the current batch exceeds the maximum size allowed by the batch +// IsFull checks if the size in the current batch meets or exceeds the maximum size allowed by the batch func (bc *keyBasedBatchContainer) IsFull() bool { - return bc.numMessages >= bc.maxMessages || bc.buffer.ReadableBytes() > uint32(bc.maxBatchSize) + return bc.numMessages >= bc.maxMessages || bc.buffer.ReadableBytes() >= uint32(bc.maxBatchSize) } func (bc *keyBasedBatchContainer) IsMultiBatches() bool { return true } +// hasSpace should return true if and only if the batch container can accommodate another message of length payload. func (bc *keyBasedBatchContainer) hasSpace(payload []byte) bool { msgSize := uint32(len(payload)) - return bc.numMessages+1 < bc.maxMessages || (bc.buffer.ReadableBytes()+msgSize) < uint32(bc.maxBatchSize) + return bc.numMessages+1 <= bc.maxMessages && bc.buffer.ReadableBytes()+msgSize <= uint32(bc.maxBatchSize) } // Add will add single message to key-based batch with message key. 
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go index f914017..124c828 100644 --- a/pulsar/producer_test.go +++ b/pulsar/producer_test.go @@ -857,19 +857,57 @@ func TestDelayAbsolute(t *testing.T) { canc() } -func TestMaxMessageSize(t *testing.T) { +func TestMaxBatchSize(t *testing.T) { + // Set to be < serverMaxMessageSize + batchMaxMessageSize := 512 * 1024 + client, err := NewClient(ClientOptions{ URL: serviceURL, }) assert.NoError(t, err) defer client.Close() + producer, err := client.CreateProducer(ProducerOptions{ - Topic: newTopicName(), + Topic: newTopicName(), + BatchingMaxSize: uint(batchMaxMessageSize), }) assert.NoError(t, err) assert.NotNil(t, producer) defer producer.Close() + + for
[pulsar] branch master updated (c94c81c -> f69a11e)
This is an automated email from the ASF dual-hosted git repository. rxl pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git. from c94c81c fix wrong result for looking up a non-exist topic by rest api (#13055) add f69a11e [Docs] Add docs of prometheus metrics for pulsar go client (#13271) No new revisions were added by this update. Summary of changes: site2/docs/client-libraries-go.md | 147 ++ 1 file changed, 147 insertions(+)
[pulsar-client-go] branch master updated: Add properties filed for batch (#683)
This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new d0d5d0a Add properties filed for batch (#683) d0d5d0a is described below commit d0d5d0ae403717505f85b86c100fac6ff4d7dcf6 Author: xiaolong ran AuthorDate: Fri Dec 10 15:12:53 2021 +0800 Add properties filed for batch (#683) Signed-off-by: xiaolongran ### Motivation Currently, when we disable batch in Producer, in `handleSend()` of `serverCnx.java`, the `msgMetadata.hasNumMessagesInBatch()` is **true** and `msgMetadata.getNumMessagesInBatch()` is **1**. At this point, if we get the Properties object we set on the producer side on the broker side, the display is empty. Go SDK set Properties: ``` // disable batch producer, err := client.CreateProducer(pulsar.ProducerOptions{ Topic: "topic-1", DisableBatching: true, }) // set properties for every message producer.Send(ctx, &pulsar.ProducerMessage{ Payload: []byte(fmt.Sprintf("hello-%d", i)), Properties: map[string]string{ "key-1": "value-1", }, }); ``` Broker get message properties from entry metadata is null: ``` ByteBuf metadataAndPayload = entry.getDataBuffer(); MessageMetadata msgMetadata = Commands.peekMessageMetadata(metadataAndPayload, subscription.toString(), -1); ``` And `msgMetadata.getPropertiesCount() <= 0`. 
### Modifications Add the properties field when adding a single message to the batchContainer --- pulsar/internal/batch_builder.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pulsar/internal/batch_builder.go b/pulsar/internal/batch_builder.go index d08af53..7e47304 100644 --- a/pulsar/internal/batch_builder.go +++ b/pulsar/internal/batch_builder.go @@ -191,6 +191,7 @@ func (bc *batchContainer) Add( bc.msgMetadata.ProducerName = &bc.producerName bc.msgMetadata.ReplicateTo = replicateTo bc.msgMetadata.PartitionKey = metadata.PartitionKey + bc.msgMetadata.Properties = metadata.Properties if deliverAt.UnixNano() > 0 { bc.msgMetadata.DeliverAtTime = proto.Int64(int64(TimestampMillis(deliverAt))) @@ -211,6 +212,7 @@ func (bc *batchContainer) reset() { bc.callbacks = []interface{}{} bc.msgMetadata.ReplicateTo = nil bc.msgMetadata.DeliverAtTime = nil + bc.msgMetadata.Properties = nil } // Flush all the messages buffered in the client and wait until all messages have been successfully persisted.
[pulsar] branch master updated (a89c6e4 -> 1220f84)
This is an automated email from the ASF dual-hosted git repository. rxl pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git. from a89c6e4 fix(docs): incorrect command name and description for autorecovery (#12989) add 1220f84 fix:remove the loadbalance/bundle-data node (#13164) No new revisions were added by this update. Summary of changes: .../broker/resources/NamespaceResources.java | 36 ++ .../pulsar/broker/admin/impl/NamespacesBase.java | 2 ++ .../pulsar/broker/admin/impl/TenantsBase.java | 2 ++ .../apache/pulsar/broker/admin/AdminApiTest2.java | 6 4 files changed, 46 insertions(+)
[pulsar] branch master updated (bce8a2e -> 86fe7d2)
This is an automated email from the ASF dual-hosted git repository. rxl pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git. from bce8a2e [Workflow] add guidelines for merging a PR (#12988) add 86fe7d2 Fix zk-node leak of admin path (#12972) No new revisions were added by this update. Summary of changes: .../pulsar/broker/resources/LocalPoliciesResources.java | 16 .../pulsar/broker/resources/NamespaceResources.java | 15 +++ .../org/apache/pulsar/broker/admin/impl/TenantsBase.java | 4 .../org/apache/pulsar/broker/admin/AdminApiTest2.java| 4 4 files changed, 39 insertions(+)
[pulsar-client-go] branch master updated: Add subscription properties for ConsumerOptions (#671)
This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new 6385727 Add subscription properties for ConsumerOptions (#671) 6385727 is described below commit 6385727f1a5e40e144f3239794135b5399a5a49e Author: xiaolong ran AuthorDate: Mon Nov 29 12:39:49 2021 +0800 Add subscription properties for ConsumerOptions (#671) Signed-off-by: xiaolongran ### Motivation In https://github.com/apache/pulsar/pull/12869, we introduce pluggable entry filter in Dispatcher, the pull request is the Go SDK implementation of this PIP ### Modifications *Describe the modifications you've done.* - Add subscription properties for ConsumerOptions - Update `pulsarApi.proto` file --- pulsar/consumer.go |6 + pulsar/consumer_impl.go |2 + pulsar/consumer_partition.go |6 + pulsar/internal/pulsar_proto/PulsarApi.pb.go | 1609 +++--- pulsar/internal/pulsar_proto/PulsarApi.proto | 112 +- 5 files changed, 1274 insertions(+), 461 deletions(-) diff --git a/pulsar/consumer.go b/pulsar/consumer.go index cdb6887..a71a2d4 100644 --- a/pulsar/consumer.go +++ b/pulsar/consumer.go @@ -101,6 +101,12 @@ type ConsumerOptions struct { // Those properties will be visible in the topic stats Properties map[string]string + // SubscriptionProperties specify the subscription properties for this subscription. + // + // > Notice: SubscriptionProperties are immutable, and consumers under the same subscription will fail to create a + // > subscription if they use different properties. + SubscriptionProperties map[string]string + // Type specifies the subscription type to be used when subscribing to a topic. 
// Default is `Exclusive` Type SubscriptionType diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index 1bf75b5..00e0b76 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -297,6 +297,7 @@ func (c *consumer) internalTopicSubscribeToPartitions() error { receiverQueueSize := c.options.ReceiverQueueSize metadata := c.options.Properties + subProperties := c.options.SubscriptionProperties startPartition := oldNumPartitions partitionsToAdd := newNumPartitions - oldNumPartitions @@ -333,6 +334,7 @@ func (c *consumer) internalTopicSubscribeToPartitions() error { nackRedeliveryDelay:nackRedeliveryDelay, nackBackoffPolicy: c.options.NackBackoffPolicy, metadata: metadata, + subProperties: subProperties, replicateSubscriptionState: c.options.ReplicateSubscriptionState, startMessageID: trackingMessageID{}, subscriptionMode: durable, diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index c8e5d9f..0b4e9fe 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -91,6 +91,7 @@ type partitionConsumerOpts struct { nackRedeliveryDelaytime.Duration nackBackoffPolicy NackBackoffPolicy metadata map[string]string + subProperties map[string]string replicateSubscriptionState bool startMessageID trackingMessageID startMessageIDInclusivebool @@ -1058,6 +1059,7 @@ func (pc *partitionConsumer) grabConn() error { PriorityLevel: nil, Durable: proto.Bool(pc.options.subscriptionMode == durable), Metadata: internal.ConvertFromStringMap(pc.options.metadata), + SubscriptionProperties: internal.ConvertFromStringMap(pc.options.subProperties), ReadCompacted: proto.Bool(pc.options.readCompacted), Schema: pbSchema, InitialPosition:initialPosition.Enum(), @@ -1075,6 +1077,10 @@ func (pc *partitionConsumer) grabConn() error { cmdSubscribe.Metadata = toKeyValues(pc.options.metadata) } + if len(pc.options.subProperties) > 0 { + cmdSubscribe.SubscriptionProperties = toKeyValues(pc.options.subProperties) + } + // force topic creation 
is enabled by default so // we only need to set the flag when disabling it if pc.options.disableForceTopicCreation { diff --git a/pulsar/internal/pulsar_proto/PulsarApi.pb.go b/pulsar/internal/pulsar_proto/PulsarA
[pulsar] branch master updated (5e25184 -> bef937b)
This is an automated email from the ASF dual-hosted git repository. rxl pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git. from 5e25184 [docs] [ISSUE 11805] Add topic lookup metrics (#11927) add bef937b Add subscription properties for SubscriptionStats (#12979) No new revisions were added by this update. Summary of changes: .../service/persistent/PersistentSubscription.java | 1 + .../broker/admin/CreateSubscriptionTest.java | 39 ++ .../common/policies/data/SubscriptionStats.java| 3 ++ .../policies/data/stats/SubscriptionStatsImpl.java | 7 4 files changed, 50 insertions(+)
[pulsar] branch master updated (60f5475 -> f68bec4)
This is an automated email from the ASF dual-hosted git repository. rxl pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git. from 60f5475 Improve exception info for invaild time-related option (#12828) add f68bec4 Fix znode leakage caused by deleting tenant (#12711) No new revisions were added by this update. Summary of changes: .../pulsar/broker/resources/BaseResources.java | 72 +++ .../broker/resources/LocalPoliciesResources.java | 4 + .../broker/resources/NamespaceResources.java | 70 +-- .../pulsar/broker/admin/impl/NamespacesBase.java | 100 +++-- .../pulsar/broker/admin/impl/TenantsBase.java | 69 ++ .../apache/pulsar/broker/admin/AdminApiTest2.java | 90 +++ 6 files changed, 273 insertions(+), 132 deletions(-)
[pulsar] branch master updated (f9d16ca -> f44f6a0)
This is an automated email from the ASF dual-hosted git repository. rxl pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git. from f9d16ca [Doc] Add explanations for setting geo-replication at topic level (#12633) add f44f6a0 Add docs for nack backoff policy (#12660) No new revisions were added by this update. Summary of changes: site2/docs/client-libraries-java.md | 20 site2/docs/concepts-messaging.md| 15 +++ 2 files changed, 35 insertions(+)
[pulsar-client-go] branch master updated: remove unused dependency (#661)
This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new fe3b7c4 remove unused dependency (#661) fe3b7c4 is described below commit fe3b7c4e445b3de42974ca692574229ad9099a45 Author: Eugene R AuthorDate: Mon Nov 8 06:42:48 2021 +0200 remove unused dependency (#661) Remove unused dependency in `oauth2` module. ### Verifying this change - [x] Make sure that the change passes the CI checks. This change is a trivial rework / code cleanup without any test coverage. --- oauth2/go.mod | 1 - oauth2/go.sum | 2 -- 2 files changed, 3 deletions(-) diff --git a/oauth2/go.mod b/oauth2/go.mod index 89d5015..091477d 100644 --- a/oauth2/go.mod +++ b/oauth2/go.mod @@ -4,7 +4,6 @@ go 1.13 require ( github.com/99designs/keyring v1.1.6 - github.com/dgrijalva/jwt-go v3.2.0+incompatible github.com/form3tech-oss/jwt-go v3.2.3+incompatible github.com/onsi/ginkgo v1.14.0 github.com/onsi/gomega v1.10.1 diff --git a/oauth2/go.sum b/oauth2/go.sum index 48ffe44..dad3d35 100644 --- a/oauth2/go.sum +++ b/oauth2/go.sum @@ -6,8 +6,6 @@ github.com/danieljoos/wincred v1.0.2/go.mod h1:SnuYRW9lp1oJrZX/dXJqr0cPK5gYXqx3E github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM= -github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= github.com/dvsekhvalnov/jose2go v0.0.0-20200901110807-248326c1351b h1:HBah4D48ypg3J7Np4N+HY/ZR76fx3HEUGxDU6Uk39oQ= github.com/dvsekhvalnov/jose2go v0.0.0-20200901110807-248326c1351b/go.mod h1:7BvyPhdbLxMXIYTFPLsyJRFMsKmOZnQmzh6Gb+uquuM= 
github.com/form3tech-oss/jwt-go v3.2.3+incompatible h1:7ZaBxOI7TMoYBfyA3cQHErNNyAWIKUMIwqxEtgHOs5c=
[pulsar-client-go] branch master updated: Support nack backoff policy for SDK (#660)
This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new 567263f Support nack backoff policy for SDK (#660) 567263f is described below commit 567263ff5b077e9a737f96c9523646225502f141 Author: xiaolong ran AuthorDate: Mon Nov 8 12:42:03 2021 +0800 Support nack backoff policy for SDK (#660) * Support nack backoff policy for SDK Signed-off-by: xiaolongran * add test case and fix some logic Signed-off-by: xiaolongran * fix action ciu Signed-off-by: xiaolongran * fix action ci Signed-off-by: xiaolongran * fix some logic Signed-off-by: xiaolongran * fix data race Signed-off-by: xiaolongran * fix data race Signed-off-by: xiaolongran * fix a little Signed-off-by: xiaolongran * fix data race Signed-off-by: xiaolongran * fix data race Signed-off-by: xiaolongran * fix some comments Signed-off-by: xiaolongran * fix test case Signed-off-by: xiaolongran --- pulsar/consumer.go | 11 +++ pulsar/consumer_impl.go| 20 + pulsar/consumer_multitopic.go | 16 pulsar/consumer_partition.go | 8 +- pulsar/consumer_regex.go | 16 pulsar/impl_message.go | 7 ++ pulsar/negative_acks_tracker.go| 44 +- pulsar/negative_acks_tracker_test.go | 156 - pulsar/negative_backoff_policy.go | 46 ++ pulsar/negative_backoff_policy_test.go | 34 +++ 10 files changed, 351 insertions(+), 7 deletions(-) diff --git a/pulsar/consumer.go b/pulsar/consumer.go index c9fbc0d..5127955 100644 --- a/pulsar/consumer.go +++ b/pulsar/consumer.go @@ -158,6 +158,17 @@ type ConsumerOptions struct { // Decryption decryption related fields to decrypt the encrypted message Decryption *MessageDecryptionInfo + + // If enabled, the default implementation of NackBackoffPolicy will be used to calculate the delay time of + // nack backoff, Default: false. 
+ EnableDefaultNackBackoffPolicy bool + + // NackBackoffPolicy is a redelivery backoff mechanism which we can achieve redelivery with different + // delays according to the number of times the message is retried. + // + // > Notice: the NackBackoffPolicy will not work with `consumer.NackID(MessageID)` + // > because we are not able to get the redeliveryCount from the message ID. + NackBackoffPolicy NackBackoffPolicy } // Consumer is an interface that abstracts behavior of Pulsar's consumer diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index 6aef497..1bf75b5 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -35,6 +35,7 @@ const defaultNackRedeliveryDelay = 1 * time.Minute type acker interface { AckID(id trackingMessageID) NackID(id trackingMessageID) + NackMsg(msg Message) } type consumer struct { @@ -87,6 +88,10 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) { } } + if options.NackBackoffPolicy == nil && options.EnableDefaultNackBackoffPolicy { + options.NackBackoffPolicy = new(defaultNackBackoffPolicy) + } + // did the user pass in a message channel? messageCh := options.MessageChannel if options.MessageChannel == nil { @@ -326,6 +331,7 @@ func (c *consumer) internalTopicSubscribeToPartitions() error { partitionIdx: idx, receiverQueueSize: receiverQueueSize, nackRedeliveryDelay:nackRedeliveryDelay, + nackBackoffPolicy: c.options.NackBackoffPolicy, metadata: metadata, replicateSubscriptionState: c.options.ReplicateSubscriptionState, startMessageID: trackingMessageID{}, @@ -489,6 +495,20 @@ func (c *consumer) ReconsumeLater(msg Message, delay time.Duration) { } func (c *consumer) Nack(msg Message) { + if c.options.EnableDefaultNackBackoffPolicy || c.options.NackBackoffPolicy != nil { + mid, ok := c.messageID(msg.ID()) + if !ok { + return + } + + if mid.consumer != nil { + mid.Nack() + return + } + c.consumers[mid.partitionIdx].NackMsg(msg) + return + } + c.NackID(msg.ID()
[pulsar] branch master updated: Add negative ack redelivery backoff. (#12566)
This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new d79cd04 Add negative ack redelivery backoff. (#12566) d79cd04 is described below commit d79cd0479eabebef2ce72eca1330af103115f67f Author: hanmz AuthorDate: Thu Nov 4 14:15:17 2021 +0800 Add negative ack redelivery backoff. (#12566) ### Motivation Add negative ack redelivery backoff. ### Modifications - add new `NegativeAckBackoff` interface - expose `negativeAckRedeliveryBackoff` in ConsumerBuilder - add unit test case --- .../pulsar/client/impl/NegativeAcksTest.java | 102 + .../pulsar/client/api/ConsumerConfiguration.java | 17 .../apache/pulsar/client/api/ConsumerBuilder.java | 14 +++ .../client/api/NegativeAckRedeliveryBackoff.java | 40 .../pulsar/client/impl/ConsumerBuilderImpl.java| 8 ++ .../apache/pulsar/client/impl/ConsumerImpl.java| 8 ++ .../client/impl/MultiTopicsConsumerImpl.java | 10 ++ .../NegativeAckRedeliveryExponentialBackoff.java | 94 +++ .../pulsar/client/impl/NegativeAcksTracker.java| 46 +- .../impl/conf/ConsumerConfigurationData.java | 4 + .../api/NegativeAckRedeliveryBackoffTest.java | 55 +++ .../client/impl/ConsumerBuilderImplTest.java | 8 ++ 12 files changed, 405 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java index 5eb43af..638a969 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java @@ -31,6 +31,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; +import
org.apache.pulsar.client.api.NegativeAckRedeliveryBackoff; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.Schema; @@ -154,4 +155,105 @@ public class NegativeAcksTest extends ProducerConsumerBase { consumer.close(); producer.close(); } + +@DataProvider(name = "variationsBackoff") +public static Object[][] variationsBackoff() { +return new Object[][] { +// batching / partitions / subscription-type / min-nack-time-ms/ max-nack-time-ms / ack-timeout +{ false, false, SubscriptionType.Shared, 100, 1000, 0 }, +{ false, false, SubscriptionType.Failover, 100, 1000, 0 }, +{ false, true, SubscriptionType.Shared, 100, 1000, 0 }, +{ false, true, SubscriptionType.Failover, 100, 1000, 0 }, +{ true, false, SubscriptionType.Shared, 100, 1000, 0 }, +{ true, false, SubscriptionType.Failover, 100, 1000, 0 }, +{ true, true, SubscriptionType.Shared, 100, 1000, 0 }, +{ true, true, SubscriptionType.Failover, 100, 1000, 0 }, + +{ false, false, SubscriptionType.Shared, 0, 1000, 0 }, +{ false, false, SubscriptionType.Failover, 0, 1000, 0 }, +{ false, true, SubscriptionType.Shared, 0, 1000, 0 }, +{ false, true, SubscriptionType.Failover, 0, 1000, 0 }, +{ true, false, SubscriptionType.Shared, 0, 1000, 0 }, +{ true, false, SubscriptionType.Failover, 0, 1000, 0 }, +{ true, true, SubscriptionType.Shared, 0, 1000, 0 }, +{ true, true, SubscriptionType.Failover, 0, 1000, 0 }, + +{ false, false, SubscriptionType.Shared, 100, 1000, 1000 }, +{ false, false, SubscriptionType.Failover, 100, 1000, 1000 }, +{ false, true, SubscriptionType.Shared, 100, 1000, 1000 }, +{ false, true, SubscriptionType.Failover, 100, 1000, 1000 }, +{ true, false, SubscriptionType.Shared, 100, 1000, 1000 }, +{ true, false, SubscriptionType.Failover, 100, 1000, 1000 }, +{ true, true, SubscriptionType.Shared, 100, 1000, 1000 }, +{ true, true, SubscriptionType.Failover, 100, 1000, 1000 }, + +{ false, false, SubscriptionType.Shared, 0, 1000, 
1000 }, +{ false, false, SubscriptionType.Failover, 0, 1000, 1000 }, +{ false, true, SubscriptionType.Shared, 0, 1000, 1000 }, +{ false, true, SubscriptionType.Failover, 0, 1000, 1000 }, +{ true, false, SubscriptionType.Shared, 0, 1000, 1000 }, +{ true, false, SubscriptionType.Failove
[pulsar-client-go] branch master updated: fix issue 650, different handle sequence value (#651)
This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new d80a722 fix issue 650,different handle sequence value (#651) d80a722 is described below commit d80a722ac1ab197c7e8649efdeb1e16356cbb3bb Author: baomingyu AuthorDate: Thu Nov 4 10:57:21 2021 +0800 fix issue 650,different handle sequence value (#651) * fix issue 650,different handle sequence value * add sequenceId equal check --- pulsar/producer_partition.go | 76 ++-- 1 file changed, 38 insertions(+), 38 deletions(-) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index b2b9273..d67c0c0 100644 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -784,7 +784,7 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt) return } - if pi.sequenceID != response.GetSequenceId() { + if pi.sequenceID < response.GetSequenceId() { // if we receive a receipt that is not the one expected, the state of the broker and the producer differs. // At that point, it is better to close the connection to the broker to reconnect to a broker hopping it solves // the state discrepancy. 
@@ -792,49 +792,49 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt) response.GetSequenceId(), pi.sequenceID) p.cnx.Close() return - } - - // The ack was indeed for the expected item in the queue, we can remove it and trigger the callback - p.pendingQueue.Poll() - - now := time.Now().UnixNano() - - // lock the pending item while sending the requests - pi.Lock() - defer pi.Unlock() - p.metrics.PublishRPCLatency.Observe(float64(now-pi.sentAt.UnixNano()) / 1.0e9) - for idx, i := range pi.sendRequests { - sr := i.(*sendRequest) - if sr.msg != nil { - atomic.StoreInt64(&p.lastSequenceID, int64(pi.sequenceID)) - p.publishSemaphore.Release() + } else if pi.sequenceID == response.GetSequenceId() { + // The ack was indeed for the expected item in the queue, we can remove it and trigger the callback + p.pendingQueue.Poll() + + now := time.Now().UnixNano() + + // lock the pending item while sending the requests + pi.Lock() + defer pi.Unlock() + p.metrics.PublishRPCLatency.Observe(float64(now-pi.sentAt.UnixNano()) / 1.0e9) + for idx, i := range pi.sendRequests { + sr := i.(*sendRequest) + if sr.msg != nil { + atomic.StoreInt64(&p.lastSequenceID, int64(pi.sequenceID)) + p.publishSemaphore.Release() + + p.metrics.PublishLatency.Observe(float64(now-sr.publishTime.UnixNano()) / 1.0e9) + p.metrics.MessagesPublished.Inc() + p.metrics.MessagesPending.Dec() + payloadSize := float64(len(sr.msg.Payload)) + p.metrics.BytesPublished.Add(payloadSize) + p.metrics.BytesPending.Sub(payloadSize) + } - p.metrics.PublishLatency.Observe(float64(now-sr.publishTime.UnixNano()) / 1.0e9) - p.metrics.MessagesPublished.Inc() - p.metrics.MessagesPending.Dec() - payloadSize := float64(len(sr.msg.Payload)) - p.metrics.BytesPublished.Add(payloadSize) - p.metrics.BytesPending.Sub(payloadSize) - } + if sr.callback != nil || len(p.options.Interceptors) > 0 { + msgID := newMessageID( + int64(response.MessageId.GetLedgerId()), + int64(response.MessageId.GetEntryId()), + 
int32(idx), + p.partitionIdx, + ) - if sr.callback != nil || len(p.options.Interceptors) > 0 { - msgID := newMessageID( - int64(response.MessageId.GetLedgerId()), - int64(response.MessageId.GetEntryId()), - int32(idx), - p.partitionIdx, - ) + if sr.callback != nil { + sr.callback(m
[pulsar-client-go] branch master updated (6ffefa5 -> 972dc97)
This is an automated email from the ASF dual-hosted git repository. rxl pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git. from 6ffefa5 Update change log for 0.7.0 release (#655) add 972dc97 Update version to 0.7.0 (#654) No new revisions were added by this update. Summary of changes: VERSION| 2 +- stable.txt | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-)
[pulsar-client-go] branch master updated: Update change log for 0.7.0 release (#655)
This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new 6ffefa5 Update change log for 0.7.0 release (#655) 6ffefa5 is described below commit 6ffefa5fe7ce12f3df7680282eae33ddf6d777a5 Author: cckellogg AuthorDate: Sun Oct 31 19:35:58 2021 -0700 Update change log for 0.7.0 release (#655) Update change log for 0.7.0 release --- CHANGELOG.md | 24 1 file changed, 24 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index ce91e22..2e43ddc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,30 @@ All notable changes to this project will be documented in this file. +[0.7.0] 2021-10-31 + +## Feature +* Encryption support for producer, see [PR-560](https://github.com/apache/pulsar-client-go/pull/560) +* Decrytion support for consumer, see [PR-612](https://github.com/apache/pulsar-client-go/pull/612) +* User-defined metric cardinality, see [PR-604](https://github.com/apache/pulsar-client-go/pull/604) +* Better support for Azure AD OAuth 2.0, see [PR-630](https://github.com/apache/pulsar-client-go/pull/630), [PR-633](https://github.com/apache/pulsar-client-go/pull/633), [PR-634](https://github.com/apache/pulsar-client-go/pull/634) +* Removed testing for go versions 1.11 and 1.12, see [PR-632](https://github.com/apache/pulsar-client-go/pull/632) +* Add epoch to create producer to prevent a duplicate producer when broker is not available., see [PR-582] (https://github.com/apache/pulsar-client-go/pull/582) + +## Improve +* Fix batch size limit validation, see [PR-528](https://github.com/apache/pulsar-client-go/pull/528) +* Fix logic of command for sendError, see [PR-622](https://github.com/apache/pulsar-client-go/pull/622) +* Drain connection requests channel without closing, see [PR-645](https://github.com/apache/pulsar-client-go/pull/645) +* Fix ConsumersOpened counter not 
incremented when use multitopic or regexp consumer, see [PR-619](https://github.com/apache/pulsar-client-go/pull/619) +* Fix reconnection logic when topic is deleted, see [PR-627](https://github.com/apache/pulsar-client-go/pull/627) +* Fix panic when scale down partitions, see [PR-601](https://github.com/apache/pulsar-client-go/pull/601) +* Fix missing metrics for topics by registration of existing collector, see [PR-600](https://github.com/apache/pulsar-client-go/pull/600) +* Fix producer panic by oldProducers, see [PR-598](https://github.com/apache/pulsar-client-go/pull/598) +* Fail pending messages when topic is terminated, see [PR-588](https://github.com/apache/pulsar-client-go/pull/588) +* Fix handle send error panic, see [PR-576](https://github.com/apache/pulsar-client-go/pull/576) + + + [0.6.0] 2021-07-21 ## Feature
[pulsar-client-go] branch master updated (171ef57 -> 1bd2993)
This is an automated email from the ASF dual-hosted git repository. rxl pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git. from 171ef57 Go version required should be 1.13, example produce log print error (#649) add 1bd2993 Update release docs with missing information (#656) No new revisions were added by this update. Summary of changes: docs/release-process.md | 23 +++ 1 file changed, 19 insertions(+), 4 deletions(-)
[pulsar-client-go] branch master updated: [issue 513] fix batch size limit validation (#528)
This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new c3922b8 [issue 513] fix batch size limit validation (#528) c3922b8 is described below commit c3922b82cd0e58b69670a28a96abdf88f61e0f91 Author: Ming AuthorDate: Mon Oct 25 07:56:54 2021 -0400 [issue 513] fix batch size limit validation (#528) * fix batch size limit validation * respect either of one batch size and number of messages limit * correct hasSpace logic * fix key-based batch builder --- pulsar/internal/batch_builder.go | 4 ++-- pulsar/internal/key_based_batch_builder.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pulsar/internal/batch_builder.go b/pulsar/internal/batch_builder.go index 92d6249..d08af53 100644 --- a/pulsar/internal/batch_builder.go +++ b/pulsar/internal/batch_builder.go @@ -157,7 +157,7 @@ func (bc *batchContainer) IsFull() bool { func (bc *batchContainer) hasSpace(payload []byte) bool { msgSize := uint32(len(payload)) - return bc.numMessages > 0 && (bc.buffer.ReadableBytes()+msgSize) > uint32(bc.maxBatchSize) + return bc.numMessages+1 < bc.maxMessages || (bc.buffer.ReadableBytes()+msgSize) < uint32(bc.maxBatchSize) } // Add will add single message to batch. @@ -174,7 +174,7 @@ func (bc *batchContainer) Add( // There's already a message with cluster replication list. need to flush before next // message can be sent return false - } else if bc.hasSpace(payload) { + } else if !bc.hasSpace(payload) { // The current batch is full. 
Producer has to call Flush() to return false } diff --git a/pulsar/internal/key_based_batch_builder.go b/pulsar/internal/key_based_batch_builder.go index 940aa9f..24d564b 100644 --- a/pulsar/internal/key_based_batch_builder.go +++ b/pulsar/internal/key_based_batch_builder.go @@ -117,7 +117,7 @@ func (bc *keyBasedBatchContainer) IsMultiBatches() bool { func (bc *keyBasedBatchContainer) hasSpace(payload []byte) bool { msgSize := uint32(len(payload)) - return bc.numMessages > 0 && (bc.buffer.ReadableBytes()+msgSize) > uint32(bc.maxBatchSize) + return bc.numMessages+1 < bc.maxMessages || (bc.buffer.ReadableBytes()+msgSize) < uint32(bc.maxBatchSize) } // Add will add single message to key-based batch with message key. @@ -134,7 +134,7 @@ func (bc *keyBasedBatchContainer) Add( // There's already a message with cluster replication list. need to flush before next // message can be sent return false - } else if bc.hasSpace(payload) { + } else if !bc.hasSpace(payload) { // The current batch is full. Producer has to call Flush() to return false }
[pulsar-client-go] branch master updated: Fix minor api issue and comments. (#637)
This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new 4e55be0 Fix minor api issue and comments. (#637) 4e55be0 is described below commit 4e55be0a363ef57e90a60a29803b578d0498c1d1 Author: cckellogg AuthorDate: Wed Oct 13 02:39:15 2021 -0400 Fix minor api issue and comments. (#637) Fix some comment spelling mistakes and spelling on one api. --- pulsar/crypto/crypto_failure_action.go | 2 +- pulsar/crypto/default_message_crypto.go | 12 ++-- pulsar/crypto/encryption_key_Info.go| 8 pulsar/crypto/message_crypto.go | 12 pulsar/crypto/message_metadata.go | 12 ++-- pulsar/crypto/message_metadata_test.go | 6 +++--- 6 files changed, 28 insertions(+), 24 deletions(-) diff --git a/pulsar/crypto/crypto_failure_action.go b/pulsar/crypto/crypto_failure_action.go index 891d740..0bdd289 100644 --- a/pulsar/crypto/crypto_failure_action.go +++ b/pulsar/crypto/crypto_failure_action.go @@ -21,7 +21,7 @@ const ( // ProducerCryptoFailureActionFail this is the default option to fail send if crypto operation fails. ProducerCryptoFailureActionFail = iota - // ProducerCryptoFailureActionSend ingnore crypto failure and proceed with sending unencrypted message. + // ProducerCryptoFailureActionSend ignore crypto failure and proceed with sending unencrypted message. 
ProducerCryptoFailureActionSend ) diff --git a/pulsar/crypto/default_message_crypto.go b/pulsar/crypto/default_message_crypto.go index 93a0244..74e4592 100644 --- a/pulsar/crypto/default_message_crypto.go +++ b/pulsar/crypto/default_message_crypto.go @@ -34,7 +34,7 @@ import ( "github.com/apache/pulsar-client-go/pulsar/log" ) -// DefaultMessageCrypto implmentation of the interface MessageCryto +// DefaultMessageCrypto implementation of the interface MessageCryto type DefaultMessageCrypto struct { // data key which is used to encrypt/decrypt messages dataKey []byte @@ -134,7 +134,7 @@ func (d *DefaultMessageCrypto) RemoveKeyCipher(keyName string) bool { return true } -// Encrypt encrypt payload using encryption keys and add encrypted data key +// Encrypt payload using encryption keys and add encrypted data key // to message metadata. Here data key is encrypted // using public key func (d *DefaultMessageCrypto) Encrypt(encKeys []string, @@ -161,7 +161,7 @@ func (d *DefaultMessageCrypto) Encrypt(encKeys []string, keyInfo, keyInfoOk := k.(*EncryptionKeyInfo) if keyInfoOk { - msgMetadata.UpsertEncryptionkey(*keyInfo) + msgMetadata.UpsertEncryptionKey(*keyInfo) } else { d.logger.Error("failed to get EncryptionKeyInfo for key %v", keyName) } @@ -206,8 +206,8 @@ func (d *DefaultMessageCrypto) Encrypt(encKeys []string, return gcm.Seal(nil, nonce, payload, nil), nil } -// Decrypt decrypt the payload using decrypted data key. -// Here data key is read from from the message +// Decrypt the payload using decrypted data key. +// Here data key is read from the message // metadata and decrypted using private key. 
func (d *DefaultMessageCrypto) Decrypt(msgMetadata MessageMetadataSupplier, payload []byte, @@ -285,7 +285,7 @@ func (d *DefaultMessageCrypto) getKeyAndDecryptData(msgMetadata MessageMetadataS return decryptedData, nil } } else { - // First time, entry wont be present in cache + // First time, entry won't be present in cache d.logger.Debugf("%s Failed to decrypt data or data key is not in cache. Will attempt to refresh", d.logCtx) } } diff --git a/pulsar/crypto/encryption_key_Info.go b/pulsar/crypto/encryption_key_Info.go index 8418682..11dd444 100644 --- a/pulsar/crypto/encryption_key_Info.go +++ b/pulsar/crypto/encryption_key_Info.go @@ -24,7 +24,7 @@ type EncryptionKeyInfo struct { name string } -// NewEncryptionKeyInfo +// NewEncryptionKeyInfo create a new EncryptionKeyInfo func NewEncryptionKeyInfo(name string, key []byte, metadata map[string]string) *EncryptionKeyInfo { return &EncryptionKeyInfo{ metadata: metadata, @@ -33,17 +33,17 @@ func NewEncryptionKeyInfo(name string, key []byte, metadata map[string]string) * } } -// GetKey get key +// Name get the name of the key func (eci *EncryptionKeyInfo) Name() string { return eci.name } -// GetValue get value +// Key get the key data func (eci *EncryptionKeyInfo) Key() []byte {
[pulsar-client-go] branch master updated: Fix logic of command for sendError (#622)
This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new 791d342 Fix logic of command for sendError (#622) 791d342 is described below commit 791d342a98d0f0b4189913a5ea61547964d095c8 Author: xiaolong ran AuthorDate: Mon Oct 11 11:20:33 2021 +0800 Fix logic of command for sendError (#622) ### Motivation ![image](https://user-images.githubusercontent.com/20965307/135020293-06cb72cc-5ed9-4bc5-a7ba-3909da57a8a6.png) As shown in the figure above, the `ServerError` returned by the broker is `UnknownError` when the client receives it. In fact, we handled the wrong command here. Here we should deal with `CommandSendError` instead of `CommandError`. Correspondingly, we should deal with the `listener` map used to cache the producer instead of the corresponding `pendingRequest` map. --- pulsar/internal/connection.go | 45 +++ 1 file changed, 24 insertions(+), 21 deletions(-) diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go index 0313e4e..163dcac 100644 --- a/pulsar/internal/connection.go +++ b/pulsar/internal/connection.go @@ -531,7 +531,7 @@ func (c *connection) internalReceivedCommand(cmd *pb.BaseCommand, headersAndPayl c.handleResponseError(cmd.GetError()) case pb.BaseCommand_SEND_ERROR: - c.handleSendError(cmd.GetError()) + c.handleSendError(cmd.GetSendError()) case pb.BaseCommand_CLOSE_PRODUCER: c.handleCloseProducer(cmd.GetCloseProducer()) @@ -752,31 +752,29 @@ func (c *connection) handleAuthChallenge(authChallenge *pb.CommandAuthChallenge) c.writeCommand(baseCommand(pb.BaseCommand_AUTH_RESPONSE, cmdAuthResponse)) } -func (c *connection) handleSendError(cmdError *pb.CommandError) { - c.log.Warnf("Received send error from server: [%v] : [%s]", cmdError.GetError(), cmdError.GetMessage()) +func (c *connection) handleSendError(sendError *pb.CommandSendError) { 
+ c.log.Warnf("Received send error from server: [%v] : [%s]", sendError.GetError(), sendError.GetMessage()) - requestID := cmdError.GetRequestId() + producerID := sendError.GetProducerId() - switch cmdError.GetError() { + switch sendError.GetError() { case pb.ServerError_NotAllowedError: - request, ok := c.deletePendingRequest(requestID) + _, ok := c.deletePendingProducers(producerID) if !ok { c.log.Warnf("Received unexpected error response for request %d of type %s", - requestID, cmdError.GetError()) + producerID, sendError.GetError()) return } - errMsg := fmt.Sprintf("server error: %s: %s", cmdError.GetError(), cmdError.GetMessage()) - request.callback(nil, errors.New(errMsg)) + c.log.Warnf("server error: %s: %s", sendError.GetError(), sendError.GetMessage()) case pb.ServerError_TopicTerminatedError: - request, ok := c.deletePendingRequest(requestID) + _, ok := c.deletePendingProducers(producerID) if !ok { - c.log.Warnf("Received unexpected error response for request %d of type %s", - requestID, cmdError.GetError()) + c.log.Warnf("Received unexpected error response for producer %d of type %s", + producerID, sendError.GetError()) return } - errMsg := fmt.Sprintf("server error: %s: %s", cmdError.GetError(), cmdError.GetMessage()) - request.callback(nil, errors.New(errMsg)) + c.log.Warnf("server error: %s: %s", sendError.GetError(), sendError.GetMessage()) default: // By default, for transient error, let the reconnection logic // to take place and re-establish the produce again @@ -784,6 +782,17 @@ func (c *connection) handleSendError(cmdError *pb.CommandError) { } } +func (c *connection) deletePendingProducers(producerID uint64) (ConnectionListener, bool) { + c.listenersLock.Lock() + producer, ok := c.listeners[producerID] + if ok { + delete(c.listeners, producerID) + } + c.listenersLock.Unlock() + + return producer, ok +} + func (c *connection) handleCloseConsumer(closeConsumer *pb.CommandCloseConsumer) { consumerID := closeConsumer.GetConsumerId() 
c.log.Infof("Broker notification of Closed consumer: %d", consumerID) @
[pulsar-client-go] branch master updated: User-defined metric cardinality (#604)
This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new ce79489 User-defined metric cardinality (#604) ce79489 is described below commit ce794898bce8d0799e143b42833167252a3b6cd0 Author: Andy Walker AuthorDate: Sat Oct 9 05:03:58 2021 -0400 User-defined metric cardinality (#604) * initial commit * forgot "none" * Satisfy the lint gods Co-authored-by: xiaolongran --- go.mod | 2 + go.sum | 2 - pulsar/client.go | 15 ++ pulsar/client_impl.go | 8 ++- pulsar/consumer_impl.go| 4 +- pulsar/consumer_partition.go | 4 +- pulsar/consumer_partition_test.go | 6 +-- pulsar/internal/lookup_service_test.go | 29 ++- pulsar/internal/metrics.go | 95 +- pulsar/producer_impl.go| 4 +- pulsar/producer_partition.go | 4 +- pulsar/reader_impl.go | 4 +- 12 files changed, 111 insertions(+), 66 deletions(-) diff --git a/go.mod b/go.mod index 354f5b4..af7c571 100644 --- a/go.mod +++ b/go.mod @@ -15,6 +15,8 @@ require ( github.com/inconshreveable/mousetrap v1.0.0 // indirect github.com/klauspost/compress v1.10.8 github.com/linkedin/goavro/v2 v2.9.8 + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.1 // indirect github.com/opentracing/opentracing-go v1.2.0 github.com/pierrec/lz4 v2.0.5+incompatible github.com/pkg/errors v0.9.1 diff --git a/go.sum b/go.sum index 8b372c7..85fba89 100644 --- a/go.sum +++ b/go.sum @@ -29,7 +29,6 @@ github.com/danieljoos/wincred v1.0.2/go.mod h1:SnuYRW9lp1oJrZX/dXJqr0cPK5gYXqx3E github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/dgrijalva/jwt-go v3.2.0+incompatible 
h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= github.com/dimfeld/httptreemux v5.0.1+incompatible h1:Qj3gVcDNoOthBAqftuD596rm4wg/adLLz5xh5CmpiCA= github.com/dimfeld/httptreemux v5.0.1+incompatible/go.mod h1:rbUlSV+CCpv/SuqUTP/8Bk2O3LyUV436/yaRGkhP6Z0= @@ -159,7 +158,6 @@ github.com/stretchr/objx v0.2.0 h1:Hbg2NidpLE8veEBkEZTL3CvlkUIVzuU9jDplZO54c48= github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= -github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= diff --git a/pulsar/client.go b/pulsar/client.go index cc6fb3c..8ff152a 100644 --- a/pulsar/client.go +++ b/pulsar/client.go @@ -117,6 +117,10 @@ type ClientOptions struct { // FIXME: use `logger` as internal field name instead of `log` as it's more idiomatic Logger log.Logger + // Specify metric cardinality to the tenant, namespace or topic levels, or remove it completely. 
+ // Default: MetricsCardinalityNamespace + MetricsCardinality MetricsCardinality + // Add custom labels to all the metrics reported by this client instance CustomMetricsLabels map[string]string } @@ -150,3 +154,14 @@ type Client interface { // Close Closes the Client and free associated resources Close() } + +// MetricsCardinality represents the specificty of labels on a per-metric basis +type MetricsCardinality int + +const ( + _ MetricsCardinality = iota + MetricsCardinalityNone // Do not add additional labels to metrics + MetricsCardinalityTenant // Label metrics by tenant + MetricsCardinalityNamespace// Label metrics by tenant and namespace + MetricsCardinalityTopic// Label metrics by topic +) diff --git a/pulsar/client_impl.go b/pulsar/client_impl.go index 7d2fcfd..5682927 100644 --- a/pulsar/client_impl.go +++ b/pulsar/client_impl.go @@ -107,11 +107,15 @@ func newClient(options ClientOptions)
[pulsar-client-go] branch master updated: Encryption support ext consumer (#612)
This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new ab96ad7 Encryption support ext consumer (#612) ab96ad7 is described below commit ab96ad7d84b7c53e3e6d241f68c24eb7d6fa0037 Author: Garule Prabhudas AuthorDate: Sat Oct 9 14:14:53 2021 +0530 Encryption support ext consumer (#612) * add ability to encrypt messages - use base crypto package for encryption * fix typo * lint fixes * address review suggestions * revert go mod * remove encryption context - move it to Consumer MR * try to fix check issues * remove unused code * consumer encryption/decryption changes * remove embedded crypto struct * remove embedded struct * add comments * merge conflict issues fix * add noop decryptor * lint fixes * fix test case * refactor and reader encryption changes * refactor - move decryptor creation to partition_producer.go - update reader_impl - update consumer_impl * address review feedback * Nack on decryption failure * add comment on test case Co-authored-by: PGarule --- pulsar/consumer.go | 3 + pulsar/consumer_impl.go| 1 + pulsar/consumer_partition.go | 108 ++- pulsar/consumer_partition_test.go | 4 + pulsar/consumer_test.go| 847 - pulsar/encryption.go | 12 + pulsar/impl_message.go | 23 + pulsar/internal/crypto/consumer_decryptor.go | 60 ++ .../crypto/decryptor.go} | 23 +- .../crypto/noop_decryptor.go} | 30 +- .../pulsartracing/message_carrier_util_test.go | 4 + pulsar/message.go | 4 + pulsar/reader.go | 3 + pulsar/reader_impl.go | 1 + pulsar/reader_test.go | 56 ++ 15 files changed, 1148 insertions(+), 31 deletions(-) diff --git a/pulsar/consumer.go b/pulsar/consumer.go index 1c52b29..c9fbc0d 100644 --- a/pulsar/consumer.go +++ b/pulsar/consumer.go @@ -155,6 +155,9 @@ type ConsumerOptions struct { // MaxReconnectToBroker set the maximum retry number of reconnectToBroker. 
(default: ultimate) MaxReconnectToBroker *uint + + // Decryption decryption related fields to decrypt the encrypted message + Decryption *MessageDecryptionInfo } // Consumer is an interface that abstracts behavior of Pulsar's consumer diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index 6677062..232079b 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -335,6 +335,7 @@ func (c *consumer) internalTopicSubscribeToPartitions() error { maxReconnectToBroker: c.options.MaxReconnectToBroker, keySharedPolicy: c.options.KeySharedPolicy, schema: c.options.Schema, + decryption: c.options.Decryption, } cons, err := newPartitionConsumer(c, c.client, opts, c.messageCh, c.dlq, c.metrics) ch <- ConsumerError{ diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index 5f74bcf..e691d14 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -26,8 +26,10 @@ import ( "github.com/gogo/protobuf/proto" + "github.com/apache/pulsar-client-go/pulsar/crypto" "github.com/apache/pulsar-client-go/pulsar/internal" "github.com/apache/pulsar-client-go/pulsar/internal/compression" + cryptointernal "github.com/apache/pulsar-client-go/pulsar/internal/crypto" pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto" "github.com/apache/pulsar-client-go/pulsar/log" @@ -98,6 +100,7 @@ type partitionConsumerOpts struct { maxReconnectToBroker *uint keySharedPolicy*KeySharedPolicy schema Schema + decryption *MessageDecryptionInfo } type partitionConsumer struct { @@ -142,6 +145,7 @@ type partitionConsumer struct { providersMutex sync.RWMutex compressionProviders map[pb.CompressionType]compression.Provider metrics *internal.TopicMetrics + decryptorcryptointernal.Decryptor } func newPart
[pulsar-client-go] branch master updated: [Issue 615] Fix retry on specified-partition topic (#616)
This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new e9629fb [Issue 615] Fix retry on specified-partition topic (#616) e9629fb is described below commit e9629fb7851a078c8746940b45134b136c84a0a6 Author: Minzhang AuthorDate: Sat Oct 9 16:43:43 2021 +0800 [Issue 615] Fix retry on specified-partition topic (#616) * fix retry on consume specified-partition topic (#615) * introduce variable for topic name Co-authored-by: xiaolongran --- pulsar/consumer_multitopic.go | 11 +++-- pulsar/consumer_test.go | 101 ++ 2 files changed, 109 insertions(+), 3 deletions(-) diff --git a/pulsar/consumer_multitopic.go b/pulsar/consumer_multitopic.go index faf8917..f689fae 100644 --- a/pulsar/consumer_multitopic.go +++ b/pulsar/consumer_multitopic.go @@ -149,11 +149,16 @@ func (c *multiTopicConsumer) ReconsumeLater(msg Message, delay time.Duration) { return } - fqdnTopic := internal.TopicNameWithoutPartitionPart(names[0]) + tn := names[0] + fqdnTopic := internal.TopicNameWithoutPartitionPart(tn) consumer, ok := c.consumers[fqdnTopic] if !ok { - c.log.Warnf("consumer of topic %s not exist unexpectedly", msg.Topic()) - return + // check to see if the topic with the partition part is in the consumers + // this can happen when the consumer is configured to consume from a specific partition + if consumer, ok = c.consumers[tn.Name]; !ok { + c.log.Warnf("consumer of topic %s not exist unexpectedly", msg.Topic()) + return + } } consumer.ReconsumeLater(msg, delay) } diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index 1811722..66587f9 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -1353,6 +1353,107 @@ func TestRLQMultiTopics(t *testing.T) { assert.Nil(t, checkMsg) } +func TestRLQSpecifiedPartitionTopic(t *testing.T) { + topic := newTopicName() + testURL := adminURL 
+ "/" + "admin/v2/persistent/public/default/" + topic + "/partitions" + makeHTTPCall(t, http.MethodPut, testURL, "1") + + normalTopic := "persistent://public/default/" + topic + partitionTopic := normalTopic + "-partition-0" + + subName := fmt.Sprintf("sub01-%d", time.Now().Unix()) + maxRedeliveries := 2 + N := 100 + ctx := context.Background() + + client, err := NewClient(ClientOptions{URL: lookupURL}) + assert.Nil(t, err) + defer client.Close() + + // subscribe topic with partition + rlqConsumer, err := client.Subscribe(ConsumerOptions{ + Topic: partitionTopic, + SubscriptionName:subName, + Type:Shared, + SubscriptionInitialPosition: SubscriptionPositionEarliest, + DLQ: &DLQPolicy{MaxDeliveries: uint32(maxRedeliveries)}, + RetryEnable: true, + NackRedeliveryDelay: 1 * time.Second, + }) + assert.Nil(t, err) + defer rlqConsumer.Close() + + // subscribe DLQ Topic + dlqConsumer, err := client.Subscribe(ConsumerOptions{ + Topic: subName + "-DLQ", + SubscriptionName:subName, + SubscriptionInitialPosition: SubscriptionPositionEarliest, + }) + assert.Nil(t, err) + defer dlqConsumer.Close() + + // create producer + producer, err := client.CreateProducer(ProducerOptions{Topic: normalTopic}) + assert.Nil(t, err) + defer producer.Close() + + // 1. Pre-produce N messages + for i := 0; i < N; i++ { + _, err = producer.Send(ctx, &ProducerMessage{Payload: []byte(fmt.Sprintf("MSG_01_%d", i))}) + assert.Nil(t, err) + } + + // 2. Create consumer on the Retry Topics to reconsume N messages (maxRedeliveries+1) times + rlqReceived := 0 + for rlqReceived < N*(maxRedeliveries+1) { + msg, err := rlqConsumer.Receive(ctx) + assert.Nil(t, err) + rlqConsumer.ReconsumeLater(msg, 1*time.Second) + rlqReceived++ + } + fmt.Println("retry consumed:", rlqReceived) // 300 + + // No more messages on the Retry Topic + rlqCtx, rlqCancel := context.WithTimeout(context.Background(), 500*time.Millisecond) + defer rlqCancel() + msg, err := rlqConsumer.
[pulsar-client-go] branch master updated (d336ff7 -> 44d6b16)
This is an automated email from the ASF dual-hosted git repository. rxl pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git. from d336ff7 Remove multitopic and regexp consumers along with reader from the client's handlers map when Close is called. (#620) add 44d6b16 Increment ConsumersOpened counter on each new consumer instance. (#619) No new revisions were added by this update. Summary of changes: pulsar/consumer_impl.go | 12 ++-- 1 file changed, 2 insertions(+), 10 deletions(-)
[pulsar-client-go] branch master updated: Remove multitopic and regexp consumers along with reader from the client's handlers map when Close is called. (#620)
This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new d336ff7 Remove mutitopic- and regexp consumer along with reader from client' handlers map when Close called. (#620) d336ff7 is described below commit d336ff717c98b30d976dd1ebf9b1acf76e05c695 Author: PowerStateFailure <29687050+powerstatefail...@users.noreply.github.com> AuthorDate: Sat Oct 9 12:30:57 2021 +0500 Remove mutitopic- and regexp consumer along with reader from client' handlers map when Close called. (#620) This change fixes memory leak when frequently using short-living regexp- or multitopic consumers because there were not removed from client handler on Close Co-authored-by: xiaolongran --- pulsar/consumer_multitopic.go | 4 pulsar/consumer_regex.go | 1 + pulsar/reader_impl.go | 3 +++ 3 files changed, 8 insertions(+) diff --git a/pulsar/consumer_multitopic.go b/pulsar/consumer_multitopic.go index dc4ad7b..faf8917 100644 --- a/pulsar/consumer_multitopic.go +++ b/pulsar/consumer_multitopic.go @@ -30,6 +30,8 @@ import ( ) type multiTopicConsumer struct { + client *client + options ConsumerOptions consumerName string @@ -48,6 +50,7 @@ type multiTopicConsumer struct { func newMultiTopicConsumer(client *client, options ConsumerOptions, topics []string, messageCh chan ConsumerMessage, dlq *dlqRouter, rlq *retryRouter) (Consumer, error) { mtc := &multiTopicConsumer{ + client: client, options: options, messageCh:messageCh, consumers:make(map[string]Consumer, len(topics)), @@ -186,6 +189,7 @@ func (c *multiTopicConsumer) Close() { } wg.Wait() close(c.closeCh) + c.client.handlers.Del(c) c.dlq.close() c.rlq.close() }) diff --git a/pulsar/consumer_regex.go b/pulsar/consumer_regex.go index 9e0c125..2f46c48 100644 --- a/pulsar/consumer_regex.go +++ b/pulsar/consumer_regex.go @@ -217,6 +217,7 @@ func (c *regexConsumer) 
Close() { }(con) } wg.Wait() + c.client.handlers.Del(c) c.dlq.close() c.rlq.close() }) diff --git a/pulsar/reader_impl.go b/pulsar/reader_impl.go index a019d9c..9983286 100644 --- a/pulsar/reader_impl.go +++ b/pulsar/reader_impl.go @@ -33,6 +33,7 @@ const ( type reader struct { sync.Mutex + client *client pc *partitionConsumer messageCh chan ConsumerMessage lastMessageInBroker trackingMessageID @@ -91,6 +92,7 @@ func newReader(client *client, options ReaderOptions) (Reader, error) { } reader := &reader{ + client:client, messageCh: make(chan ConsumerMessage), log: client.log.SubLogger(log.Fields{"topic": options.Topic}), metrics: client.metrics.GetTopicMetrics(options.Topic), @@ -174,6 +176,7 @@ func (r *reader) hasMoreMessages() bool { func (r *reader) Close() { r.pc.Close() + r.client.handlers.Del(r) r.metrics.ReadersClosed.Inc() }
[pulsar-client-go] branch master updated: fix TestNamespaceTopicsNamespaceDoesNotExit (#625)
This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new b0f328e fix TestNamespaceTopicsNamespaceDoesNotExit (#625) b0f328e is described below commit b0f328e358d45e822efac164b11dc493bbecfcaa Author: Rui Fu AuthorDate: Tue Sep 28 22:31:38 2021 +0800 fix TestNamespaceTopicsNamespaceDoesNotExit (#625) fix TestNamespaceTopicsNamespaceDoesNotExit error after pulsar release 2.8.1 ref: apache/pulsar#11172 --- pulsar/client_impl_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar/client_impl_test.go b/pulsar/client_impl_test.go index b1d128a..e9e94af 100644 --- a/pulsar/client_impl_test.go +++ b/pulsar/client_impl_test.go @@ -383,7 +383,7 @@ func TestNamespaceTopicsNamespaceDoesNotExit(t *testing.T) { // fetch from namespace that does not exist name := generateRandomName() - topics, err := ci.lookupService.GetTopicsOfNamespace(fmt.Sprintf("%s/%s", name, name), internal.Persistent) + topics, err := ci.lookupService.GetTopicsOfNamespace(fmt.Sprintf("public/%s", name), internal.Persistent) assert.Nil(t, err) assert.Equal(t, 0, len(topics)) } @@ -401,7 +401,7 @@ func TestNamespaceTopicsNamespaceDoesNotExitWebURL(t *testing.T) { // fetch from namespace that does not exist name := generateRandomName() - topics, err := ci.lookupService.GetTopicsOfNamespace(fmt.Sprintf("%s/%s", name, name), internal.Persistent) + topics, err := ci.lookupService.GetTopicsOfNamespace(fmt.Sprintf("public/%s", name), internal.Persistent) assert.NotNil(t, err) assert.Equal(t, 0, len(topics)) }
[pulsar] branch master updated (b079c1e -> f784379)
This is an automated email from the ASF dual-hosted git repository. rxl pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git. from b079c1e [Schema] Schema compatibility strategy in broker level. (#11856) add f784379 [pulsar-functions-go] sync to the latest function proto (#11853) No new revisions were added by this update. Summary of changes: pulsar-function-go/pb/Function.pb.go | 469 +++--- pulsar-function-go/pb/InstanceCommunication.pb.go | 68 ++-- pulsar-function-go/pb/Request.pb.go | 2 +- pulsar-function-go/pb/doc.go | 4 +- 4 files changed, 271 insertions(+), 272 deletions(-)
[pulsar-client-go] branch master updated: fix: retry assertion (#599)
This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new 6837964 fix: retry assertion (#599) 6837964 is described below commit 6837964a473b29a1fb9ca7a264fdf44fa7d376dc Author: cimura <35636173+cim...@users.noreply.github.com> AuthorDate: Mon Aug 30 11:48:42 2021 +0900 fix: retry assertion (#599) Co-authored-by: kimura ### Motivation in `consumer_regex_test.go`, sometimes consumers take more than 2 seconds (waiting time) subscribing, and assertions fail. ### Modifications instead of waiting and asserting just once, repeat it a specified number of times so that consumers have enough time to subscribe. --- pulsar/consumer_regex_test.go | 27 +++ pulsar/test_helper.go | 12 2 files changed, 27 insertions(+), 12 deletions(-) diff --git a/pulsar/consumer_regex_test.go b/pulsar/consumer_regex_test.go index 228fc2d..9cb600f 100644 --- a/pulsar/consumer_regex_test.go +++ b/pulsar/consumer_regex_test.go @@ -177,10 +177,11 @@ func runRegexConsumerDiscoverPatternAll(t *testing.T, c Client, namespace string t.Fatal(err) } rc.discover() - time.Sleep(2000 * time.Millisecond) - - consumers = cloneConsumers(rc) - assert.Equal(t, 1, len(consumers)) + retryAssert(t, 5, 2000, func() { + consumers = cloneConsumers(rc) + }, func(x assert.TestingT) bool { + return assert.Equal(x, 1, len(consumers)) + }) } func runRegexConsumerDiscoverPatternFoo(t *testing.T, c Client, namespace string) { @@ -216,10 +217,11 @@ func runRegexConsumerDiscoverPatternFoo(t *testing.T, c Client, namespace string defer deleteTopic(myTopic) rc.discover() - time.Sleep(2000 * time.Millisecond) - - consumers = cloneConsumers(rc) - assert.Equal(t, 0, len(consumers)) + retryAssert(t, 5, 2000, func() { + consumers = cloneConsumers(rc) + }, func(x assert.TestingT) bool { + return assert.Equal(x, 0, len(consumers)) + }) // create a 
topic not in the regex fooTopic := namespace + "/foo-topic" @@ -229,10 +231,11 @@ func runRegexConsumerDiscoverPatternFoo(t *testing.T, c Client, namespace string } rc.discover() - time.Sleep(2000 * time.Millisecond) - - consumers = cloneConsumers(rc) - assert.Equal(t, 1, len(consumers)) + retryAssert(t, 5, 2000, func() { + consumers = cloneConsumers(rc) + }, func(x assert.TestingT) bool { + return assert.Equal(x, 1, len(consumers)) + }) } func TestRegexConsumer(t *testing.T) { diff --git a/pulsar/test_helper.go b/pulsar/test_helper.go index 273169a..d6a4f00 100644 --- a/pulsar/test_helper.go +++ b/pulsar/test_helper.go @@ -30,6 +30,7 @@ import ( "time" "github.com/apache/pulsar-client-go/pulsar/internal" + "github.com/stretchr/testify/assert" pkgerrors "github.com/pkg/errors" ) @@ -165,3 +166,14 @@ func topicPath(topic string) string { } return tn.Name } + +func retryAssert(t assert.TestingT, times int, milliseconds int, update func(), assert func(assert.TestingT) bool) { + for i := 0; i < times; i++ { + time.Sleep(time.Duration(milliseconds) * time.Millisecond) + update() + if assert(nil) { + break + } + } + assert(t) +}
[pulsar-client-go] branch master updated: Fix panic when scale down partitions (#601)
This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new b684151 Fix panic when scale down partitions (#601) b684151 is described below commit b6841513379ea9ca503d1e350c5f93198fc2b03f Author: xiaolong ran AuthorDate: Thu Aug 26 16:51:20 2021 +0800 Fix panic when scale down partitions (#601) Signed-off-by: xiaolongran ### Motivation When the program is running, if the business is forced to delete certain sub partitions, the following error message will be caused, that is, old_partitions is greater than new_partitions, it looks like it is doing scale down partitions, and the current code logic only deals with the scenario of scale up partitions , So if the user is forced to delete some sub partitions, the following error will be encountered: ``` level=info msg="[Changed number of partitions in topic]" new_partitions=1 old_partitions=20 topic="persistent://pulsar-xxx//g" ``` ``` panic: runtime error: index out of range [1] with length 1 goroutine 166288 [running]: github.com/apache/pulsar-client-go/pulsar.(*producer).internalCreatePartitionsProducers(0xc0070aa6e0, 0x0, 0x0) github.com/apache/pulsar-client-go/pulsar/producer_impl.go:194 +0x785 github.com/apache/pulsar-client-go/pulsar.(*producer).runBackgroundPartitionDiscovery.func1(0xc004167cd0, 0xc00559f5c0, 0xc006af6dc0, 0xc0070aa6e0) github.com/apache/pulsar-client-go/pulsar/producer_impl.go:152 +0xce created by github.com/apache/pulsar-client-go/pulsar.(*producer).runBackgroundPartitionDiscovery github.com/apache/pulsar-client-go/pulsar/producer_impl.go:144 +0xcd ``` ### Modifications Increase the processing logic of scale down partition --- pulsar/consumer_impl.go | 30 ++ pulsar/producer_impl.go | 27 +++ 2 files changed, 33 insertions(+), 24 deletions(-) diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index 
ec7ad7d..78ae0d7 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -28,7 +28,6 @@ import ( "github.com/apache/pulsar-client-go/pulsar/internal" pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto" "github.com/apache/pulsar-client-go/pulsar/log" - "github.com/pkg/errors" ) const defaultNackRedeliveryDelay = 1 * time.Minute @@ -258,22 +257,16 @@ func (c *consumer) internalTopicSubscribeToPartitions() error { c.Lock() defer c.Unlock() + oldConsumers := c.consumers + oldNumPartitions = len(oldConsumers) if oldConsumers != nil { - oldNumPartitions = len(oldConsumers) if oldNumPartitions == newNumPartitions { c.log.Debug("Number of partitions in topic has not changed") return nil } - if oldNumPartitions > newNumPartitions { - c.log.WithField("old_partitions", oldNumPartitions). - WithField("new_partitions", newNumPartitions). - Error("Does not support scaling down operations on topic partitions") - return errors.New("Does not support scaling down operations on topic partitions") - } - c.log.WithField("old_partitions", oldNumPartitions). WithField("new_partitions", newNumPartitions). Info("Changed number of partitions in topic") @@ -281,7 +274,9 @@ func (c *consumer) internalTopicSubscribeToPartitions() error { c.consumers = make([]*partitionConsumer, newNumPartitions) - if oldConsumers != nil { + // When for some reason (eg: forced deletion of sub partition) causes oldNumPartitions> newNumPartitions, + // we need to rebuild the cache of new consumers, otherwise the array will be out of bounds. 
+ if oldConsumers != nil && oldNumPartitions < newNumPartitions { // Copy over the existing consumer instances for i := 0; i < oldNumPartitions; i++ { c.consumers[i] = oldConsumers[i] @@ -297,12 +292,19 @@ func (c *consumer) internalTopicSubscribeToPartitions() error { receiverQueueSize := c.options.ReceiverQueueSize metadata := c.options.Properties + startPartition := oldNumPartitions partitionsToAdd := newNumPartitions - oldNumPartitions + + if partitionsToAdd < 0 { + partitionsToAdd = newNumPartitions
[pulsar-client-go] branch master updated: Fix producer panic by oldProducers (#598)
This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new baaf68d Fix producer panic by oldProducers (#598) baaf68d is described below commit baaf68d89bc82ab83b714c0327ea51d58d9bd37f Author: xiaolong ran AuthorDate: Wed Aug 25 14:54:31 2021 +0800 Fix producer panic by oldProducers (#598) * Fix producer panic by oldProducers Signed-off-by: xiaolongran * fix comments Signed-off-by: xiaolongran * fix comments Signed-off-by: xiaolongran * fix ci error Signed-off-by: xiaolongran --- pulsar/consumer_impl.go | 16 +--- pulsar/producer_impl.go | 17 ++--- 2 files changed, 27 insertions(+), 6 deletions(-) diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index b7bc607..ec7ad7d 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -28,6 +28,7 @@ import ( "github.com/apache/pulsar-client-go/pulsar/internal" pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto" "github.com/apache/pulsar-client-go/pulsar/log" + "github.com/pkg/errors" ) const defaultNackRedeliveryDelay = 1 * time.Minute @@ -266,6 +267,13 @@ func (c *consumer) internalTopicSubscribeToPartitions() error { return nil } + if oldNumPartitions > newNumPartitions { + c.log.WithField("old_partitions", oldNumPartitions). + WithField("new_partitions", newNumPartitions). + Error("Does not support scaling down operations on topic partitions") + return errors.New("Does not support scaling down operations on topic partitions") + } + c.log.WithField("old_partitions", oldNumPartitions). WithField("new_partitions", newNumPartitions). 
Info("Changed number of partitions in topic") @@ -273,9 +281,11 @@ func (c *consumer) internalTopicSubscribeToPartitions() error { c.consumers = make([]*partitionConsumer, newNumPartitions) - // Copy over the existing consumer instances - for i := 0; i < oldNumPartitions; i++ { - c.consumers[i] = oldConsumers[i] + if oldConsumers != nil { + // Copy over the existing consumer instances + for i := 0; i < oldNumPartitions; i++ { + c.consumers[i] = oldConsumers[i] + } } type ConsumerError struct { diff --git a/pulsar/producer_impl.go b/pulsar/producer_impl.go index 1ffd24c..adf9b14 100644 --- a/pulsar/producer_impl.go +++ b/pulsar/producer_impl.go @@ -26,6 +26,7 @@ import ( "github.com/apache/pulsar-client-go/pulsar/internal" "github.com/apache/pulsar-client-go/pulsar/log" + "github.com/pkg/errors" ) const ( @@ -182,16 +183,26 @@ func (p *producer) internalCreatePartitionsProducers() error { return nil } + if oldNumPartitions > newNumPartitions { + p.log.WithField("old_partitions", oldNumPartitions). + WithField("new_partitions", newNumPartitions). + Error("Does not support scaling down operations on topic partitions") + return errors.New("Does not support scaling down operations on topic partitions") + } + p.log.WithField("old_partitions", oldNumPartitions). WithField("new_partitions", newNumPartitions). Info("Changed number of partitions in topic") + } p.producers = make([]Producer, newNumPartitions) - // Copy over the existing consumer instances - for i := 0; i < oldNumPartitions; i++ { - p.producers[i] = oldProducers[i] + if oldProducers != nil { + // Copy over the existing consumer instances + for i := 0; i < oldNumPartitions; i++ { + p.producers[i] = oldProducers[i] + } } type ProducerError struct {
[pulsar] branch master updated (9577b84 -> b8dce10)
This is an automated email from the ASF dual-hosted git repository. rxl pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git. from 9577b84 [Issue 11496][C++] Allow partitioned producers to start lazily (#11570) add b8dce10 Replace bookkeper with bookkeeper in bin/bookkeeper (#11675) No new revisions were added by this update. Summary of changes: bin/bookkeeper| 2 +- .../java/org/apache/pulsar/broker/storage/ManagedLedgerStorage.java | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-)
[pulsar] branch master updated (191dc62 -> 720214c)
This is an automated email from the ASF dual-hosted git repository. rxl pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git. from 191dc62 Deep copy the tenants to avoid concurrent sort exception (#11463) add 720214c [Go Functions] Upgrade go client version to 0.6.0 (#11477) No new revisions were added by this update. Summary of changes: pulsar-function-go/go.mod | 5 +++-- pulsar-function-go/go.sum | 13 + pulsar-function-go/pf/mockMessage_test.go | 16 3 files changed, 32 insertions(+), 2 deletions(-)
[pulsar-client-go] branch master updated: [issue 574] Fix handle send error panic (#576)
This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new 29414db [issue 574] Fix handle send error panic (#576) 29414db is described below commit 29414db801a747672e6c0de62d0498d664e70d41 Author: cckellogg AuthorDate: Tue Jul 27 23:25:40 2021 -0700 [issue 574] Fix handle send error panic (#576) Fix handle send error panic --- pulsar/internal/connection.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go index 9873ec8..147de3f 100644 --- a/pulsar/internal/connection.go +++ b/pulsar/internal/connection.go @@ -752,7 +752,7 @@ func (c *connection) handleSendError(cmdError *pb.CommandError) { requestID := cmdError.GetRequestId() - switch *cmdError.Error { + switch cmdError.GetError() { case pb.ServerError_NotAllowedError: request, ok := c.deletePendingRequest(requestID) if !ok {
[pulsar-client-go] annotated tag v0.6.0 updated (5e88b01 -> 6d3fa80)
This is an automated email from the ASF dual-hosted git repository. rxl pushed a change to annotated tag v0.6.0 in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git. *** WARNING: tag v0.6.0 was modified! *** from 5e88b01 (commit) to 6d3fa80 (tag) tagging 5e88b015e9aa2c3fd993208c35e47cf1852be07d (commit) replaces v0.4.0 by xiaolongran on Mon Jul 26 10:54:43 2021 +0800 - Log - Release v0.6.0 -BEGIN PGP SIGNATURE- iQJDBAABCAAtFiEE4nvv6z2Aqlp0sXEpAHfMO00BOOYFAmD+I/MPHHJ4bEBhcGFj aGUub3JnAAoJEAB3zDtNATjmdOkP/RGinh47MM7iqdTJSM48tQrPjedoCCvg56qM EraH2YoQnJeZicuOcA+PMr6aeEs7M1gwwv76oBOrSYwK6V6eLbf8Oy8xJk5W/Y/Q UcyEAD1TBbC3ecd4COmcUsIlNkMcddhblwmoHYH5iSWifxYvw6aJj5Qvbwfp711I 15AR9l6QStOyqDAHfFnRSYCAbIHKrDH33lKsOsBh/RtncWoAvN1RtEl6oa/3iI3L 58+KexNGpT8snSetwW3dX0RdWHtLxuL0Q3O+gLtU6zi/+hn7nDs4u12IXbWZQKEr btUDhQwvXXTshr2lD7G0m5GJg4ofqq3XUnV27sgokiui6vTuTpuzgj9RApxuL/Qx nXJ0O44gejdDckJgEHPU1E5W13DQmwKQGa7+pZ9oUjd5j9PKc/VwGIP/N29xw+jP mh5EC+OAKmMZweMhjdtex0667leuyxAV4aS1W7xzLsdb4iGdBiaheioD/VM65olP ZUPe8Eh0K0imJTaesVnHEznanzjh7lidUera2VZRq3CcMgqIhIi173eTPDrSl1Nn fEckeWoxHXB+97JteaexzJahGuLDkg4CGa9StuS4Xl6WLDk8uUsQQkoQPvn4qXOE y0l056/pR8ZTomx6Z/RikFkHPnF3lSg6NzyFKdq2VlOcXxzoE7zppyvW6W0x8Veo LTypzQAL =q65A -END PGP SIGNATURE- --- No new revisions were added by this update. Summary of changes:
[pulsar-client-go] branch master updated: Update change log for 0.6.0 release (#570)
This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new 1cd38b3 Update change log for 0.6.0 release (#570) 1cd38b3 is described below commit 1cd38b3c96661ed2c61fc55ea56c217a016c0f83 Author: xiaolong ran AuthorDate: Mon Jul 26 10:52:47 2021 +0800 Update change log for 0.6.0 release (#570) Signed-off-by: xiaolongran Update change log for 0.6.0 release --- CHANGELOG.md | 37 + 1 file changed, 37 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 64b901f..ce91e22 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,43 @@ All notable changes to this project will be documented in this file. +[0.6.0] 2021-07-21 + +## Feature + +* Make PartitionsAutoDiscoveryInterval configurable, see [PR-514](https://github.com/apache/pulsar-client-go/pull/514). +* Always check connection close channell, before attempting to put requests, see [PR-521](https://github.com/apache/pulsar-client-go/pull/521). +* Add `LedgerId,EntryId,BatchIdx,PartitionIdx` func for MessageId interface, see [PR-529](https://github.com/apache/pulsar-client-go/pull/529). +* Add DisableReplication to Producer Message, see [PR-543](https://github.com/apache/pulsar-client-go/pull/543). +* Updating comments to conform to golang comment specification, see [PR-532](https://github.com/apache/pulsar-client-go/pull/532). +* Producer respects Context passed to Send() and SendAsync() when applying backpressure, see [PR-534](https://github.com/apache/pulsar-client-go/pull/534). +* Simplify connection close logic, see [PR-559](https://github.com/apache/pulsar-client-go/pull/559). +* Add open tracing to pulsar go client, see [PR-518](https://github.com/apache/pulsar-client-go/pull/518). +* Update proto file, see [PR-562](https://github.com/apache/pulsar-client-go/pull/562). 
+* Add send error logic for connection, see [PR-566](https://github.com/apache/pulsar-client-go/pull/566). +* Add license file for depend on libs, see [PR-567](https://github.com/apache/pulsar-client-go/pull/567). + +## Improve + +* Update jwt-go dependency to resolve vulnerabilities, see [PR-524](https://github.com/apache/pulsar-client-go/pull/524). +* Fixed Athenz repository name, see [PR-522](https://github.com/apache/pulsar-client-go/pull/522). +* Fix reader latest position, see [PR-525](https://github.com/apache/pulsar-client-go/pull/525). +* Fix timeout guarantee for RequestOnCnx, see [PR-492](https://github.com/apache/pulsar-client-go/pull/492). +* Fix nil pointer error with GetPartitionedTopicMetadata, see [PR-536](https://github.com/apache/pulsar-client-go/pull/536). +* Release locks before calling producer consumer response callbacks, see [PR-542](https://github.com/apache/pulsar-client-go/pull/542). +* Fix lookup service not implemented GetTopicsOfNamespace, see [PR-541](https://github.com/apache/pulsar-client-go/pull/541). +* Regenerate the certs to work with Pulsar 2.8.0 and Java 11, see [PR-548](https://github.com/apache/pulsar-client-go/pull/548). +* Fix race condition when resend pendingItems, see [PR-551](https://github.com/apache/pulsar-client-go/pull/551). +* Fix data race while accessing connection in partitionConsumer, see [PR-535](https://github.com/apache/pulsar-client-go/pull/535). +* Fix channel data race, see [PR-558](https://github.com/apache/pulsar-client-go/pull/558). +* Fix write to closed channel panic() in internal/connection during connection close, see [PR-539](https://github.com/apache/pulsar-client-go/pull/539). +* Fix possible race condition in connection pool, see [PR-561](https://github.com/apache/pulsar-client-go/pull/561). +* Fix default connection timeout, see [PR-563](https://github.com/apache/pulsar-client-go/pull/563). 
+* Add lock for compressionProviders to fix data race problem, see [PR-533](https://github.com/apache/pulsar-client-go/pull/533). +* Fix send goroutine blocked, see [PR-530](https://github.com/apache/pulsar-client-go/pull/530). + + + [0.5.0] 2021-05-14 ## Feature
[pulsar-client-go] branch master updated: update version to 0.6.0 (#572)
This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new 5e37108 update version to 0.6.0 (#572) 5e37108 is described below commit 5e37108cd168e424961dd374afa1ef2dace29f15 Author: xiaolong ran AuthorDate: Mon Jul 26 10:52:30 2021 +0800 update version to 0.6.0 (#572) Signed-off-by: xiaolongran update version to 0.6.0 --- VERSION| 2 +- stable.txt | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/VERSION b/VERSION index 8d1f998..cf25392 100644 --- a/VERSION +++ b/VERSION @@ -1,3 +1,3 @@ // This version number refers to the currently released version number // Please fix the version when release. -v0.5.0 +v0.6.0 diff --git a/stable.txt b/stable.txt index 3270adc..71c317e 100644 --- a/stable.txt +++ b/stable.txt @@ -1,3 +1,3 @@ // This version number refers to the current stable version, generally is `VERSION - 1`. // Please fix the version when release. -v0.5.0 +v0.6.0
[pulsar-client-go] branch master updated: [issue 490] Add error log when schema encode failed. (#571)
This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new bbee640 [issue 490] Add error log when schema encode failed. (#571) bbee640 is described below commit bbee6401ac34ae1d8ca5f08e8990418c69aa52e5 Author: Zhiqiang Li AuthorDate: Wed Jul 21 18:41:03 2021 +0800 [issue 490] Add error log when schema encode failed. (#571) * Add error log when schema encode failed. * format code --- pulsar/producer_partition.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 7e83bfa..abec4fc 100644 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -356,6 +356,7 @@ func (p *partitionProducer) internalSend(request *sendRequest) { if p.options.Schema != nil { schemaPayload, err = p.options.Schema.Encode(msg.Value) if err != nil { + p.log.WithError(err).Errorf("Schema encode message failed %s", msg.Value) return } }
[pulsar-client-go] branch master updated: Add producer check state before send msg. (#569)
This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new 1dfb8fd Add producer check state before send msg. (#569) 1dfb8fd is described below commit 1dfb8fdffb97d0c4f4845cc668bed485324daf5c Author: Zhiqiang Li AuthorDate: Wed Jul 21 17:43:23 2021 +0800 Add producer check state before send msg. (#569) Add producer state check before send msg. --- pulsar/error.go | 4 pulsar/producer_partition.go | 7 +++ pulsar/producer_test.go | 30 ++ 3 files changed, 41 insertions(+) diff --git a/pulsar/error.go b/pulsar/error.go index 60a832b..f433bfc 100644 --- a/pulsar/error.go +++ b/pulsar/error.go @@ -99,6 +99,8 @@ const ( AddToBatchFailed // SeekFailed seek failed SeekFailed + // ProducerClosed means producer already been closed + ProducerClosed ) // Error implement error interface, composed of two parts: msg and result. 
@@ -201,6 +203,8 @@ func getResultStr(r Result) string { return "AddToBatchFailed" case SeekFailed: return "SeekFailed" + case ProducerClosed: + return "ProducerClosed" default: return fmt.Sprintf("Result(%d)", r) } diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 8b3d33d..7e83bfa 100644 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -50,6 +50,7 @@ var ( errSendQueueIsFull = newError(ProducerQueueIsFull, "producer send queue is full") errContextExpired = newError(TimeoutError, "message send context expired") errMessageTooLarge = newError(MessageTooBig, "message size exceeds MaxMessageSize") + errProducerClosed = newError(ProducerClosed, "producer already been closed") buffersPool sync.Pool ) @@ -658,6 +659,12 @@ func (p *partitionProducer) SendAsync(ctx context.Context, msg *ProducerMessage, func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *ProducerMessage, callback func(MessageID, *ProducerMessage, error), flushImmediately bool) { + if p.getProducerState() != producerReady { + // Producer is closing + callback(nil, msg, errProducerClosed) + return + } + sr := &sendRequest{ ctx: ctx, msg: msg, diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go index 7c3dbd7..bbe8028 100644 --- a/pulsar/producer_test.go +++ b/pulsar/producer_test.go @@ -1097,3 +1097,33 @@ func TestProducerWithInterceptors(t *testing.T) { assert.Equal(t, 10, metric.sendn) assert.Equal(t, 10, metric.ackn) } + +func TestProducerSendAfterClose(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: serviceURL, + }) + assert.NoError(t, err) + defer client.Close() + + producer, err := client.CreateProducer(ProducerOptions{ + Topic: newTopicName(), + }) + + assert.NoError(t, err) + assert.NotNil(t, producer) + defer producer.Close() + + ID, err := producer.Send(context.Background(), &ProducerMessage{ + Payload: []byte("hello"), + }) + + assert.NoError(t, err) + assert.NotNil(t, ID) + + producer.Close() + ID, 
err = producer.Send(context.Background(), &ProducerMessage{ + Payload: []byte("hello"), + }) + assert.Nil(t, ID) + assert.Error(t, err) +}
[pulsar-client-go] branch branch-0.6.0 created (now 5e88b01)
This is an automated email from the ASF dual-hosted git repository. rxl pushed a change to branch branch-0.6.0 in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git. at 5e88b01 Add license file for depend libs (#567) No new revisions were added by this update.
[pulsar-client-go] annotated tag v0.6.0-candidate-1 updated (5e88b01 -> ad15726)
This is an automated email from the ASF dual-hosted git repository. rxl pushed a change to annotated tag v0.6.0-candidate-1 in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git. *** WARNING: tag v0.6.0-candidate-1 was modified! *** from 5e88b01 (commit) to ad15726 (tag) tagging 5e88b015e9aa2c3fd993208c35e47cf1852be07d (commit) replaces v0.4.0 by xiaolongran on Wed Jul 21 14:43:51 2021 +0800 - Log - Release v0.6.0-candidate-1 -BEGIN PGP SIGNATURE- iQJDBAABCAAtFiEE4nvv6z2Aqlp0sXEpAHfMO00BOOYFAmD3wicPHHJ4bEBhcGFj aGUub3JnAAoJEAB3zDtNATjmOaYQAMqI62fJ11dinzy8dQdN27KhpEV2Q/dg3XuB /eNrMyCQ5xxmb4b0vjVqj8x4OXB/eLKqYq57ntyIsp6G+rJWoP0Ca/ez+hMdwBDj RFfCrCUAV9vTQZMno+dHFgwJJ+Q6FSlSHiqELAZ0kfS2IpWCHgPz29Xeltl0swLz 7mPEBIj5ORl9bzQ9oEMbAUPJycSv/LfUWUKdj2eQR3JSp96uHgXbY15gDUVTBDAI sTC3dwD9vduFak0bCnhP3mvqHz7bS4JCSTP9Rvuo0G/j24Iz4xxtoTSGAsUFXReV DFISSttjTwiXRc9QeQ/ZcY/uADre9QiYJqg26cfjRFYA2G54y39klKLCCCiUqefq yqxSFdvF2XMkEEqjUkroiVHf/+rYF9XfVaOf4WaJ2xRHoFb0bipBFcE4AXJDhzV1 t/xg+OcNBgoLFr2zNXGVO7fip+ocRTnIkOhW7Uae2arztRJ7fnNr63qRPVqzmbGE b6UPojT9aufFMZSmxAML67Ct7FeLFAGRNa3dYT1+24lfRvUlcX6ARFIH8iRFiwvI IrWvgN35xoiEX1mrX//OXmlAd5speuKUa7Z8gZzvLAB4oeZA2zmOJASbV4DO0TcI ADhX1i8CbGENX+3WlrFqgAmdMMXdbGT5w2Gx37Zu36is7Xez6d96m1WV8Exn6/NI d80e/H77 =4XD9 -END PGP SIGNATURE- --- No new revisions were added by this update. Summary of changes:
svn commit: r48918 - in /dev/pulsar/pulsar-client-go-0.6.0-candidate-1: ./ apache-pulsar-client-go-0.6.0-src.tar.gz apache-pulsar-client-go-0.6.0-src.tar.gz.asc apache-pulsar-client-go-0.6.0-src.tar.gz.sha512
Author: rxl Date: Wed Jul 21 07:20:40 2021 New Revision: 48918 Log: Staging artifacts and signature for Pulsar Client Go release 0.6.0-candidate-1 Added: dev/pulsar/pulsar-client-go-0.6.0-candidate-1/ dev/pulsar/pulsar-client-go-0.6.0-candidate-1/apache-pulsar-client-go-0.6.0-src.tar.gz (with props) dev/pulsar/pulsar-client-go-0.6.0-candidate-1/apache-pulsar-client-go-0.6.0-src.tar.gz.asc dev/pulsar/pulsar-client-go-0.6.0-candidate-1/apache-pulsar-client-go-0.6.0-src.tar.gz.sha512 Added: dev/pulsar/pulsar-client-go-0.6.0-candidate-1/apache-pulsar-client-go-0.6.0-src.tar.gz == Binary file - no diff available. Propchange: dev/pulsar/pulsar-client-go-0.6.0-candidate-1/apache-pulsar-client-go-0.6.0-src.tar.gz -- svn:mime-type = application/octet-stream Added: dev/pulsar/pulsar-client-go-0.6.0-candidate-1/apache-pulsar-client-go-0.6.0-src.tar.gz.asc == --- dev/pulsar/pulsar-client-go-0.6.0-candidate-1/apache-pulsar-client-go-0.6.0-src.tar.gz.asc (added) +++ dev/pulsar/pulsar-client-go-0.6.0-candidate-1/apache-pulsar-client-go-0.6.0-src.tar.gz.asc Wed Jul 21 07:20:40 2021 @@ -0,0 +1,17 @@ +-BEGIN PGP SIGNATURE- + +iQJEBAABCAAuFiEEciSp+kqNwHcTrdvU+q+T3I6QoRgFAmD3yK0QHHlvbmdAYXBh +Y2hlLm9yZwAKCRD6r5PcjpChGHuBD/wN8eTxHrkFYGALUFgx6USucpOl5q0L8gxR +X7NbOzi3VcXWBrF86AYVtRrUYeRw+nsHX6118PVgoG7HNG7jvVXE8JiqJFSXtCri +hkl1zAH5FtOF9nh/k59QwH5fkjJ19vTH/k35MSCmV5VFTmSwlaz03tSYSCZECyNn +IMPZraiJ2DoDEai/de/z7iE6DkYxoi9h7fOnkbRgu/eiqIgVTvvMcuPmgsapRZ2m +B8wiJIjCcIfYURX5wVBIpaDAbsiwqQC0jAerGw26hxD3Wm+9WTBfnXHwPC71rmQ4 +oRl4Wx5/ztFMFxDuaOb3ECWzKRpH0Hgrbad0dEPqTfdF6ZKlYfEqarGDjbGzayzY +Ayt1IgUy9Yq+73auR3Ig0MdD8402b3lkzD20UsmCtbAmC4uM71poNCcE4i0WXCE9 +vD3QydyxNA6eXcCPs1fxccBMqLb1wv7jUq/ckbpW8tn4cpQU6Jft3eZvIN2CfsBd +hGuEgSc4KpH5bHgFgXmxhbRrK7T/TA1JsBSNbvA3mzWDGGNQxWZwZRWLWzrfB/3E +0xi8wzyjZBRPfdzZQGyBLZT0c3FKI02p1mBlJY1UaqnNT4Q+2gyydoSXJlCXO2Dx +7UQjPd++njxoCNgm0I856t6lmux0iSNCYsyrO1LKR81zInVlHF50UWh8U1s7+gA1 +qwaG5VdJYQ== +=7qLp +-END PGP SIGNATURE- Added: 
dev/pulsar/pulsar-client-go-0.6.0-candidate-1/apache-pulsar-client-go-0.6.0-src.tar.gz.sha512 == --- dev/pulsar/pulsar-client-go-0.6.0-candidate-1/apache-pulsar-client-go-0.6.0-src.tar.gz.sha512 (added) +++ dev/pulsar/pulsar-client-go-0.6.0-candidate-1/apache-pulsar-client-go-0.6.0-src.tar.gz.sha512 Wed Jul 21 07:20:40 2021 @@ -0,0 +1 @@ +29e5a91dc77ce1873d93372e9e2b1c1863f66c0babd0a6152adaa3d55f431cff1f894865db6325f6610b4340211256ead5a52d4e7611706a97fae3d9e3b99651 apache-pulsar-client-go-0.6.0-src.tar.gz