[pulsar-client-go] branch master updated: Fix nack backoff policy logic (#974)

2023-03-02 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
 new 48b7d01  Fix nack backoff policy logic (#974)
48b7d01 is described below

commit 48b7d0195327a81f638a3c51df2010d7eb244bbe
Author: xiaolong ran 
AuthorDate: Fri Mar 3 11:54:38 2023 +0800

Fix nack backoff policy logic (#974)

Signed-off-by: xiaolongran 

Motivation
Currently, the NackBackoffPolicy does not take effect, because in 
NackBackoffPolicy, we need to use Msg object for Nack, and MsgId cannot be used 
for Nack, otherwise the Msg redeliverCount field cannot be obtained for backoff 
retry.
---
 pulsar/consumer_impl.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index d16f719..7a86574 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -587,7 +587,7 @@ func (c *consumer) Nack(msg Message) {
}
 
if mid.consumer != nil {
-   mid.consumer.NackID(msg.ID())
+   mid.NackByMsg(msg)
return
}
c.consumers[mid.partitionIdx].NackMsg(msg)



[pulsar-client-go] branch master updated: add messageId and topic as props of DLQ message (#907)

2023-01-10 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
 new cf031b8  add messageId and topic as props of DLQ message (#907)
cf031b8 is described below

commit cf031b8951aec1356f2e064f1bf679d24c81ccd0
Author: Garule Prabhudas 
AuthorDate: Tue Jan 10 17:46:33 2023 +0530

add messageId and topic as props of DLQ message (#907)

Co-authored-by: Prabhudas Garule 
---
 pulsar/consumer_test.go |  6 ++
 pulsar/dlq_router.go| 12 +++-
 pulsar/message.go   |  3 +++
 pulsar/reader_test.go   |  8 
 pulsar/retry_router.go  |  1 +
 5 files changed, 29 insertions(+), 1 deletion(-)

diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 11e72a7..2f1a056 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -1505,6 +1505,12 @@ func DLQWithProducerOptions(t *testing.T, prodOpt 
*ProducerOptions) {
 
expectMsg := fmt.Sprintf("hello-%d", expectedMsgIdx)
assert.Equal(t, []byte(expectMsg), msg.Payload())
+
+   // check original messageId
+   assert.NotEmpty(t, msg.Properties()[PropertyOriginMessageID])
+
+   // check original topic
+   assert.NotEmpty(t, msg.Properties()[SysPropertyRealTopic])
}
 
// No more messages on the DLQ
diff --git a/pulsar/dlq_router.go b/pulsar/dlq_router.go
index 000faaa..5ecd8f8 100644
--- a/pulsar/dlq_router.go
+++ b/pulsar/dlq_router.go
@@ -92,11 +92,21 @@ func (r *dlqRouter) run() {
producer := 
r.getProducer(cm.Consumer.(*consumer).options.Schema)
msg := cm.Message.(*message)
msgID := msg.ID()
+
+   // properties associated with original message
+   properties := msg.Properties()
+
+   // include orinal message id in string format in 
properties
+   properties[PropertyOriginMessageID] = msgID.String()
+
+   // include original topic name of the message in 
properties
+   properties[SysPropertyRealTopic] = msg.Topic()
+
producer.SendAsync(context.Background(), 
&ProducerMessage{
Payload: msg.Payload(),
Key: msg.Key(),
OrderingKey: msg.OrderingKey(),
-   Properties:  msg.Properties(),
+   Properties:  properties,
EventTime:   msg.EventTime(),
ReplicationClusters: msg.replicationClusters,
}, func(MessageID, *ProducerMessage, error) {
diff --git a/pulsar/message.go b/pulsar/message.go
index 7f2e07f..76d0176 100644
--- a/pulsar/message.go
+++ b/pulsar/message.go
@@ -154,6 +154,9 @@ type MessageID interface {
 
// PartitionIdx returns the message partitionIdx
PartitionIdx() int32
+
+   // String returns message id in string format
+   String() string
 }
 
 // DeserializeMessageID reconstruct a MessageID object from its serialized 
representation
diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go
index 07e7ed3..53bd459 100644
--- a/pulsar/reader_test.go
+++ b/pulsar/reader_test.go
@@ -430,6 +430,14 @@ func (id *myMessageID) PartitionIdx() int32 {
return id.PartitionIdx()
 }
 
+func (id *myMessageID) String() string {
+   mid, err := DeserializeMessageID(id.data)
+   if err != nil {
+   return ""
+   }
+   return fmt.Sprintf("%d:%d:%d", mid.LedgerID(), mid.EntryID(), 
mid.PartitionIdx())
+}
+
 func TestReaderOnSpecificMessageWithCustomMessageID(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
diff --git a/pulsar/retry_router.go b/pulsar/retry_router.go
index 7b5f6b8..75792ad 100644
--- a/pulsar/retry_router.go
+++ b/pulsar/retry_router.go
@@ -35,6 +35,7 @@ const (
SysPropertyRetryTopic  = "RETRY_TOPIC"
SysPropertyReconsumeTimes  = "RECONSUMETIMES"
SysPropertyOriginMessageID = "ORIGIN_MESSAGE_IDY_TIME"
+   PropertyOriginMessageID= "ORIGIN_MESSAGE_ID"
 )
 
 type RetryMessage struct {



[pulsar-client-go] branch master updated: fix: fix 923 (#924)

2022-12-22 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
 new 1d3499a  fix: fix 923 (#924)
1d3499a is described below

commit 1d3499a18d526b4b1aef0bdbbc54ac812b8ae0c0
Author: Jiaqi Shen <18863662...@163.com>
AuthorDate: Fri Dec 23 15:13:04 2022 +0800

fix: fix 923 (#924)

Remove the outdated interface description of SeekByTime. More details here 
#923.
---
 pulsar/consumer.go | 3 ---
 1 file changed, 3 deletions(-)

diff --git a/pulsar/consumer.go b/pulsar/consumer.go
index 0a515e0..0e89fc0 100644
--- a/pulsar/consumer.go
+++ b/pulsar/consumer.go
@@ -267,9 +267,6 @@ type Consumer interface {
 
// SeekByTime resets the subscription associated with this consumer to 
a specific message publish time.
//
-   // Note: this operation can only be done on non-partitioned topics. For 
these, one can rather perform the seek() on
-   // the individual partitions.
-   //
// @param time
//the message publish time when to reposition the 
subscription
//



[pulsar-client-go] branch master updated: feat: add BackoffPolicy to `reader` and improve test case (#889)

2022-11-10 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
 new edfb785  feat: add BackoffPolicy to `reader` and improve test case 
(#889)
edfb785 is described below

commit edfb785961ca1f27aabea790f5642e1eaca7bd44
Author: labuladong 
AuthorDate: Fri Nov 11 11:04:50 2022 +0800

feat: add BackoffPolicy to `reader` and improve test case (#889)

* feat: add BackoffPolicy to reader and improve test case.

* improve comments

* fix code style
---
 pulsar/consumer_test.go | 41 +++
 pulsar/producer_test.go | 40 ++
 pulsar/reader.go|  6 
 pulsar/reader_impl.go   |  1 +
 pulsar/reader_test.go   | 75 +
 5 files changed, 163 insertions(+)

diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index f574378..e9b3013 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -3262,3 +3262,44 @@ func TestAvailablePermitsLeak(t *testing.T) {
assert.NotEqual(t, true, errors.Is(err, context.DeadlineExceeded),
"This means the resource is exhausted. consumer.Receive() will 
block forever.")
 }
+
+func TestConsumerWithBackoffPolicy(t *testing.T) {
+   client, err := NewClient(ClientOptions{
+   URL: serviceURL,
+   })
+   assert.NoError(t, err)
+   defer client.Close()
+
+   topicName := newTopicName()
+
+   backoff := newTestBackoffPolicy(1*time.Second, 4*time.Second)
+   _consumer, err := client.Subscribe(ConsumerOptions{
+   Topic:topicName,
+   SubscriptionName: "sub-1",
+   Type: Shared,
+   BackoffPolicy:backoff,
+   })
+   assert.Nil(t, err)
+   defer _consumer.Close()
+
+   partitionConsumerImp := _consumer.(*consumer).consumers[0]
+   // 1 s
+   startTime := time.Now()
+   partitionConsumerImp.reconnectToBroker()
+   assert.True(t, backoff.IsExpectedIntervalFrom(startTime))
+
+   // 2 s
+   startTime = time.Now()
+   partitionConsumerImp.reconnectToBroker()
+   assert.True(t, backoff.IsExpectedIntervalFrom(startTime))
+
+   // 4 s
+   startTime = time.Now()
+   partitionConsumerImp.reconnectToBroker()
+   assert.True(t, backoff.IsExpectedIntervalFrom(startTime))
+
+   // 4 s
+   startTime = time.Now()
+   partitionConsumerImp.reconnectToBroker()
+   assert.True(t, backoff.IsExpectedIntervalFrom(startTime))
+}
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index f193ffd..2338074 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -1072,6 +1072,46 @@ func TestSendTimeout(t *testing.T) {
makeHTTPCall(t, http.MethodDelete, quotaURL, "")
 }
 
+func TestProducerWithBackoffPolicy(t *testing.T) {
+   client, err := NewClient(ClientOptions{
+   URL: serviceURL,
+   })
+   assert.NoError(t, err)
+   defer client.Close()
+
+   topicName := newTopicName()
+
+   backoff := newTestBackoffPolicy(1*time.Second, 4*time.Second)
+   _producer, err := client.CreateProducer(ProducerOptions{
+   Topic: topicName,
+   SendTimeout:   2 * time.Second,
+   BackoffPolicy: backoff,
+   })
+   assert.Nil(t, err)
+   defer _producer.Close()
+
+   partitionProducerImp := 
_producer.(*producer).producers[0].(*partitionProducer)
+   // 1 s
+   startTime := time.Now()
+   partitionProducerImp.reconnectToBroker()
+   assert.True(t, backoff.IsExpectedIntervalFrom(startTime))
+
+   // 2 s
+   startTime = time.Now()
+   partitionProducerImp.reconnectToBroker()
+   assert.True(t, backoff.IsExpectedIntervalFrom(startTime))
+
+   // 4 s
+   startTime = time.Now()
+   partitionProducerImp.reconnectToBroker()
+   assert.True(t, backoff.IsExpectedIntervalFrom(startTime))
+
+   // 4 s
+   startTime = time.Now()
+   partitionProducerImp.reconnectToBroker()
+   assert.True(t, backoff.IsExpectedIntervalFrom(startTime))
+}
+
 func TestSendContextExpired(t *testing.T) {
quotaURL := adminURL + 
"/admin/v2/namespaces/public/default/backlogQuota"
quotaFmt := `{"limit": "%d", "policy": "producer_request_hold"}`
diff --git a/pulsar/reader.go b/pulsar/reader.go
index 3539037..e4679ab 100644
--- a/pulsar/reader.go
+++ b/pulsar/reader.go
@@ -20,6 +20,8 @@ package pulsar
 import (
"context"
"time"
+
+   "github.com/apache/pulsar-client-go/pulsar/internal"
 )
 
 // ReaderMessage packages Reader and Message as a struct to use.
@@ -86,6 +88,10 @@ type ReaderOptions struc

[pulsar-client-go] branch master updated: [Issue 833] Fix the availablePermits leak that could cause consumer stuck. (#835)

2022-10-13 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
 new a013ff0  [Issue 833] Fix the availablePermits leak that could cause 
consumer stuck. (#835)
a013ff0 is described below

commit a013ff0b7353fab87a7eb7599377bb06b46eb7b7
Author: Jiaqi Shen <18863662...@163.com>
AuthorDate: Thu Oct 13 16:06:01 2022 +0800

[Issue 833] Fix the availablePermits leak that could cause consumer stuck. 
(#835)

* fix: fix for issue833

* fix: fix for issue833 by modify dispatcher()
---
 pulsar/consumer_partition.go | 55 +
 pulsar/consumer_test.go  | 82 
 2 files changed, 122 insertions(+), 15 deletions(-)

diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 7ddff5e..5b61e7d 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -82,6 +82,15 @@ const (
noMessageEntry = -1
 )
 
+type permitsReq int32
+
+const (
+   // reset the availablePermits of pc
+   permitsReset permitsReq = iota
+   // increase the availablePermits
+   permitsInc
+)
+
 type partitionConsumerOpts struct {
topic  string
consumerName   string
@@ -128,7 +137,8 @@ type partitionConsumer struct {
messageCh chan ConsumerMessage
 
// the number of message slots available
-   availablePermits int32
+   availablePermits   int32
+   availablePermitsCh chan permitsReq
 
// the size of the queue channel for buffering messages
queueSize   int32
@@ -224,6 +234,7 @@ func newPartitionConsumer(parent Consumer, client *client, 
options *partitionCon
dlq:  dlq,
metrics:  metrics,
schemaInfoCache:  newSchemaInfoCache(client, options.topic),
+   availablePermitsCh:   make(chan permitsReq, 10),
}
pc.setConsumerState(consumerInit)
pc.log = client.log.SubLogger(log.Fields{
@@ -932,7 +943,7 @@ func (pc *partitionConsumer) dispatcher() {
messages = nil
 
// reset available permits
-   pc.availablePermits = 0
+   pc.availablePermitsCh <- permitsReset
initialPermits := uint32(pc.queueSize)
 
pc.log.Debugf("dispatcher requesting initial 
permits=%d", initialPermits)
@@ -955,19 +966,14 @@ func (pc *partitionConsumer) dispatcher() {
messages[0] = nil
messages = messages[1:]
 
-   // TODO implement a better flow controller
-   // send more permits if needed
-   pc.availablePermits++
-   flowThreshold := 
int32(math.Max(float64(pc.queueSize/2), 1))
-   if pc.availablePermits >= flowThreshold {
-   availablePermits := pc.availablePermits
-   requestedPermits := availablePermits
-   pc.availablePermits = 0
+   pc.availablePermitsCh <- permitsInc
 
-   pc.log.Debugf("requesting more permits=%d 
available=%d", requestedPermits, availablePermits)
-   if err := 
pc.internalFlow(uint32(requestedPermits)); err != nil {
-   pc.log.WithError(err).Error("unable to 
send permits")
-   }
+   case pr := <-pc.availablePermitsCh:
+   switch pr {
+   case permitsInc:
+   pc.increasePermitsAndRequestMoreIfNeed()
+   case permitsReset:
+   pc.availablePermits = 0
}
 
case clearQueueCb := <-pc.clearQueueCh:
@@ -998,7 +1004,7 @@ func (pc *partitionConsumer) dispatcher() {
messages = nil
 
// reset available permits
-   pc.availablePermits = 0
+   pc.availablePermitsCh <- permitsReset
initialPermits := uint32(pc.queueSize)
 
pc.log.Debugf("dispatcher requesting initial 
permits=%d", initialPermits)
@@ -1438,6 +1444,25 @@ func (pc *partitionConsumer) 
discardCorruptedMessage(msgID *pb.MessageIdData,
if err != nil {
pc.log.Error("Connection was closed when request ack cmd")
}
+   pc.availablePermitsCh <- permitsInc
+}
+
+func (pc *partitionConsumer) increasePermitsAndRequestMoreIfNeed() {
+   // TODO implement a better flow con

[pulsar-client-go] annotated tag v0.9.0 updated (dd63a4c -> 61f494f)

2022-10-08 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a change to annotated tag v0.9.0
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


*** WARNING: tag v0.9.0 was modified! ***

from dd63a4c  (commit)
  to 61f494f  (tag)
 tagging dd63a4cc3c151cf9fdc6070ae4922966185e7c20 (commit)
 replaces v0.9.0-candidate-1
  by xiaolongran
  on Sun Oct 9 10:29:51 2022 +0800

- Log -
Release v0.9.0
-BEGIN PGP SIGNATURE-

iQJDBAABCAAtFiEE4nvv6z2Aqlp0sXEpAHfMO00BOOYFAmNCMh8PHHJ4bEBhcGFj
aGUub3JnAAoJEAB3zDtNATjm794P/0IpCFuxnlZEVVrC5IpPtzibcw7a0hwnW7t5
jWGpDP9eFLOVZNID6hUtKEfJSe20/Cqzc4h46By5zFmCXg6flrRt9WnKgTcN/3MJ
IFp2yrdSfjU17ZjFCl3wxcNP5Atlw+zwv3IEnuyGyyk88g++PoRyYwy5wysXQxjV
cNvnlWJJX+J80JEGMoCQVh3RhAb2jXikmpTsycBoPFeoS1HNUqigBnB8dNLKILaf
GJkT02GXuDCv+LHyQiSHnkDiTtf/MjWSLLbzo5xwr+MHFJrDYoVXjJKZ4zr9OdVo
hTvlr7ZvUFAg4+MLd9Ubt5RqsREqxOkOmQq4MioTotUQDjAMj4cEIY0PlD7vcJg7
Y4TrijL99VF1tEkNEKTLyJWiaKJZWLAQUuW/v73RVTytlHpWJa2t5Szq6DZf4RU1
Tu0ANMj22Q5XIXeM2iSAiFa009q1zQPxuj7C30eJufe/2t/ebijtMvtSWIKGvQJ1
mpxJnw4YCjE/1seiToaRRrzB+c/b8OYXch6xc4vIP+Ipp/uPUp2zUR5OkYe1u8Xn
ptI+G91xFsOTqgipg28ud9bqkvUkHNF0eleS8tLTgIVmae/KqjTJbQY3QlWWRGg1
FprowQYEr9+BEUDOxVOKhKtrpi6Gv8AuBrJJIDcs+B/V2GzWVA9h1lxFA55MkKj2
TUe8FYJj
=ZcW4
-END PGP SIGNATURE-
---


No new revisions were added by this update.

Summary of changes:



svn commit: r57071 - in /dev/pulsar/pulsar-client-go-0.9.0-candidate-2: ./ apache-pulsar-client-go-0.9.0-candidate-2-src.tar.gz apache-pulsar-client-go-0.9.0-candidate-2-src.tar.gz.asc apache-pulsar-c

2022-09-28 Thread rxl
Author: rxl
Date: Thu Sep 29 06:29:56 2022
New Revision: 57071

Log:
Staging artifacts and signature for Pulsar Client Go release 0.9.0-candidate-2

Added:
dev/pulsar/pulsar-client-go-0.9.0-candidate-2/

dev/pulsar/pulsar-client-go-0.9.0-candidate-2/apache-pulsar-client-go-0.9.0-candidate-2-src.tar.gz
   (with props)

dev/pulsar/pulsar-client-go-0.9.0-candidate-2/apache-pulsar-client-go-0.9.0-candidate-2-src.tar.gz.asc

dev/pulsar/pulsar-client-go-0.9.0-candidate-2/apache-pulsar-client-go-0.9.0-candidate-2-src.tar.gz.sha512

Added: 
dev/pulsar/pulsar-client-go-0.9.0-candidate-2/apache-pulsar-client-go-0.9.0-candidate-2-src.tar.gz
==
Binary file - no diff available.

Propchange: 
dev/pulsar/pulsar-client-go-0.9.0-candidate-2/apache-pulsar-client-go-0.9.0-candidate-2-src.tar.gz
--
svn:mime-type = application/octet-stream

Added: 
dev/pulsar/pulsar-client-go-0.9.0-candidate-2/apache-pulsar-client-go-0.9.0-candidate-2-src.tar.gz.asc
==
--- 
dev/pulsar/pulsar-client-go-0.9.0-candidate-2/apache-pulsar-client-go-0.9.0-candidate-2-src.tar.gz.asc
 (added)
+++ 
dev/pulsar/pulsar-client-go-0.9.0-candidate-2/apache-pulsar-client-go-0.9.0-candidate-2-src.tar.gz.asc
 Thu Sep 29 06:29:56 2022
@@ -0,0 +1,16 @@
+-BEGIN PGP SIGNATURE-
+
+iQIzBAABCAAdFiEEciSp+kqNwHcTrdvU+q+T3I6QoRgFAmM1ObIACgkQ+q+T3I6Q
+oRhypQ//aTnXDBDkTB/ucCx73YqjhbRLTyZMS8w3vdaUFtXA+keYaRGAKRYifzQa
+awIgQjKA7b9vStZiSi411sRhoxEIHUoatTBDefmPYESgpJlPOZZ4Bk8CyGStwPz0
+w50VqH3SdZyBmO/4JpJFUMzWPY9mUkGwqXRu01DqZez8+xe3P2e6Xn9V6pHYdIzm
+zWAWcrbW7WPZxrGvwy5/LKIJrY43OHPacIlZ6DFGxY7cgNJbZfqT3cPBq2FbIyGE
+lsEle4BapEDbrY6gahUzfmHVsKe9BoRmY/ycvPi6+9nTnGI6KxoR3TC9uxv//mOS
+dmQBsd+YH+o+/hLoXvu9Gdpe8xTUFRYwSIC0p9b/06rtbJ2bq6wBdsd3VtUSdEXg
+UKYVptI6uvXoiY9c8AqE/vBEJaMflfXQS4cQ5EwvCeIEmZ6D2bNwgv8zkCzuryq7
+PyThip6Warl4jqv5zb/sKqG74hRHIt+S+DdLkB6M7aYggf+YLCRr8Ry7sRoPcEgD
+pr1Nt416n+iQ2ErykzN/Z7FqjM9tABwo5pbJxRY4M+4AQ/3YQe1Zk8dSOzZWD45X
+bb5r9X464acMVDd6V6DrH9xb6QNjgSGyQQgAX5aB5CB+xUwhvC9LMbaIzzZx+D35
+O0EV8SScc7jbOHNTx/OHXZ/RYzlJGpn7QzBoR0vQx0hk0ts13aY=
+=ICJv
+-END PGP SIGNATURE-

Added: 
dev/pulsar/pulsar-client-go-0.9.0-candidate-2/apache-pulsar-client-go-0.9.0-candidate-2-src.tar.gz.sha512
==
--- 
dev/pulsar/pulsar-client-go-0.9.0-candidate-2/apache-pulsar-client-go-0.9.0-candidate-2-src.tar.gz.sha512
 (added)
+++ 
dev/pulsar/pulsar-client-go-0.9.0-candidate-2/apache-pulsar-client-go-0.9.0-candidate-2-src.tar.gz.sha512
 Thu Sep 29 06:29:56 2022
@@ -0,0 +1 @@
+9731d6a0615288e77feb4b73fedbbdf6d275ebefeee3cee5fc4e849f38789863f0532c7e8b93eb1e601bd98f9bb21d50a714fcf87fac9987a745a052bbe23ca3
  apache-pulsar-client-go-0.9.0-candidate-2-src.tar.gz




[pulsar-client-go] annotated tag v0.9.0-candidate-2 updated (dd63a4c -> 6b9884d)

2022-09-28 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a change to annotated tag v0.9.0-candidate-2
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


*** WARNING: tag v0.9.0-candidate-2 was modified! ***

from dd63a4c  (commit)
  to 6b9884d  (tag)
 tagging dd63a4cc3c151cf9fdc6070ae4922966185e7c20 (commit)
 replaces v0.9.0-candidate-1
  by xiaolongran
  on Thu Sep 29 14:12:31 2022 +0800

- Log -
Release v0.9.0-candidate-2
-BEGIN PGP SIGNATURE-

iQJDBAABCAAtFiEE4nvv6z2Aqlp0sXEpAHfMO00BOOYFAmM1N08PHHJ4bEBhcGFj
aGUub3JnAAoJEAB3zDtNATjmaigP/AyeWAYtPyXjQ/Uf2utHfs68Ahz5QzWP6V6B
vdtlKOxldF0KGR+vkSNMlb8LphazeWBAcQu4ZN2XqnwUOMfwNdNPA9nSyE+mVzSD
mUFbXhizui2DmVVgqqaTFPbqSTwWYyELBTO77yBzbHQcH0U1sJnULNJlL1dfU5Dm
DAk9XKEn8DCXcartKFTl8UiQ2FGp5MtncnnjvpinboElZSqmUnfXDYjCK79Dwmn4
XwpQAtPbm4fcubDJXTieWt1lDUG7KsvUNqEYuZI8VTX83+3N8NFLsy1im+iLL9aH
hYxOiYbJjbdTaBgC9xLnkvRnuGzE0PxNxwOeu/S2yZI44cnXOErcX6H/DcioHB58
PsGEza5luPNFHfyi+/BiHu70YgXDmBSdybmOVXfwM+aAH6VkIF7CcdmKJUYynQud
HfFU9iQhgSx0y7u5drwqTzW4/OLXNyVKGdiJVCX8z4tOQf/kHwH1Q4sH3bnjMJvB
r+ILR6W038R6rN3UCUjMqDus0UCA4Ai7XaAO6ZFObSCdZms0REcG6IAfSOQLM/S+
zKr3hWQlZTGDQtje/QCtA13jbnAPcvvp50nluehiI6Vd3gvNWThOmlYPKk6dsLSt
jPTPw2xjeLnbkwU9K+/05wBrLBUOyF7khGRcUf3RIjHKVkfvYI5bwA0fPEDkr3pr
MuehDjL3
=T9lz
-END PGP SIGNATURE-
---


No new revisions were added by this update.

Summary of changes:



[pulsar-client-go] branch branch-0.9.0 updated (ea6eccf -> dd63a4c)

2022-09-28 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a change to branch branch-0.9.0
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


from ea6eccf  feat: support multiple schema version for producer and 
consumer (#611)
 add 6a8e7f3  [issue #752] replace go-rate rate limiter with a buffered 
channel implementation (#799)
 add bd19458  [issue 814] consumer and producer reconnect failure metrics 
counter (#815)
 add c247619  style: fix to follow lint error (#824)
 add 934c867  [oauth2] Remove oauth2 go.mod and go.sum (#802)
 add 91b807d  chore: rename test_helper to follow test code naming 
convention (#822)
 add 3d63718  Bump github.com/stretchr/testify to update gopkg.in/yaml.v3 
(#813)
 add a09460e  [client] Add MetricsRegisterer to ClientOptions (#826)
 add da24461  Add golang 1.19 in tests matrix (#832)
 add 2d5f6fc  ci: add makefile (#800)
 add f8dc88e  Make keepalive interval configurable (#838)
 add 68e4317  [issue #807] dlq topic producer options (#809)
 add fb8e801  [log-format] remove redundant "[]" pair in the head and tail 
of log content (#831)
 add 649d992  Update proto file latest (#841)
 add b06e198  [bugfix] Fix wrong check eventime by default (#843)
 add edd5c71  NackBackoffPolicy.Next return time.Duration (#834)
 add e78dc3c  Introduce doneCh for ack error (#777)
 add 6a8847f  Parameterize the reconnection option (#853)
 add 9ecebb1  add 0.9.0 release changelog (#804)
 add dd63a4c  Embed Go SDK version to 0.9.0 (#854)

No new revisions were added by this update.

Summary of changes:
 .github/workflows/project.yml  | 7 +-
 CHANGELOG.md   |48 +
 Dockerfile |23 +-
 docker-ci.sh => Makefile   |31 +-
 README.md  |35 +-
 VERSION| 2 +-
 go.mod |10 +-
 go.sum |46 +-
 integration-tests/{ => conf}/.htpasswd | 0
 integration-tests/{ => conf}/client.conf   | 0
 integration-tests/{ => conf}/standalone.conf   | 0
 oauth2/authorization_tokenretriever.go | 1 -
 oauth2/authorization_tokenretriever_test.go|10 +-
 oauth2/go.mod  |12 -
 oauth2/go.sum  |   117 -
 oauth2/oidc_endpoint_provider_test.go  | 4 +-
 perf/perf-producer.go  |23 +-
 pulsar/client.go   | 8 +
 pulsar/client_impl.go  |21 +-
 pulsar/consumer.go | 9 +
 pulsar/consumer_impl.go| 7 +
 pulsar/consumer_multitopic.go  | 4 +
 pulsar/consumer_partition.go   |57 +-
 pulsar/consumer_partition_test.go  |11 +-
 pulsar/consumer_regex.go   | 4 +
 pulsar/consumer_test.go|63 +-
 pulsar/dlq_router.go   |18 +-
 pulsar/{test_helper.go => helper_for_test.go}  | 0
 pulsar/impl_message.go |11 +
 pulsar/internal/backoff.go |26 +-
 pulsar/internal/backoff_test.go| 8 +-
 pulsar/internal/commands.go| 1 -
 pulsar/internal/connection.go  |18 +-
 pulsar/internal/connection_pool.go | 8 +-
 pulsar/internal/http_client.go | 2 +-
 pulsar/internal/lookup_service_test.go |75 +-
 pulsar/internal/metrics.go |   172 +-
 pulsar/internal/pulsar_proto/PulsarApi.pb.go   | 11285 +++
 pulsar/internal/pulsar_proto/PulsarApi.proto   |   156 +-
 pulsar/internal/rpc_client.go  | 2 +-
 pulsar/log/wrapper_logrus.go   |16 +-
 .../namespace_name_test.go => message_test.go} |10 +-
 pulsar/negative_acks_tracker.go| 4 +-
 pulsar/negative_acks_tracker_test.go   |14 +-
 pulsar/negative_backoff_policy.go  |19 +-
 pulsar/negative_backoff_policy_test.go | 5 +-
 pulsar/producer.go | 6 +
 pulsar/producer_partition.go   |27 +-
 pulsar/retry_router.go |16 +-
 .../pulsar-test-service-start.sh   | 0
 .../pulsar-test-service-stop.sh| 0
 run-ci.sh => scripts/run-ci.sh | 6 +-
 stable.txt  

[pulsar-client-go] branch master updated: Embed Go SDK version to 0.9.0 (#854)

2022-09-28 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
 new dd63a4c  Embed Go SDK version to 0.9.0 (#854)
dd63a4c is described below

commit dd63a4cc3c151cf9fdc6070ae4922966185e7c20
Author: xiaolong ran 
AuthorDate: Thu Sep 29 14:01:36 2022 +0800

Embed Go SDK version to 0.9.0 (#854)

Signed-off-by: xiaolongran 

Signed-off-by: xiaolongran 
---
 pulsar/internal/connection.go | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 8ddbc04..4196396 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -39,8 +39,8 @@ import (
 )
 
 const (
-   // TODO: Find a better way to embed the version in the library code
-   PulsarVersion   = "0.1"
+   // PulsarVersion FIXME: Before each release, please modify the current 
Version value.
+   PulsarVersion   = "0.9.0"
ClientVersionString = "Pulsar Go " + PulsarVersion
 
PulsarProtocolVersion = int32(pb.ProtocolVersion_v18)



[pulsar-client-go] branch master updated: add 0.9.0 release changelog (#804)

2022-09-28 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
 new 9ecebb1  add 0.9.0 release changelog (#804)
9ecebb1 is described below

commit 9ecebb1cf92587d58bbd1ea526a92314e0dc3d91
Author: Rui Fu 
AuthorDate: Thu Sep 29 11:24:59 2022 +0800

add 0.9.0 release changelog (#804)
---
 CHANGELOG.md | 48 
 VERSION  |  2 +-
 stable.txt   |  2 +-
 3 files changed, 50 insertions(+), 2 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2e0c97d..e68f59b 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -23,6 +23,54 @@
 
 All notable changes to this project will be documented in this file.
 
+[0.9.0] 2022-07-07
+
+## Feature
+* Add TableView support, see 
[PR-743](https://github.com/apache/pulsar-client-go/pull/743)
+* Support ack response for Go SDK, see 
[PR-776](https://github.com/apache/pulsar-client-go/pull/776)
+* Add basic authentication, see 
[PR-778](https://github.com/apache/pulsar-client-go/pull/778)
+* Support multiple schema version for producer and consumer, see 
[PR-611](https://github.com/apache/pulsar-client-go/pull/611)
+* Add schema support to Reader, see 
[PR-741](https://github.com/apache/pulsar-client-go/pull/741)
+
+## Improve
+* Add consumer seek by time on partitioned topic, see 
[PR-782](https://github.com/apache/pulsar-client-go/pull/782)
+* Fix using closed connection in consumer, see 
[PR-785](https://github.com/apache/pulsar-client-go/pull/785)
+* Add go 1.18 to the test matrix, see 
[PR-790](https://github.com/apache/pulsar-client-go/pull/790)
+* Schema creation and validation functions without panic, see 
[PR-794](https://github.com/apache/pulsar-client-go/pull/794)
+* Fix ack request not set requestId when enable AckWithResponse option, see 
[PR-780](https://github.com/apache/pulsar-client-go/pull/780)
+* Fix nil pointer dereference in TopicNameWithoutPartitionPart, see 
[PR-734](https://github.com/apache/pulsar-client-go/pull/734)
+* Add error response for Ack func, see 
[PR-775](https://github.com/apache/pulsar-client-go/pull/775)
+* Fix sequenceID is not equal to cause the connection to be closed 
incorrectly, see [PR-774](https://github.com/apache/pulsar-client-go/pull/774)
+* Add consumer state check when request commands, see 
[PR-772](https://github.com/apache/pulsar-client-go/pull/772)
+* Fix panic caused by flushing current batch with an incorrect internal 
function, see [PR-750](https://github.com/apache/pulsar-client-go/pull/750)
+* Fix deadlock in Producer Send when message fails to encode, see 
[PR-762](https://github.com/apache/pulsar-client-go/pull/762)
+* Add `-c/--max-connections` parameter to pulsar-perf-go and set it to 1 by 
default, see [PR-765](https://github.com/apache/pulsar-client-go/pull/765)
+* Fix producer unable register when cnx closed, see 
[PR-761](https://github.com/apache/pulsar-client-go/pull/761)
+* Fix annotation typo in `consumer.go`, see 
[PR-758](https://github.com/apache/pulsar-client-go/pull/758)
+* Dlq producer on topic with schema, see 
[PR-723](https://github.com/apache/pulsar-client-go/pull/723)
+* Add service not ready check, see 
[PR-757](https://github.com/apache/pulsar-client-go/pull/757)
+* Fix ack timeout cause reconnect, see 
[PR-756](https://github.com/apache/pulsar-client-go/pull/756)
+* Cleanup topics after unit tests, see 
[PR-755](https://github.com/apache/pulsar-client-go/pull/755)
+* Allow config reader subscription name, see 
[PR-754](https://github.com/apache/pulsar-client-go/pull/754)
+* Exposing broker metadata, see 
[PR-745](https://github.com/apache/pulsar-client-go/pull/745)
+* Make go version consistent, see 
[PR-751](https://github.com/apache/pulsar-client-go/pull/751)
+* Temporarily point ci to pulsar 2.8.2, see 
[PR-747](https://github.com/apache/pulsar-client-go/pull/747)
+* Upgrade klauspost/compress to v1.14.4, see 
[PR-740](https://github.com/apache/pulsar-client-go/pull/740)
+* Stop ticker when create producer failed, see 
[PR-730](https://github.com/apache/pulsar-client-go/pull/730)
+
+## New Contributors
+* @NaraLuwan made their first contribution in 
https://github.com/apache/pulsar-client-go/pull/730
+* @shubham1172 made their first contribution in 
https://github.com/apache/pulsar-client-go/pull/735
+* @nicoloboschi made their first contribution in 
https://github.com/apache/pulsar-client-go/pull/738
+* @ZiyaoWei made their first contribution in 
https://github.com/apache/pulsar-client-go/pull/741
+* @nodece made their first contribution in 
https://github.com/apache/pulsar-client-go/pull/757
+* @lhotari made their first contribution in 
https://github.com/apache/pulsar-client-go/pull/765
+* @samuelhewitt made their first contribution in 
https://github.com/apache/pulsar-client-go/pull/762
+* @shileiyu made their first contribution in 
https

[pulsar-client-go] branch master updated (3d63718 -> a09460e)

2022-08-22 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


from 3d63718  Bump github.com/stretchr/testify to update gopkg.in/yaml.v3 
(#813)
 add a09460e  [client] Add MetricsRegisterer to ClientOptions (#826)

No new revisions were added by this update.

Summary of changes:
 pulsar/client.go   |  5 +++
 pulsar/client_impl.go  | 11 -
 pulsar/consumer_partition_test.go  | 11 +++--
 pulsar/internal/lookup_service_test.go | 75 ++
 pulsar/internal/metrics.go | 72 
 5 files changed, 107 insertions(+), 67 deletions(-)



[pulsar-client-go] branch master updated: Fix ack request not set requestId when enable AckWithResponse option (#780)

2022-05-27 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
 new 15136a7  Fix ack request not set requestId when enable AckWithResponse 
option (#780)
15136a7 is described below

commit 15136a706f1293615507fcbfb85163be2932bbb0
Author: liushengzhong0927 <103550934+liushengzhong0...@users.noreply.github.com>
AuthorDate: Sat May 28 01:41:00 2022 +0800

Fix ack request not set requestId when enable AckWithResponse option (#780)

Fixes #779

Motivation
Fix bug in #779

Modifications
Set requestId int the ack request command when the AckWithResponse option 
is enabled.
---
 pulsar/consumer_partition.go | 1 +
 1 file changed, 1 insertion(+)

diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 06ccfd5..b7c2c8e 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -534,6 +534,7 @@ func (pc *partitionConsumer) internalAck(req *ackRequest) {
}
 
if pc.options.ackWithResponse {
+   cmdAck.RequestId = proto.Uint64(reqID)
_, err := pc.client.rpcClient.RequestOnCnx(pc._getConn(), 
reqID, pb.BaseCommand_ACK, cmdAck)
if err != nil {
pc.log.WithError(err).Error("Ack with response error")



[pulsar-client-go] branch master updated: Support ack response for Go SDK (#776)

2022-05-23 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
 new c41616b  Support ack response for Go SDK (#776)
c41616b is described below

commit c41616b2f5125603fe252a90ca004bc0bd3d76d8
Author: xiaolong ran 
AuthorDate: Tue May 24 14:32:05 2022 +0800

Support ack response for Go SDK (#776)

* Support ack response for Go SDK

Signed-off-by: xiaolongran 

* add test case for this change

Signed-off-by: xiaolongran 
---
 pulsar/consumer.go|  7 +++
 pulsar/consumer_impl.go   |  1 +
 pulsar/consumer_partition.go  | 11 +++
 pulsar/consumer_test.go   | 35 +++
 pulsar/internal/connection.go | 23 +++
 5 files changed, 77 insertions(+)

diff --git a/pulsar/consumer.go b/pulsar/consumer.go
index dfe27c5..2df1637 100644
--- a/pulsar/consumer.go
+++ b/pulsar/consumer.go
@@ -182,6 +182,13 @@ type ConsumerOptions struct {
// > Notice: the NackBackoffPolicy will not work with 
`consumer.NackID(MessageID)`
// > because we are not able to get the redeliveryCount from the 
message ID.
NackBackoffPolicy NackBackoffPolicy
+
+   // AckWithResponse is a return value added to Ack Command, and its 
purpose is to confirm whether Ack Command
+   // is executed correctly on the Broker side. When set to true, the 
error information returned by the Ack
+   // method contains the return value of the Ack Command processed by the 
Broker side; when set to false, the
+   // error information of the Ack method only contains errors that may 
occur in the Go SDK's own processing.
+   // Default: false
+   AckWithResponse bool
 }
 
 // Consumer is an interface that abstracts behavior of Pulsar's consumer
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index e36f040..e887538 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -361,6 +361,7 @@ func (c *consumer) internalTopicSubscribeToPartitions() 
error {
keySharedPolicy:
c.options.KeySharedPolicy,
schema: c.options.Schema,
decryption: 
c.options.Decryption,
+   ackWithResponse:
c.options.AckWithResponse,
}
cons, err := newPartitionConsumer(c, c.client, opts, 
c.messageCh, c.dlq, c.metrics)
ch <- ConsumerError{
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 30ffcb2..06ccfd5 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -104,6 +104,7 @@ type partitionConsumerOpts struct {
keySharedPolicy*KeySharedPolicy
schema Schema
decryption *MessageDecryptionInfo
+   ackWithResponsebool
 }
 
 type partitionConsumer struct {
@@ -525,12 +526,22 @@ func (pc *partitionConsumer) internalAck(req *ackRequest) 
{
EntryId:  proto.Uint64(uint64(msgID.entryID)),
}
 
+   reqID := pc.client.rpcClient.NewRequestID()
cmdAck := &pb.CommandAck{
ConsumerId: proto.Uint64(pc.consumerID),
MessageId:  messageIDs,
AckType:pb.CommandAck_Individual.Enum(),
}
 
+   if pc.options.ackWithResponse {
+   _, err := pc.client.rpcClient.RequestOnCnx(pc._getConn(), 
reqID, pb.BaseCommand_ACK, cmdAck)
+   if err != nil {
+   pc.log.WithError(err).Error("Ack with response error")
+   req.err = err
+   }
+   return
+   }
+
err := pc.client.rpcClient.RequestOnCnxNoWait(pc._getConn(), 
pb.BaseCommand_ACK, cmdAck)
if err != nil {
pc.log.Error("Connection was closed when request ack cmd")
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index cadd8e4..0366884 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -1321,6 +1321,41 @@ func TestRLQ(t *testing.T) {
assert.Nil(t, checkMsg)
 }
 
+func TestAckWithResponse(t *testing.T) {
+   now := time.Now().Unix()
+   topic01 := fmt.Sprintf("persistent://public/default/topic-%d-01", now)
+   ctx := context.Background()
+
+   client, err := NewClient(ClientOptions{URL: lookupURL})
+   assert.Nil(t, err)
+   defer client.Close()
+
+   consumer, err := client.Subscribe(ConsumerOptions{
+   Topic:   topic01,
+   SubscriptionName:"my-sub",
+   Type:Shared,
+ 

[pulsar-client-go] branch master updated: fix nil pointer dereference in TopicNameWithoutPartitionPart (#734)

2022-05-23 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
 new a8270eb  fix nil pointer dereference in TopicNameWithoutPartitionPart 
(#734)
a8270eb is described below

commit a8270ebb7549e6206f82ffec0ea5c75d4f2f1ded
Author: Jeremy 
AuthorDate: Tue May 24 12:09:59 2022 +0800

fix nil pointer dereference in TopicNameWithoutPartitionPart (#734)

Signed-off-by: hantmac 

add error check

Signed-off-by: hantmac 
---
 pulsar/consumer_partition_test.go  |  4 +---
 pulsar/internal/metrics.go |  5 -
 pulsar/internal/topic_name.go  |  3 +++
 pulsar/internal/topic_name_test.go | 12 
 4 files changed, 16 insertions(+), 8 deletions(-)

diff --git a/pulsar/consumer_partition_test.go 
b/pulsar/consumer_partition_test.go
index e7fcd5d..2255e52 100644
--- a/pulsar/consumer_partition_test.go
+++ b/pulsar/consumer_partition_test.go
@@ -21,11 +21,9 @@ import (
"sync"
"testing"
 
+   "github.com/apache/pulsar-client-go/pulsar/internal"
"github.com/apache/pulsar-client-go/pulsar/internal/crypto"
-
"github.com/stretchr/testify/assert"
-
-   "github.com/apache/pulsar-client-go/pulsar/internal"
 )
 
 func TestSingleMessageIDNoAckTracker(t *testing.T) {
diff --git a/pulsar/internal/metrics.go b/pulsar/internal/metrics.go
index ec1a96e..1cab470 100644
--- a/pulsar/internal/metrics.go
+++ b/pulsar/internal/metrics.go
@@ -482,7 +482,10 @@ func NewMetricsProvider(metricsCardinality int, 
userDefinedLabels map[string]str
 
 func (mp *Metrics) GetLeveledMetrics(t string) *LeveledMetrics {
labels := make(map[string]string, 3)
-   tn, _ := ParseTopicName(t)
+   tn, err := ParseTopicName(t)
+   if err != nil {
+   return nil
+   }
topic := TopicNameWithoutPartitionPart(tn)
switch mp.metricsLevel {
case 4:
diff --git a/pulsar/internal/topic_name.go b/pulsar/internal/topic_name.go
index 86a1ebe..481e41f 100644
--- a/pulsar/internal/topic_name.go
+++ b/pulsar/internal/topic_name.go
@@ -107,6 +107,9 @@ func ParseTopicName(topic string) (*TopicName, error) {
 }
 
 func TopicNameWithoutPartitionPart(tn *TopicName) string {
+   if tn == nil {
+   return ""
+   }
if tn.Partition < 0 {
return tn.Name
}
diff --git a/pulsar/internal/topic_name_test.go 
b/pulsar/internal/topic_name_test.go
index f08fcd0..ab6537b 100644
--- a/pulsar/internal/topic_name_test.go
+++ b/pulsar/internal/topic_name_test.go
@@ -104,20 +104,24 @@ func TestParseTopicNameErrors(t *testing.T) {
 
 func TestTopicNameWithoutPartitionPart(t *testing.T) {
tests := []struct {
-   tn   TopicName
+   tn   *TopicName
expected string
}{
{
-   tn:   TopicName{Name: 
"persistent://public/default/my-topic", Partition: -1},
+   tn:   &TopicName{Name: 
"persistent://public/default/my-topic", Partition: -1},
expected: "persistent://public/default/my-topic",
},
{
-   tn:   TopicName{Name: 
"persistent://public/default/my-topic-partition-0", Partition: 0},
+   tn:   &TopicName{Name: 
"persistent://public/default/my-topic-partition-0", Partition: 0},
expected: "persistent://public/default/my-topic",
},
+   {
+   tn:   nil,
+   expected: "",
+   },
}
for _, test := range tests {
-   assert.Equal(t, test.expected, 
TopicNameWithoutPartitionPart(&test.tn))
+   assert.Equal(t, test.expected, 
TopicNameWithoutPartitionPart(test.tn))
}
 }
 



[pulsar-client-go] branch master updated: Revert "Fix stuck when reconnect broker (#703)" (#767)

2022-05-22 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
 new 67e2075  Revert "Fix stuck when reconnect broker (#703)" (#767)
67e2075 is described below

commit 67e2075c618ac967008fa713ef3e172d64e068e5
Author: Lari Hotari 
AuthorDate: Mon May 23 06:41:42 2022 +0300

Revert "Fix stuck when reconnect broker (#703)" (#767)

This reverts commit 1a8432cfd3aa231f8eb3c97171a47eab98a8f20a.
---
 pulsar/internal/connection.go  | 4 
 pulsar/internal/connection_pool.go | 2 +-
 2 files changed, 1 insertion(+), 5 deletions(-)

diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 11c9d49..da9a901 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -823,8 +823,6 @@ func (c *connection) handleCloseConsumer(closeConsumer 
*pb.CommandCloseConsumer)
consumerID := closeConsumer.GetConsumerId()
c.log.Infof("Broker notification of Closed consumer: %d", consumerID)
 
-   c.changeState(connectionClosed)
-
if consumer, ok := c.consumerHandler(consumerID); ok {
consumer.ConnectionClosed()
c.DeleteConsumeHandler(consumerID)
@@ -837,8 +835,6 @@ func (c *connection) handleCloseProducer(closeProducer 
*pb.CommandCloseProducer)
c.log.Infof("Broker notification of Closed producer: %d", 
closeProducer.GetProducerId())
producerID := closeProducer.GetProducerId()
 
-   c.changeState(connectionClosed)
-
producer, ok := c.deletePendingProducers(producerID)
// did we find a producer?
if ok {
diff --git a/pulsar/internal/connection_pool.go 
b/pulsar/internal/connection_pool.go
index 6491abd..5ec457e 100644
--- a/pulsar/internal/connection_pool.go
+++ b/pulsar/internal/connection_pool.go
@@ -82,7 +82,7 @@ func (p *connectionPool) GetConnection(logicalAddr *url.URL, 
physicalAddr *url.U
// current connection is closed, we need to remove the 
connection object from the current
// connection pool and create a new connection.
if conn.closed() {
-   p.log.Infof("Removed connection from pool key=%s 
logical_addr=%+v physical_addr=%+v",
+   p.log.Debugf("Removed connection from pool key=%s 
logical_addr=%+v physical_addr=%+v",
key, conn.logicalAddr, conn.physicalAddr)
delete(p.connections, key)
conn.Close()



[pulsar-client-go] branch master updated: Add error response for Ack func (#775)

2022-05-22 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
 new 5108332  Add error response for Ack func (#775)
5108332 is described below

commit 5108332c9dd4cb454c26804304bffb82eeffc713
Author: xiaolong ran 
AuthorDate: Mon May 23 11:15:27 2022 +0800

Add error response for Ack func (#775)

* Add error response for Ack func

Signed-off-by: xiaolongran 

* when connection closed we need to reconnect by using new cnx

Signed-off-by: xiaolongran 

* fix comments

Signed-off-by: xiaolongran 
---
 pulsar/consumer.go   |  4 ++--
 pulsar/consumer_impl.go  | 20 ++--
 pulsar/consumer_multitopic.go| 17 +
 pulsar/consumer_partition.go | 17 +++--
 pulsar/consumer_regex.go | 17 +
 pulsar/impl_message.go   |  9 ++---
 pulsar/internal/connection_pool.go   |  4 +++-
 .../pulsartracing/consumer_interceptor_test.go   |  8 ++--
 8 files changed, 56 insertions(+), 40 deletions(-)

diff --git a/pulsar/consumer.go b/pulsar/consumer.go
index c67509b..dfe27c5 100644
--- a/pulsar/consumer.go
+++ b/pulsar/consumer.go
@@ -200,10 +200,10 @@ type Consumer interface {
Chan() <-chan ConsumerMessage
 
// Ack the consumption of a single message
-   Ack(Message)
+   Ack(Message) error
 
// AckID the consumption of a single message, identified by its 
MessageID
-   AckID(MessageID)
+   AckID(MessageID) error
 
// ReconsumeLater mark a message for redelivery after custom delay
ReconsumeLater(msg Message, delay time.Duration)
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index 2bd3ed5..e36f040 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -19,6 +19,7 @@ package pulsar
 
 import (
"context"
+   "errors"
"fmt"
"math/rand"
"strconv"
@@ -34,7 +35,7 @@ import (
 const defaultNackRedeliveryDelay = 1 * time.Minute
 
 type acker interface {
-   AckID(id trackingMessageID)
+   AckID(id trackingMessageID) error
NackID(id trackingMessageID)
NackMsg(msg Message)
 }
@@ -438,29 +439,28 @@ func (c *consumer) Receive(ctx context.Context) (message 
Message, err error) {
}
 }
 
-// Messages
+// Chan return the message chan to users
 func (c *consumer) Chan() <-chan ConsumerMessage {
return c.messageCh
 }
 
 // Ack the consumption of a single message
-func (c *consumer) Ack(msg Message) {
-   c.AckID(msg.ID())
+func (c *consumer) Ack(msg Message) error {
+   return c.AckID(msg.ID())
 }
 
-// Ack the consumption of a single message, identified by its MessageID
-func (c *consumer) AckID(msgID MessageID) {
+// AckID the consumption of a single message, identified by its MessageID
+func (c *consumer) AckID(msgID MessageID) error {
mid, ok := c.messageID(msgID)
if !ok {
-   return
+   return errors.New("failed to convert trackingMessageID")
}
 
if mid.consumer != nil {
-   mid.Ack()
-   return
+   return mid.Ack()
}
 
-   c.consumers[mid.partitionIdx].AckID(mid)
+   return c.consumers[mid.partitionIdx].AckID(mid)
 }
 
 // ReconsumeLater mark a message for redelivery after custom delay
diff --git a/pulsar/consumer_multitopic.go b/pulsar/consumer_multitopic.go
index c1cb3d8..1d75a24 100644
--- a/pulsar/consumer_multitopic.go
+++ b/pulsar/consumer_multitopic.go
@@ -19,6 +19,7 @@ package pulsar
 
 import (
"context"
+   "errors"
"fmt"
"sync"
"time"
@@ -112,30 +113,30 @@ func (c *multiTopicConsumer) Receive(ctx context.Context) 
(message Message, err
}
 }
 
-// Messages
+// Chan return the message chan to users
 func (c *multiTopicConsumer) Chan() <-chan ConsumerMessage {
return c.messageCh
 }
 
 // Ack the consumption of a single message
-func (c *multiTopicConsumer) Ack(msg Message) {
-   c.AckID(msg.ID())
+func (c *multiTopicConsumer) Ack(msg Message) error {
+   return c.AckID(msg.ID())
 }
 
-// Ack the consumption of a single message, identified by its MessageID
-func (c *multiTopicConsumer) AckID(msgID MessageID) {
+// AckID the consumption of a single message, identified by its MessageID
+func (c *multiTopicConsumer) AckID(msgID MessageID) error {
mid, ok := toTrackingMessageID(msgID)
if !ok {
c.log.Warnf("invalid message id type %T", msgID)
-  

[pulsar-client-go] branch master updated: Fix sequenceID is not equal to cause the connection to be closed incorrectly (#774)

2022-05-11 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
 new d9b1083  Fix sequenceID is not equal to cause the connection to be 
closed incorrectly (#774)
d9b1083 is described below

commit d9b108351798569fd1c5b1c7a27a3da96fc2decd
Author: xiaolong ran 
AuthorDate: Wed May 11 23:09:10 2022 +0800

Fix sequenceID is not equal to cause the connection to be closed 
incorrectly (#774)

Signed-off-by: xiaolongran 

### Motivation

When processing the sendReceipt command, if the sequenceID returned by the 
broker is greater than the sequenceID in the current pendingQueue, we need to 
close the current connection to fix the inconsistency between the broker and 
the client state.
When the sequenceID returned by the broker is smaller than the sequenceID 
in the current pendingQueue, we do not need to close the current connection, 
and expect to increment the value of the returned sequenceID when the broker 
retries next time.

The current code processing logic is just the opposite, resulting in the 
failure to recover after the first situation occurs, and a phenomenon similar 
to the following occurs:
---
 pulsar/producer_partition.go | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 87d1d69..5a2694c 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -785,15 +785,15 @@ func (p *partitionProducer) ReceivedSendReceipt(response 
*pb.CommandSendReceipt)
}
 
if pi.sequenceID < response.GetSequenceId() {
-   // Ignoring the ack since it's referring to a message that has 
already timed out.
+   // Force connection closing so that messages can be 
re-transmitted in a new connection
p.log.Warnf("Received ack for %v on sequenceId %v - expected: 
%v, closing connection", response.GetMessageId(),
response.GetSequenceId(), pi.sequenceID)
+   p._getConn().Close()
return
} else if pi.sequenceID > response.GetSequenceId() {
-   // Force connection closing so that messages can be 
re-transmitted in a new connection
+   // Ignoring the ack since it's referring to a message that has 
already timed out.
p.log.Warnf("Received ack for %v on sequenceId %v - expected: 
%v, closing connection", response.GetMessageId(),
response.GetSequenceId(), pi.sequenceID)
-   p._getConn().Close()
return
} else {
// The ack was indeed for the expected item in the queue, we 
can remove it and trigger the callback



[pulsar-client-go] branch master updated: Add consumer state check when request commands (#772)

2022-05-06 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
 new 1e37f2f  Add consumer state check when request commands (#772)
1e37f2f is described below

commit 1e37f2fc93f37f39f1c1cd0fa2cba79375902f7c
Author: xiaolong ran 
AuthorDate: Fri May 6 22:15:44 2022 +0800

Add consumer state check when request commands (#772)

* Add consumer state check when request commands

Signed-off-by: xiaolongran 

* fix a little

Signed-off-by: xiaolongran 
---
 pulsar/consumer_partition.go | 68 
 1 file changed, 62 insertions(+), 6 deletions(-)

diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index b06474d..db2994f 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -18,6 +18,7 @@
 package pulsar
 
 import (
+   "errors"
"fmt"
"math"
"strings"
@@ -278,6 +279,10 @@ func (pc *partitionConsumer) internalUnsubscribe(unsub 
*unsubscribeRequest) {
 }
 
 func (pc *partitionConsumer) getLastMessageID() (trackingMessageID, error) {
+   if state := pc.getConsumerState(); state == consumerClosed || state == 
consumerClosing {
+   pc.log.WithField("state", state).Error("Failed to redeliver 
closing or closed consumer")
+   return trackingMessageID{}, errors.New("failed to redeliver 
closing or closed consumer")
+   }
req := &getLastMsgIDRequest{doneCh: make(chan struct{})}
pc.eventsCh <- req
 
@@ -292,6 +297,11 @@ func (pc *partitionConsumer) internalGetLastMessageID(req 
*getLastMsgIDRequest)
 }
 
 func (pc *partitionConsumer) requestGetLastMessageID() (trackingMessageID, 
error) {
+   if state := pc.getConsumerState(); state == consumerClosed || state == 
consumerClosing {
+   pc.log.WithField("state", state).Error("Failed to 
getLastMessageID closing or closed consumer")
+   return trackingMessageID{}, errors.New("failed to 
getLastMessageID closing or closed consumer")
+   }
+
requestID := pc.client.rpcClient.NewRequestID()
cmdGetLastMessageID := &pb.CommandGetLastMessageId{
RequestId:  proto.Uint64(requestID),
@@ -308,6 +318,10 @@ func (pc *partitionConsumer) requestGetLastMessageID() 
(trackingMessageID, error
 }
 
 func (pc *partitionConsumer) AckID(msgID trackingMessageID) {
+   if state := pc.getConsumerState(); state == consumerClosed || state == 
consumerClosing {
+   pc.log.WithField("state", state).Error("Failed to ack by 
closing or closed consumer")
+   return
+   }
if !msgID.Undefined() && msgID.ack() {
pc.metrics.AcksCounter.Inc()

pc.metrics.ProcessingTime.Observe(float64(time.Now().UnixNano()-msgID.receivedTime.UnixNano())
 / 1.0e9)
@@ -331,6 +345,10 @@ func (pc *partitionConsumer) NackMsg(msg Message) {
 }
 
 func (pc *partitionConsumer) Redeliver(msgIds []messageID) {
+   if state := pc.getConsumerState(); state == consumerClosed || state == 
consumerClosing {
+   pc.log.WithField("state", state).Error("Failed to redeliver 
closing or closed consumer")
+   return
+   }
pc.eventsCh <- &redeliveryRequest{msgIds}
 
iMsgIds := make([]MessageID, len(msgIds))
@@ -341,6 +359,10 @@ func (pc *partitionConsumer) Redeliver(msgIds []messageID) 
{
 }
 
 func (pc *partitionConsumer) internalRedeliver(req *redeliveryRequest) {
+   if state := pc.getConsumerState(); state == consumerClosed || state == 
consumerClosing {
+   pc.log.WithField("state", state).Error("Failed to redeliver 
closing or closed consumer")
+   return
+   }
msgIds := req.msgIds
pc.log.Debug("Request redelivery after negative ack for messages", 
msgIds)
 
@@ -352,11 +374,14 @@ func (pc *partitionConsumer) internalRedeliver(req 
*redeliveryRequest) {
}
}
 
-   pc.client.rpcClient.RequestOnCnxNoWait(pc._getConn(),
+   err := pc.client.rpcClient.RequestOnCnxNoWait(pc._getConn(),
pb.BaseCommand_REDELIVER_UNACKNOWLEDGED_MESSAGES, 
&pb.CommandRedeliverUnacknowledgedMessages{
ConsumerId: proto.Uint64(pc.consumerID),
MessageIds: msgIDDataList,
})
+   if err != nil {
+   pc.log.Error("Connection was closed when request redeliver cmd")
+   }
 }
 
 func (pc *partitionConsumer) getConsumerState() consumerState {
@@ -381,6 +406,10 @@ func (pc *partitionConsumer) Close() {
 }
 
 func (pc *partitionConsumer)

[pulsar-client-go] branch master updated: Calling internalFlushCurrentBatch will be forwaded to internalFlushCurrentBatches if the associated batch builder contains multi batches(KeyBasedBatchBuilder)

2022-05-06 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
 new ee379ac  Calling internalFlushCurrentBatch will be forwaded to 
internalFlushCurrentBatches if the associated batch builder contains multi 
batches(KeyBasedBatchBuilder). (#750)
ee379ac is described below

commit ee379ac240b2560ea43b81b5495a382c75cb2a9d
Author: Karma 
AuthorDate: Fri May 6 21:37:07 2022 +0800

Calling internalFlushCurrentBatch will be forwaded to 
internalFlushCurrentBatches if the associated batch builder contains multi 
batches(KeyBasedBatchBuilder). (#750)

Doing transparent forwarding could centralized the branching logic at only 
one place.

Co-authored-by: Karma Shi 
---
 pulsar/producer_partition.go | 33 +
 1 file changed, 13 insertions(+), 20 deletions(-)

diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 43ae68f..87d1d69 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -386,11 +386,7 @@ func (p *partitionProducer) runEventsLoop() {
return
}
case <-p.batchFlushTicker.C:
-   if p.batchBuilder.IsMultiBatches() {
-   p.internalFlushCurrentBatches()
-   } else {
-   p.internalFlushCurrentBatch()
-   }
+   p.internalFlushCurrentBatch()
}
}
 }
@@ -485,11 +481,8 @@ func (p *partitionProducer) internalSend(request 
*sendRequest) {
msg.ReplicationClusters, deliverAt)
if !added {
// The current batch is full.. flush it and retry
-   if p.batchBuilder.IsMultiBatches() {
-   p.internalFlushCurrentBatches()
-   } else {
-   p.internalFlushCurrentBatch()
-   }
+
+   p.internalFlushCurrentBatch()
 
// after flushing try again to add the current payload
if ok := p.batchBuilder.Add(smm, p.sequenceIDGenerator, 
payload, request,
@@ -504,11 +497,9 @@ func (p *partitionProducer) internalSend(request 
*sendRequest) {
}
 
if !sendAsBatch || request.flushImmediately {
-   if p.batchBuilder.IsMultiBatches() {
-   p.internalFlushCurrentBatches()
-   } else {
-   p.internalFlushCurrentBatch()
-   }
+
+   p.internalFlushCurrentBatch()
+
}
 }
 
@@ -522,6 +513,11 @@ type pendingItem struct {
 }
 
 func (p *partitionProducer) internalFlushCurrentBatch() {
+   if p.batchBuilder.IsMultiBatches() {
+   p.internalFlushCurrentBatches()
+   return
+   }
+
batchData, sequenceID, callbacks, err := p.batchBuilder.Flush()
if batchData == nil {
return
@@ -683,11 +679,8 @@ func (p *partitionProducer) internalFlushCurrentBatches() {
 }
 
 func (p *partitionProducer) internalFlush(fr *flushRequest) {
-   if p.batchBuilder.IsMultiBatches() {
-   p.internalFlushCurrentBatches()
-   } else {
-   p.internalFlushCurrentBatch()
-   }
+
+   p.internalFlushCurrentBatch()
 
pi, ok := p.pendingQueue.PeekLast().(*pendingItem)
if !ok {



[pulsar-client-go] branch master updated: Fix producer unable register when cnx closed (#761)

2022-04-20 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
 new e3f625a  Fix producer unable register when cnx closed (#761)
e3f625a is described below

commit e3f625ae8da938f5d147b3a2cadced69c07b
Author: xiaolong ran 
AuthorDate: Wed Apr 20 21:33:07 2022 +0800

Fix producer unable register when cnx closed (#761)

* Fix producer unable register when cnx closed

Signed-off-by: xiaolongran 

* fix code style

Signed-off-by: xiaolongran 
---
 pulsar/internal/connection.go | 10 ++
 pulsar/producer_partition.go  |  7 +--
 2 files changed, 11 insertions(+), 6 deletions(-)

diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 6055252..11c9d49 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -53,7 +53,8 @@ type TLSOptions struct {
 }
 
 var (
-   errConnectionClosed = errors.New("connection closed")
+   errConnectionClosed   = errors.New("connection closed")
+   errUnableRegisterListener = errors.New("unable register listener when 
con closed")
 )
 
 // ConnectionListener is a user of a connection (eg. a producer or
@@ -72,7 +73,7 @@ type Connection interface {
SendRequest(requestID uint64, req *pb.BaseCommand, callback 
func(*pb.BaseCommand, error))
SendRequestNoWait(req *pb.BaseCommand) error
WriteData(data Buffer)
-   RegisterListener(id uint64, listener ConnectionListener)
+   RegisterListener(id uint64, listener ConnectionListener) error
UnregisterListener(id uint64)
AddConsumeHandler(id uint64, handler ConsumerHandler)
DeleteConsumeHandler(id uint64)
@@ -847,17 +848,18 @@ func (c *connection) handleCloseProducer(closeProducer 
*pb.CommandCloseProducer)
}
 }
 
-func (c *connection) RegisterListener(id uint64, listener ConnectionListener) {
+func (c *connection) RegisterListener(id uint64, listener ConnectionListener) 
error {
// do not add if connection is closed
if c.closed() {
c.log.Warnf("Connection closed unable register listener 
id=%+v", id)
-   return
+   return errUnableRegisterListener
}
 
c.listenersLock.Lock()
defer c.listenersLock.Unlock()
 
c.listeners[id] = listener
+   return nil
 }
 
 func (c *connection) UnregisterListener(id uint64) {
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index d37d60e..bc775e9 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -262,11 +262,14 @@ func (p *partitionProducer) grabCnx() error {
p.sequenceIDGenerator = &nextSequenceID
}
p._setConn(res.Cnx)
-   p._getConn().RegisterListener(p.producerID, p)
+   err = p._getConn().RegisterListener(p.producerID, p)
+   if err != nil {
+   return err
+   }
p.log.WithFields(log.Fields{
"cnx":   res.Cnx.ID(),
"epoch": atomic.LoadUint64(&p.epoch),
-   }).Debug("Connected producer")
+   }).Info("Connected producer")
 
pendingItems := p.pendingQueue.ReadableSlice()
viewSize := len(pendingItems)



[pulsar] branch master updated: Fix logger lever from warn to debug when uancked threshold is limited (#14950)

2022-04-14 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new f9cfc3e7144 Fix logger lever from warn to debug when uancked threshold 
is limited (#14950)
f9cfc3e7144 is described below

commit f9cfc3e7144c75cf53672f6a66f38cac82a5104a
Author: xiaolong ran 
AuthorDate: Thu Apr 14 21:49:15 2022 +0800

Fix logger lever from warn to debug when uancked threshold is limited 
(#14950)

* Add rate logger for repo

Signed-off-by: xiaolongran 

* fix log info

Signed-off-by: xiaolongran 

* fix comments

Signed-off-by: xiaolongran 
---
 .../service/persistent/PersistentDispatcherMultipleConsumers.java   | 6 --
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 0210ec699f8..fbfb09b24a5 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -260,8 +260,10 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
 topic.getBrokerService().executor().execute(() -> 
readMoreEntries());
 }
 } else if (BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.get(this) == 
TRUE) {
-log.warn("[{}] Dispatcher read is blocked due to unackMessages 
{} reached to max {}", name,
-totalUnackedMessages, 
topic.getMaxUnackedMessagesOnSubscription());
+if (log.isDebugEnabled()) {
+log.debug("[{}] Dispatcher read is blocked due to 
unackMessages {} reached to max {}", name,
+totalUnackedMessages, 
topic.getMaxUnackedMessagesOnSubscription());
+}
 } else if (!havePendingRead) {
 if (log.isDebugEnabled()) {
 log.debug("[{}] Schedule read of {} messages for {} 
consumers", name, messagesToRead,



[pulsar-client-go] branch master updated (0b0b312 -> 5ced071)

2022-04-12 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


from 0b0b312  Dlq producer on topic with schema (#723)
 add 5ced071  fix annotation typo (#758)

No new revisions were added by this update.

Summary of changes:
 pulsar/consumer.go | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)



[pulsar-client-go] branch master updated: Dlq producer on topic with schema (#723)

2022-04-12 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
 new 0b0b312  Dlq producer on topic with schema (#723)
0b0b312 is described below

commit 0b0b312ceea6f77efb9058bfdac8156b959a9189
Author: Garule Prabhudas 
AuthorDate: Wed Apr 13 09:06:30 2022 +0530

Dlq producer on topic with schema (#723)

* add schema from consumer while creating dlq producer

* write test case for dlq topic with schema

* since payload and schema are mutually exclusive
 - use payload from schema only if payload is nil

* edit test

Co-authored-by: PGarule 
---
 pulsar/dlq_router.go |  6 +++---
 pulsar/producer_partition.go | 12 +++-
 2 files changed, 10 insertions(+), 8 deletions(-)

diff --git a/pulsar/dlq_router.go b/pulsar/dlq_router.go
index c858946..b28600f 100644
--- a/pulsar/dlq_router.go
+++ b/pulsar/dlq_router.go
@@ -89,8 +89,7 @@ func (r *dlqRouter) run() {
select {
case cm := <-r.messageCh:
r.log.WithField("msgID", cm.ID()).Debug("Got message 
for DLQ")
-   producer := r.getProducer()
-
+   producer := 
r.getProducer(cm.Consumer.(*consumer).options.Schema)
msg := cm.Message.(*message)
msgID := msg.ID()
producer.SendAsync(context.Background(), 
&ProducerMessage{
@@ -127,7 +126,7 @@ func (r *dlqRouter) close() {
}
 }
 
-func (r *dlqRouter) getProducer() Producer {
+func (r *dlqRouter) getProducer(schema Schema) Producer {
if r.producer != nil {
// Producer was already initialized
return r.producer
@@ -140,6 +139,7 @@ func (r *dlqRouter) getProducer() Producer {
Topic:   r.policy.DeadLetterTopic,
CompressionType: LZ4,
BatchingMaxPublishDelay: 100 * time.Millisecond,
+   Schema:  schema,
})
 
if err != nil {
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 525b89c..d37d60e 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -405,18 +405,20 @@ func (p *partitionProducer) internalSend(request 
*sendRequest) {
 
msg := request.msg
 
+   // read payload from message
payload := msg.Payload
-   var schemaPayload []byte
+
var err error
-   if p.options.Schema != nil {
+
+   // payload and schema are mutually exclusive
+   // try to get payload from schema value only if payload is not set
+   if payload == nil && p.options.Schema != nil {
+   var schemaPayload []byte
schemaPayload, err = p.options.Schema.Encode(msg.Value)
if err != nil {
p.log.WithError(err).Errorf("Schema encode message 
failed %s", msg.Value)
return
}
-   }
-
-   if payload == nil {
payload = schemaPayload
}
 



[pulsar-client-go] branch master updated: Fix ack timeout cause reconnect (#756)

2022-04-05 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
 new 85d7661  Fix ack timeout cause reconnect (#756)
85d7661 is described below

commit 85d76615dea35e24e4699d6dc54be216aeb89499
Author: xiaolong ran 
AuthorDate: Wed Apr 6 11:36:02 2022 +0800

Fix ack timeout cause reconnect (#756)

* Fix ack timeout cause reconnect

Signed-off-by: xiaolongran 

* fix some logic

Signed-off-by: xiaolongran 
---
 pulsar/producer_partition.go | 16 
 1 file changed, 8 insertions(+), 8 deletions(-)

diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index d031e9a..525b89c 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -780,22 +780,22 @@ func (p *partitionProducer) ReceivedSendReceipt(response 
*pb.CommandSendReceipt)
 
if !ok {
// if we receive a receipt although the pending queue is empty, 
the state of the broker and the producer differs.
-   // At that point, it is better to close the connection to the 
broker to reconnect to a broker hopping it solves
-   // the state discrepancy.
-   p.log.Warnf("Received ack for %v although the pending queue is 
empty, closing connection", response.GetMessageId())
-   p._getConn().Close()
+   p.log.Warnf("Got ack %v for timed out msg", 
response.GetMessageId())
return
}
 
if pi.sequenceID < response.GetSequenceId() {
-   // if we receive a receipt that is not the one expected, the 
state of the broker and the producer differs.
-   // At that point, it is better to close the connection to the 
broker to reconnect to a broker hopping it solves
-   // the state discrepancy.
+   // Ignoring the ack since it's referring to a message that has 
already timed out.
+   p.log.Warnf("Received ack for %v on sequenceId %v - expected: 
%v, closing connection", response.GetMessageId(),
+   response.GetSequenceId(), pi.sequenceID)
+   return
+   } else if pi.sequenceID > response.GetSequenceId() {
+   // Force connection closing so that messages can be 
re-transmitted in a new connection
p.log.Warnf("Received ack for %v on sequenceId %v - expected: 
%v, closing connection", response.GetMessageId(),
response.GetSequenceId(), pi.sequenceID)
p._getConn().Close()
return
-   } else if pi.sequenceID == response.GetSequenceId() {
+   } else {
// The ack was indeed for the expected item in the queue, we 
can remove it and trigger the callback
p.pendingQueue.Poll()
 



[pulsar-client-go] branch master updated: allow config reader subscription name (#754)

2022-03-28 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
 new 925da1a  allow config reader subscription name (#754)
925da1a is described below

commit 925da1a039a9551ab410727d7766c7e0a13ad69d
Author: ZhangJian He 
AuthorDate: Tue Mar 29 14:22:48 2022 +0800

allow config reader subscription name (#754)

### Motivation
allow config reader's subscription name, follow java's feature 
https://github.com/apache/pulsar/pull/8801

### Modifications
add param `SubscriptionName` in `ReaderOptions`

### Verifying this change
add the test for setting the subscritpion name
---
 pulsar/reader.go  |  4 
 pulsar/reader_impl.go |  9 ++---
 pulsar/reader_test.go | 22 ++
 3 files changed, 32 insertions(+), 3 deletions(-)

diff --git a/pulsar/reader.go b/pulsar/reader.go
index f1cb575..3539037 100644
--- a/pulsar/reader.go
+++ b/pulsar/reader.go
@@ -68,6 +68,10 @@ type ReaderOptions struct {
// SubscriptionRolePrefix sets the subscription role prefix. The 
default prefix is "reader".
SubscriptionRolePrefix string
 
+   // SubscriptionName sets the subscription name.
+   // If subscriptionRolePrefix is set at the same time, this 
configuration will prevail
+   SubscriptionName string
+
// ReadCompacted, if enabled, the reader will read messages from the 
compacted topic rather than reading the
// full message backlog of the topic. This means that, if the topic has 
been compacted, the reader will only
// see the latest value for each key in the topic, up until the point 
in the topic message backlog that has
diff --git a/pulsar/reader_impl.go b/pulsar/reader_impl.go
index 596884a..dd552b6 100644
--- a/pulsar/reader_impl.go
+++ b/pulsar/reader_impl.go
@@ -66,11 +66,14 @@ func newReader(client *client, options ReaderOptions) 
(Reader, error) {
}
}
 
-   subscriptionName := options.SubscriptionRolePrefix
+   subscriptionName := options.SubscriptionName
if subscriptionName == "" {
-   subscriptionName = "reader"
+   subscriptionName = options.SubscriptionRolePrefix
+   if subscriptionName == "" {
+   subscriptionName = "reader"
+   }
+   subscriptionName += "-" + generateRandomName()
}
-   subscriptionName += "-" + generateRandomName()
 
receiverQueueSize := options.ReceiverQueueSize
if receiverQueueSize <= 0 {
diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go
index aa12078..3c85871 100644
--- a/pulsar/reader_test.go
+++ b/pulsar/reader_test.go
@@ -24,6 +24,7 @@ import (
"time"
 
"github.com/apache/pulsar-client-go/pulsar/crypto"
+   "github.com/google/uuid"
"github.com/stretchr/testify/assert"
 )
 
@@ -48,6 +49,27 @@ func TestReaderConfigErrors(t *testing.T) {
assert.NotNil(t, err)
 }
 
+func TestReaderConfigSubscribeName(t *testing.T) {
+   client, err := NewClient(ClientOptions{
+   URL: lookupURL,
+   })
+   if err != nil {
+   t.Fatal(err)
+   }
+   defer client.Close()
+
+   consumer, err := client.CreateReader(ReaderOptions{
+   StartMessageID:   EarliestMessageID(),
+   Topic:uuid.New().String(),
+   SubscriptionName: uuid.New().String(),
+   })
+   if err != nil {
+   t.Fatal(err)
+   }
+   defer consumer.Close()
+   assert.NotNil(t, consumer)
+}
+
 func TestReader(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,


[pulsar-client-go] branch master updated (5c04811 -> 95fdb3f)

2022-03-21 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git.


from 5c04811  Temporarily point ci to pulsar 2.8.2 (#747)
 add 95fdb3f  [build] make go version consistent (#751)

No new revisions were added by this update.

Summary of changes:
 Dockerfile | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)


[pulsar-client-go] branch master updated (95fdb3f -> b58f115)

2022-03-21 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git.


from 95fdb3f  [build] make go version consistent (#751)
 add b58f115  [PIP 90] go client retrieve broker metadata (#745)

No new revisions were added by this update.

Summary of changes:
 pulsar/consumer_partition.go   | 24 --
 pulsar/impl_message.go | 10 +
 pulsar/internal/buffer.go  |  6 ++
 pulsar/internal/commands.go| 20 --
 pulsar/internal/commands_test.go   | 18 
 pulsar/internal/connection.go  |  5 +++--
 .../pulsartracing/message_carrier_util_test.go |  8 
 pulsar/message.go  |  8 
 pulsar/negative_acks_tracker_test.go   | 16 +++
 9 files changed, 109 insertions(+), 6 deletions(-)


[pulsar-client-go] branch master updated (b58f115 -> 4f50a67)

2022-03-21 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git.


from b58f115  [PIP 90] go client retrieve broker metadata (#745)
 add 4f50a67  Add schema support to Reader (#741)

No new revisions were added by this update.

Summary of changes:
 pulsar/reader.go  |  3 +++
 pulsar/reader_impl.go |  1 +
 pulsar/reader_test.go | 43 +++
 3 files changed, 47 insertions(+)


[pulsar-client-go] branch master updated: Add schema support to Reader (#741)

2022-03-21 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
 new 4f50a67  Add schema support to Reader (#741)
4f50a67 is described below

commit 4f50a678d9030828933e03c1a79ae310c38893ad
Author: Ziyao Wei 
AuthorDate: Tue Mar 22 02:26:37 2022 -0400

Add schema support to Reader (#741)

Add schema support to Reader
---
 pulsar/reader.go  |  3 +++
 pulsar/reader_impl.go |  1 +
 pulsar/reader_test.go | 43 +++
 3 files changed, 47 insertions(+)

diff --git a/pulsar/reader.go b/pulsar/reader.go
index c45b8ff..f1cb575 100644
--- a/pulsar/reader.go
+++ b/pulsar/reader.go
@@ -79,6 +79,9 @@ type ReaderOptions struct {
 
// Decryption represents the encryption related fields required by the 
reader to decrypt a message.
Decryption *MessageDecryptionInfo
+
+   // Schema represents the schema implementation.
+   Schema Schema
 }
 
 // Reader can be used to scan through all the messages currently available in 
a topic.
diff --git a/pulsar/reader_impl.go b/pulsar/reader_impl.go
index 0fed80c..596884a 100644
--- a/pulsar/reader_impl.go
+++ b/pulsar/reader_impl.go
@@ -102,6 +102,7 @@ func newReader(client *client, options ReaderOptions) 
(Reader, error) {
nackRedeliveryDelay:defaultNackRedeliveryDelay,
replicateSubscriptionState: false,
decryption: options.Decryption,
+   schema: options.Schema,
}
 
reader := &reader{
diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go
index bdafea0..aa12078 100644
--- a/pulsar/reader_test.go
+++ b/pulsar/reader_test.go
@@ -710,3 +710,46 @@ func TestProducerReaderRSAEncryption(t *testing.T) {
assert.Equal(t, []byte(expectMsg), msg.Payload())
}
 }
+
+func TestReaderWithSchema(t *testing.T) {
+   client, err := NewClient(ClientOptions{
+   URL: lookupURL,
+   })
+
+   assert.Nil(t, err)
+   defer client.Close()
+
+   topic := newTopicName()
+   schema := NewStringSchema(nil)
+
+   // create producer
+   producer, err := client.CreateProducer(ProducerOptions{
+   Topic:  topic,
+   Schema: schema,
+   })
+   assert.Nil(t, err)
+   defer producer.Close()
+
+   value := "hello pulsar"
+   _, err = producer.Send(context.Background(), &ProducerMessage{
+   Value: value,
+   })
+   assert.Nil(t, err)
+
+   // create reader
+   reader, err := client.CreateReader(ReaderOptions{
+   Topic:  topic,
+   StartMessageID: EarliestMessageID(),
+   Schema: schema,
+   })
+   assert.Nil(t, err)
+   defer reader.Close()
+
+   msg, err := reader.Next(context.Background())
+   assert.NoError(t, err)
+
+   var res *string
+   err = msg.GetSchemaValue(&res)
+   assert.Nil(t, err)
+   assert.Equal(t, *res, value)
+}


[pulsar-client-go] branch master updated: [PIP 90] go client retrieve broker metadata (#745)

2022-03-21 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
 new b58f115  [PIP 90] go client retrieve broker metadata (#745)
b58f115 is described below

commit b58f1157e4aadfc662c3d5fc19f2de55cc9ff411
Author: ZhangJian He 
AuthorDate: Tue Mar 22 12:15:14 2022 +0800

[PIP 90] go client retrieve broker metadata (#745)

[PIP 90] go client retrieve broker metadata
---
 pulsar/consumer_partition.go   | 24 --
 pulsar/impl_message.go | 10 +
 pulsar/internal/buffer.go  |  6 ++
 pulsar/internal/commands.go| 20 --
 pulsar/internal/commands_test.go   | 18 
 pulsar/internal/connection.go  |  5 +++--
 .../pulsartracing/message_carrier_util_test.go |  8 
 pulsar/message.go  |  8 
 pulsar/negative_acks_tracker_test.go   | 16 +++
 9 files changed, 109 insertions(+), 6 deletions(-)

diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 04a39c5..b06474d 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -498,12 +498,17 @@ func (pc *partitionConsumer) MessageReceived(response 
*pb.CommandMessage, header
pbMsgID := response.GetMessageId()
 
reader := internal.NewMessageReader(headersAndPayload)
+   brokerMetadata, err := reader.ReadBrokerMetadata()
+   if err != nil {
+   // todo optimize use more appropriate error codes
+   pc.discardCorruptedMessage(pbMsgID, 
pb.CommandAck_BatchDeSerializeError)
+   return err
+   }
msgMeta, err := reader.ReadMessageMetadata()
if err != nil {
pc.discardCorruptedMessage(pbMsgID, 
pb.CommandAck_ChecksumMismatch)
return err
}
-
decryptedPayload, err := 
pc.decryptor.Decrypt(headersAndPayload.ReadableSlice(), pbMsgID, msgMeta)
// error decrypting the payload
if err != nil {
@@ -597,7 +602,18 @@ func (pc *partitionConsumer) MessageReceived(response 
*pb.CommandMessage, header
pc.AckID(msgID)
continue
}
-
+   var messageIndex *uint64
+   var brokerPublishTime *time.Time
+   if brokerMetadata != nil {
+   if brokerMetadata.Index != nil {
+   aux := brokerMetadata.GetIndex() - 
uint64(numMsgs) + uint64(i) + 1
+   messageIndex = &aux
+   }
+   if brokerMetadata.BrokerTimestamp != nil {
+   aux := 
timeFromUnixTimestampMillis(*brokerMetadata.BrokerTimestamp)
+   brokerPublishTime = &aux
+   }
+   }
// set the consumer so we know how to ack the message id
msgID.consumer = pc
var msg *message
@@ -616,6 +632,8 @@ func (pc *partitionConsumer) MessageReceived(response 
*pb.CommandMessage, header
replicatedFrom:  
msgMeta.GetReplicatedFrom(),
redeliveryCount: 
response.GetRedeliveryCount(),
orderingKey: string(smm.OrderingKey),
+   index:   messageIndex,
+   brokerPublishTime:   brokerPublishTime,
}
} else {
msg = &message{
@@ -631,6 +649,8 @@ func (pc *partitionConsumer) MessageReceived(response 
*pb.CommandMessage, header
replicationClusters: msgMeta.GetReplicateTo(),
replicatedFrom:  
msgMeta.GetReplicatedFrom(),
redeliveryCount: 
response.GetRedeliveryCount(),
+   index:   messageIndex,
+   brokerPublishTime:   brokerPublishTime,
}
}
 
diff --git a/pulsar/impl_message.go b/pulsar/impl_message.go
index a9809aa..3216676 100644
--- a/pulsar/impl_message.go
+++ b/pulsar/impl_message.go
@@ -241,6 +241,8 @@ type message struct {
redeliveryCount uint32
schema  Schema
encryptionContext   *EncryptionContext
+   index   *uint64
+   brokerPublishTime   *time.Time
 }
 
 func (msg *message) Topic() string {
@@ -299,6 +301,14 @@ func (msg *message) GetEncryptionContext() 
*EncryptionContext {
return msg.encryptionContext
 }
 
+func (msg *message) Index() *uint64 {
+   retu

[pulsar-client-go] branch master updated: [build] make go version consistent (#751)

2022-03-21 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
 new 95fdb3f  [build] make go version consistent (#751)
95fdb3f is described below

commit 95fdb3fc9ed1c4881ee874ca47e945ca15475af3
Author: ZhangJian He 
AuthorDate: Tue Mar 22 12:12:13 2022 +0800

[build] make go version consistent (#751)

make go version consistent
---
 Dockerfile | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/Dockerfile b/Dockerfile
index a21a219..e12cc80 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -17,7 +17,7 @@
 # under the License.
 #
 
-ARG GO_VERSION=golang:1.13
+ARG GO_VERSION=golang:1.15
 FROM apachepulsar/pulsar:2.8.2 as pulsar
 FROM $GO_VERSION as go
 


[pulsar] branch master updated (c4e4ddd -> 601fbdd)

2022-03-17 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


from c4e4ddd  [Transaction] Fix transaction buffer recover 
BrokerMetadataException close topic (#14709)
 add 601fbdd  Process maxRedeliverCount is 0 of DeadLeddterPolicy (#14706)

No new revisions were added by this update.

Summary of changes:
 .../org/apache/pulsar/client/impl/ConsumerBuilderImpl.java |  2 ++
 .../org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java | 10 ++
 .../apache/pulsar/websocket/AbstractWebSocketHandlerTest.java  |  3 ++-
 3 files changed, 14 insertions(+), 1 deletion(-)


[pulsar-client-go] branch master updated: Upgrade beefsack/go-rate (#735)

2022-02-22 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
 new 128d8ea  Upgrade beefsack/go-rate (#735)
128d8ea is described below

commit 128d8ea3e24035a49fc81a516b01bc57b654a64a
Author: Shubham Sharma 
AuthorDate: Tue Feb 22 14:58:10 2022 +0530

Upgrade beefsack/go-rate (#735)

Fixes #725

### Motivation

This PR will enable pulsar-client-go to be a compliant library. One of its 
dependencies was not compliant with OSS licensing requirements earlier, and 
this PR upgrades to the latest version of it, which has a MIT license.

### Modifications

Ran the following commands
```bash
go get github.com/beefsack/go-rate@latest
go mod tidy
```

### Verifying this change

- [x] Make sure that the change passes the CI checks.
---
 go.mod | 2 +-
 go.sum | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/go.mod b/go.mod
index c788991..e57ac12 100644
--- a/go.mod
+++ b/go.mod
@@ -6,7 +6,7 @@ require (
github.com/AthenZ/athenz v1.10.39
github.com/DataDog/zstd v1.5.0
github.com/apache/pulsar-client-go/oauth2 
v0.0.0-20220120090717-25e59572242e
-   github.com/beefsack/go-rate v0.0.0-20180408011153-efa7637bb9b6
+   github.com/beefsack/go-rate v0.0.0-20220214233405-116f4ca011a0
github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b
github.com/davecgh/go-spew v1.1.1
github.com/gogo/protobuf v1.3.2
diff --git a/go.sum b/go.sum
index 0e0fe1c..28bc3be 100644
--- a/go.sum
+++ b/go.sum
@@ -60,8 +60,8 @@ github.com/armon/circbuf 
v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hC
 github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod 
h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY=
 github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod 
h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
 github.com/aws/aws-sdk-go v1.32.6/go.mod 
h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0=
-github.com/beefsack/go-rate v0.0.0-20180408011153-efa7637bb9b6 
h1:KXlsf+qt/X5ttPGEjR0tPH1xaWWoKBEg9Q1THAj2h3I=
-github.com/beefsack/go-rate v0.0.0-20180408011153-efa7637bb9b6/go.mod 
h1:6YNgTHLutezwnBvyneBbwvB8C82y3dcoOj5EQJIdGXA=
+github.com/beefsack/go-rate v0.0.0-20220214233405-116f4ca011a0 
h1:0b2vaepXIfMsG++IsjHiI2p4bxALD1Y2nQKGMR5zDQM=
+github.com/beefsack/go-rate v0.0.0-20220214233405-116f4ca011a0/go.mod 
h1:6YNgTHLutezwnBvyneBbwvB8C82y3dcoOj5EQJIdGXA=
 github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod 
h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
 github.com/beorn7/perks v1.0.0/go.mod 
h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
 github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=


[pulsar-client-go] annotated tag v0.8.0 updated (5daa17b -> 3df4ede)

2022-02-20 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a change to annotated tag v0.8.0
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git.


*** WARNING: tag v0.8.0 was modified! ***

from 5daa17b  (commit)
  to 3df4ede  (tag)
 tagging 5daa17b02bffe7d0844c76de2306de93fe9e3acf (commit)
 replaces v0.7.0
  by xiaolongran
  on Mon Feb 21 10:42:02 2022 +0800

- Log -
Release v0.8.0
-BEGIN PGP SIGNATURE-

iQJDBAABCAAtFiEE4nvv6z2Aqlp0sXEpAHfMO00BOOYFAmIS+/oPHHJ4bEBhcGFj
aGUub3JnAAoJEAB3zDtNATjmGD4P/2ZT37Rfxzg3XJ0wzw0VwTm/2Utf1tChRkt5
JD6CvHIt22fQgTx08Bh2q+kiEGAdrAkYe20XzSB9dVvu5jv10YnUnletqkBk9QYK
Szx24cLvQjmXHJ8rUAYFyGgEeh1k3/nh1k9mrGqxAvsnegs06xtGBylHsnaVdZM7
gyBEChqyxfPdA5iZgvTsjss3dPx/puJfErpq1NQ2ng4AHImhzq9zfELUuGNtp3ce
oj0HegIlajTgLp/pSMuCCCiYy7Lx63cN3zS2Y1lkQHgrjrgbzB0MjCPMHdU2JWWo
4/SVI0QHFZKOgvyZuYtSgE5zImazBdZrktunasc+w9v3u8hsXLgdT/6WaR3u8Ton
vPwwcVpHaVwnRvdL1SGOpiQyUWTNPAnrP1XNF6tXJRaK3Ne9ZN2W5zT9We+dVSGx
m/Fnpa1pU6iRgnbBDL/2RpuzMPpxJw9ZTYmCaAmJDhvHjCJmumovhe77qznbFDYG
0oA7opjRFp1BOm2DQUo4GQbczE7r7fJK1moCIbAsDg8K1k7JsO0nJCT0Cc3EM3B+
SXos8XmX9qnGKEJpvCUcb5foI2Du1bUSwaNhlmctbDNoGrmrn4eZ8m2aW3fwLGQt
QWv6Z0duT2Pkug8T2w0OfdkqC7JD/+Jt2+XkUt+htDOgr5F3agbhVBa96wfCuidP
E6mVz98Z
=y/wi
-END PGP SIGNATURE-
---


No new revisions were added by this update.

Summary of changes:


[pulsar-client-go] branch master updated: Update version file to 0.8.0 (#728)

2022-02-20 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
 new 1df5596  Update version file to 0.8.0 (#728)
1df5596 is described below

commit 1df5596aa7a351c6f43edb53f54b2a14b83b0073
Author: xiaolong ran 
AuthorDate: Mon Feb 21 10:40:24 2022 +0800

Update version file to 0.8.0 (#728)

Signed-off-by: xiaolongran 

## Motivation
Update version file to 0.8.0
---
 VERSION| 2 +-
 stable.txt | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/VERSION b/VERSION
index 3129f0c..c12365b 100644
--- a/VERSION
+++ b/VERSION
@@ -1,3 +1,3 @@
 // This version number refers to the currently released version number
 // Please fix the version when release.
-v0.7.0
+v0.8.0
diff --git a/stable.txt b/stable.txt
index eb53fa6..aff2506 100644
--- a/stable.txt
+++ b/stable.txt
@@ -1,3 +1,3 @@
 // This version number refers to the current stable version, generally is 
`VERSION - 1`.
 // Please fix the version when release.
-v0.7.0
+v0.8.0


[pulsar-client-go] branch master updated: hotfix: there was a ticker leak when producer creation failed (#730)

2022-02-20 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
 new 9615575  hotfix: there was a ticker leak when producer creation failed 
(#730)
9615575 is described below

commit 961557532149e1d44f4c27066606d8323f172dbe
Author: L 
AuthorDate: Mon Feb 21 10:37:50 2022 +0800

hotfix: there was a ticker leak when producer creation failed (#730)

Co-authored-by: breatche 

Master Issue: [#729 ]

### Motivation


I received an alarm indicating that the server load was too high, used the 
pprof tool to locate the problem and determined that the cause was a ticker 
leak while a large number of producer creating failed.
Here are some screenshots to show it:
- pprof heap, there are a number of SDK-started ticker, accounted for 90%

![image](https://user-images.githubusercontent.com/15198796/154658829-e7bfc501-31cf-4f5e-9468-7a690d6b9a23.png)
- pprof cpu, cpu consumption is also here

![image](https://user-images.githubusercontent.com/15198796/154659229-3f2506e1-59fd-4ec0-b1f9-230fa4a48d96.png)



### Modifications

stop ticker when create producer failed
---
 pulsar/producer_partition.go | 1 +
 1 file changed, 1 insertion(+)

diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 0e2e4c4..d031e9a 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -145,6 +145,7 @@ func newPartitionProducer(client *client, topic string, 
options *ProducerOptions
}
err := p.grabCnx()
if err != nil {
+   p.batchFlushTicker.Stop()
logger.WithError(err).Error("Failed to create producer at 
newPartitionProducer")
return nil, err
}


[pulsar-client-go] branch master updated: add 0.8.0 changelog for repo (#727)

2022-02-15 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
 new 965045a  add 0.8.0 changelog for repo (#727)
965045a is described below

commit 965045aa0d2c077826f64d0b88a5d5e55d9d50c4
Author: xiaolong ran 
AuthorDate: Wed Feb 16 11:39:10 2022 +0800

add 0.8.0 changelog for repo (#727)

Signed-off-by: xiaolongran 

Add 0.8.0 changelog for repo
---
 CHANGELOG.md | 47 +++
 1 file changed, 47 insertions(+)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2e43ddc..97f6541 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -23,6 +23,53 @@
 
 All notable changes to this project will be documented in this file.
 
+[0.8.0] 2022-02-16
+
+## What's Changed
+* Update release docs with missing information by @cckellogg in 
https://github.com/apache/pulsar-client-go/pull/656
+* Update change log for 0.7.0 release by @cckellogg in 
https://github.com/apache/pulsar-client-go/pull/655
+* Update version to 0.7.0 by @cckellogg in 
https://github.com/apache/pulsar-client-go/pull/654
+* fix issue 650,different handle sequence value by @baomingyu in 
https://github.com/apache/pulsar-client-go/pull/651
+* Support nack backoff policy for SDK by @wolfstudy in 
https://github.com/apache/pulsar-client-go/pull/660
+* Remove unused dependency in `oauth2` module by @reugn in 
https://github.com/apache/pulsar-client-go/pull/661
+* [Issue 662] Fix race in connection.go waitUntilReady() by @bschofield in 
https://github.com/apache/pulsar-client-go/pull/663
+* Update dependencies by @reugn in 
https://github.com/apache/pulsar-client-go/pull/665
+* [Issue 652] Quick fixes to the documentation for the main building blocks of 
the library by @reugn in https://github.com/apache/pulsar-client-go/pull/667
+* Add subscription properties for ConsumerOptions by @wolfstudy in 
https://github.com/apache/pulsar-client-go/pull/671
+* Add new bug-resistant build constraints by @reugn in 
https://github.com/apache/pulsar-client-go/pull/670
+* Handle the parameters parsing error in NewProvider by @reugn in 
https://github.com/apache/pulsar-client-go/pull/673
+* Update email template of release docs by @izumo27 in 
https://github.com/apache/pulsar-client-go/pull/674
+* Add properties filed for batch container by @wolfstudy in 
https://github.com/apache/pulsar-client-go/pull/683
+* [Issue 513] Correct apparent logic error in batchContainer's hasSpace() 
method by @bschofield in https://github.com/apache/pulsar-client-go/pull/678
+* Upgrade DataDog/zstd to v1.5.0 by @dferstay in 
https://github.com/apache/pulsar-client-go/pull/690
+* fix:add order key to message by @leizhiyuan in 
https://github.com/apache/pulsar-client-go/pull/688
+* Set default go version to 1.13 in CI related files by @reugn in 
https://github.com/apache/pulsar-client-go/pull/692
+* [Partition Consumer] Provide lock-free access to compression providers by 
@dferstay in https://github.com/apache/pulsar-client-go/pull/689
+* Use a separate gorutine to handle the logic of reconnect by @wolfstudy in 
https://github.com/apache/pulsar-client-go/pull/691
+* [DefaultRouter] add a parallel bench test by @dferstay in 
https://github.com/apache/pulsar-client-go/pull/693
+* Revert "Use a separate gorutine to handle the logic of reconnect" by 
@wolfstudy in https://github.com/apache/pulsar-client-go/pull/700
+* Fix data race while accessing connection in partitionProducer by @wolfstudy 
in https://github.com/apache/pulsar-client-go/pull/701
+* Fix stuck when reconnect broker by @wolfstudy in 
https://github.com/apache/pulsar-client-go/pull/703
+* Fix slice bounds out of range for readSingleMessage by @wolfstudy in 
https://github.com/apache/pulsar-client-go/pull/709
+* Encryption failure test case fix by @GPrabhudas in 
https://github.com/apache/pulsar-client-go/pull/708
+* [DefaultRouter] fix unnecessary system clock reads due to races accessing 
router state by @dferstay in https://github.com/apache/pulsar-client-go/pull/694
+* Fix negative WaitGroup counter issue by @wolfstudy in 
https://github.com/apache/pulsar-client-go/pull/712
+* [issue 675] oauth2 use golang-jwt address CVE-2020-26160 by @zzzming in 
https://github.com/apache/pulsar-client-go/pull/713
+* readme: add note about how to build and test by @pgier in 
https://github.com/apache/pulsar-client-go/pull/714
+* Bump oauth2 package version to the latest in master by @iorvd in 
https://github.com/apache/pulsar-client-go/pull/715
+* Fix closed connection leak by @billowqiu in 
https://github.com/apache/pulsar-client-go/pull/716
+* [Bugfix] producer runEventsLoop for reconnect early exit by @billowqiu in 
https://github.com/apache/pulsar-client-go/pull/721
+* [issue 679][oauth2] Fix macos compiler warnings by @pgier in 
https://github.com/apache/pulsar-client-go/pull/71

svn commit: r52577 - in /dev/pulsar/pulsar-client-go-0.8.0-candidate-1: ./ apache-pulsar-client-go-0.8.0-src.tar.gz apache-pulsar-client-go-0.8.0-src.tar.gz.asc apache-pulsar-client-go-0.8.0-src.tar.g

2022-02-15 Thread rxl
Author: rxl
Date: Wed Feb 16 03:23:50 2022
New Revision: 52577

Log:
Staging artifacts and signature for Pulsar Client Go release 0.8.0-candidate-1

Added:
dev/pulsar/pulsar-client-go-0.8.0-candidate-1/

dev/pulsar/pulsar-client-go-0.8.0-candidate-1/apache-pulsar-client-go-0.8.0-src.tar.gz
   (with props)

dev/pulsar/pulsar-client-go-0.8.0-candidate-1/apache-pulsar-client-go-0.8.0-src.tar.gz.asc

dev/pulsar/pulsar-client-go-0.8.0-candidate-1/apache-pulsar-client-go-0.8.0-src.tar.gz.sha512

Added: 
dev/pulsar/pulsar-client-go-0.8.0-candidate-1/apache-pulsar-client-go-0.8.0-src.tar.gz
==
Binary file - no diff available.

Propchange: 
dev/pulsar/pulsar-client-go-0.8.0-candidate-1/apache-pulsar-client-go-0.8.0-src.tar.gz
--
svn:mime-type = application/octet-stream

Added: 
dev/pulsar/pulsar-client-go-0.8.0-candidate-1/apache-pulsar-client-go-0.8.0-src.tar.gz.asc
==
--- 
dev/pulsar/pulsar-client-go-0.8.0-candidate-1/apache-pulsar-client-go-0.8.0-src.tar.gz.asc
 (added)
+++ 
dev/pulsar/pulsar-client-go-0.8.0-candidate-1/apache-pulsar-client-go-0.8.0-src.tar.gz.asc
 Wed Feb 16 03:23:50 2022
@@ -0,0 +1,17 @@
+-BEGIN PGP SIGNATURE-
+
+iQJEBAABCAAuFiEEciSp+kqNwHcTrdvU+q+T3I6QoRgFAmIMZswQHHlvbmdAYXBh
+Y2hlLm9yZwAKCRD6r5PcjpChGCr1EACfxHU+Lis8n/SJUSfuwLpHZWGzytN0xllV
+zJOlYi0zaUV34BcKk7w3+pi6QTKUR6VFippDxKDkwZgrN1nhC5/MaCIEe3C/HviA
+KL3TBJdFIElJ1tI3n6TfmWmRRlXhmMJiTkjnVuLXRA00d3hf/wGL2lpq/cY+CWma
+rWJKoEg0MLc+svxWVzNQRZF0BL54JlSei6D/9qmDLp6skF5UyO4m+moUV9S5kj07
+/jZbgELQWSF9utwo2vzdHUd16+K99kSggmyNaZuoIROn+BAY/S300BaPR0X7hVQl
+lp65EhnSf0MTdovGjADTyn+EYlw6kyIlIIc4ICNg/7uhu6Q3pGIxD83YQY1fy9p7
+/rsyFwFkrnKNbqSIGQrGkNFcNbszRfgEVOiP/Mp8Pp1p0RWL404BD/EEOtZVQcFb
+Y6sBSqY6gUnzH7VTPePtbBVp0RRUABnQ1JbcDZUOwLW5mU2qUUA287f1Ow1B90hF
+/ktBs4TPHmKIuJ6wdZyb10FEbvbHAEZoHGYVrMibRkBbR6eSD9NGQQy4gnWy/i8g
+JoV4379VIyncuJAQfdv4O5d/sC9jksupn431s+HxMk0dsq8XyGgf19ACSxX3Zt8u
+VqL8F2rCEfk9IeV4GP3M9T3TdNK/RApdAHxif1bs1oy6pNetnFzQRG73RbYYtaza
+znydc5uvGg==
+=N0KU
+-END PGP SIGNATURE-

Added: 
dev/pulsar/pulsar-client-go-0.8.0-candidate-1/apache-pulsar-client-go-0.8.0-src.tar.gz.sha512
==
--- 
dev/pulsar/pulsar-client-go-0.8.0-candidate-1/apache-pulsar-client-go-0.8.0-src.tar.gz.sha512
 (added)
+++ 
dev/pulsar/pulsar-client-go-0.8.0-candidate-1/apache-pulsar-client-go-0.8.0-src.tar.gz.sha512
 Wed Feb 16 03:23:50 2022
@@ -0,0 +1 @@
+0015cc735af13139a564f0fd57c58af753579953e3c12f08cb62d8e9801cdc2964c60392ed9a98dd54845be7f7b1a353bd49cecb03360b4cb691904f8447769c
  apache-pulsar-client-go-0.8.0-src.tar.gz




[pulsar-client-go] annotated tag v0.8.0-candidate-1 updated (5daa17b -> 58d5097)

2022-02-15 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a change to annotated tag v0.8.0-candidate-1
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git.


*** WARNING: tag v0.8.0-candidate-1 was modified! ***

from 5daa17b  (commit)
  to 58d5097  (tag)
 tagging 5daa17b02bffe7d0844c76de2306de93fe9e3acf (commit)
 replaces v0.7.0
  by xiaolongran
  on Wed Feb 16 10:37:55 2022 +0800

- Log -
Release v0.2.0
-BEGIN PGP SIGNATURE-

iQJDBAABCAAtFiEE4nvv6z2Aqlp0sXEpAHfMO00BOOYFAmIMY4UPHHJ4bEBhcGFj
aGUub3JnAAoJEAB3zDtNATjmJHoQAL3XJWlSSkRS8VOatPeRVt4XboPGE3khFXcx
K5nywWB5XSIuiQcunO+QVl71g7EYu+f6PCUJWi8kQpl809/C+nazHjbYm04okAKd
5+mtznRjAGfIa/s4xfNnPso01AacBb6K5Q9U3pRGJbSp8QQLL/yTcYHL7OGgH80d
n1uz8IQxEBHicPt9xnKMj6BhU3md4kfqvisQgqXODiSaOKpSoSE0hStHWYKK3aVh
hIDCmaclOj161VF3i7+wSlKD0OknPvU5KwvUIM9IvSyfxq9Ftvywh3bylU3ZGC3S
vupONpSB0XPVEgV4Wirn297FFVSUV1MNKJXW9yRiRqnqL2DjcFlaCEeO8aYxtwgP
ySOTLBAS5Laec+vQISfvsASjMHhMwZaVtEtjUIrMh+XOWWiIY4ujlN33AM4Ni0Ir
1UPcvya+smmH42Q4TB9Qe0ldy6Eh6jxdQMsY2bha8SMYPtCUIc9LTIx5H3JG9mhr
ouuhbQ5cU1+h9dLKGGpgkOOWoFk50JZgTgdktlV0mYPLbHIqZGLVkhwtxlqxSBJL
uWMHw4gM3ACR93f8DgfatHLZa3P7HSGMQLdyaj1SK4wA46vTt6HFBqHhJlS+JFQe
Z18VQV8uxdYn7Lwnzc9IuijMtxp+v+l5sjxzoKdTXiQuUlYjpahAGiNgMaqYuKDy
Tl2PQh5b
=fd4J
-END PGP SIGNATURE-
---


No new revisions were added by this update.

Summary of changes:


[pulsar-client-go] branch branch-0.8.0 created (now 5daa17b)

2022-02-15 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a change to branch branch-0.8.0
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git.


  at 5daa17b  Markdown error fix (#722)

No new revisions were added by this update.


[pulsar-client-go] branch master updated: [optimize] Stop batchFlushTicker when Disable batching (#720)

2022-02-09 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
 new 8f8287f  [optimize] Stop batchFlushTicker when Disable batching (#720)
8f8287f is described below

commit 8f8287f30fc0cc90e9dea0003776491fef22eb33
Author: ZhangJian He 
AuthorDate: Thu Feb 10 11:43:18 2022 +0800

[optimize] Stop batchFlushTicker when Disable batching (#720)

### Motivation

Disable batchFlushTicker when Disable batching, reduce cpu cost

### Modifications

Stop the batchFlushTicker when Disable batching
---
 pulsar/producer_partition.go | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 913c33c..0e2e4c4 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -126,6 +126,9 @@ func newPartitionProducer(client *client, topic string, 
options *ProducerOptions
metrics:  metrics,
epoch:0,
}
+   if p.options.DisableBatching {
+   p.batchFlushTicker.Stop()
+   }
p.setProducerState(producerInit)
 
if options.Schema != nil && options.Schema.GetSchemaInfo() != nil {


[pulsar-client-go] branch master updated: [Bugfix] producer runEventsLoop for reconnect early exit (#721)

2022-02-08 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
 new 16e8b81  [Bugfix] producer runEventsLoop for reconnect early exit 
(#721)
16e8b81 is described below

commit 16e8b8114615146d645f2947afeb5f3cedbc85a8
Author: billowqiu 
AuthorDate: Wed Feb 9 15:04:26 2022 +0800

[Bugfix] producer runEventsLoop for reconnect early exit (#721)

* Fix closed connection leak

* Fix closed connection leak

* bugfix: runEventsLoop for reconnect early exit

* [optimize] add log when reconnect

* [Bugfix]fix panic

* [Bugfix]remove log conn ID

* [optimize]Distinguish failed create producer log
---
 pulsar/consumer_partition.go   |  3 +++
 pulsar/internal/connection_pool.go |  6 +++---
 pulsar/producer_partition.go   | 21 +
 3 files changed, 19 insertions(+), 11 deletions(-)

diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 9bd4a94..04a39c5 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -894,6 +894,7 @@ func (pc *partitionConsumer) runEventsLoop() {
for {
select {
case <-pc.closeCh:
+   pc.log.Info("close consumer, exit reconnect")
return
case <-pc.connectClosedCh:
pc.log.Debug("runEventsLoop will reconnect")
@@ -992,6 +993,7 @@ func (pc *partitionConsumer) reconnectToBroker() {
for maxRetry != 0 {
if pc.getConsumerState() != consumerReady {
// Consumer is already closing
+   pc.log.Info("consumer state not ready, exit reconnect")
return
}
 
@@ -1005,6 +1007,7 @@ func (pc *partitionConsumer) reconnectToBroker() {
pc.log.Info("Reconnected consumer to broker")
return
}
+   pc.log.WithError(err).Error("Failed to create consumer at 
reconnect")
errMsg := err.Error()
if strings.Contains(errMsg, errTopicNotFount) {
// when topic is deleted, we should give up 
reconnection.
diff --git a/pulsar/internal/connection_pool.go 
b/pulsar/internal/connection_pool.go
index 4787ba1..db67c25 100644
--- a/pulsar/internal/connection_pool.go
+++ b/pulsar/internal/connection_pool.go
@@ -75,15 +75,15 @@ func (p *connectionPool) GetConnection(logicalAddr 
*url.URL, physicalAddr *url.U
p.Lock()
conn, ok := p.connections[key]
if ok {
-   p.log.Infof("Found connection in pool key=%s logical_addr=%+v 
physical_addr=%+v",
+   p.log.Debugf("Found connection in pool key=%s logical_addr=%+v 
physical_addr=%+v",
key, conn.logicalAddr, conn.physicalAddr)
 
// remove stale/failed connection
if conn.closed() {
-   delete(p.connections, key)
-   conn.Close()
p.log.Infof("Removed connection from pool key=%s 
logical_addr=%+v physical_addr=%+v",
key, conn.logicalAddr, conn.physicalAddr)
+   delete(p.connections, key)
+   conn.Close()
conn = nil // set to nil so we create a new one
}
}
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 3f1e54b..913c33c 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -142,7 +142,7 @@ func newPartitionProducer(client *client, topic string, 
options *ProducerOptions
}
err := p.grabCnx()
if err != nil {
-   logger.WithError(err).Error("Failed to create producer")
+   logger.WithError(err).Error("Failed to create producer at 
newPartitionProducer")
return nil, err
}
 
@@ -209,7 +209,7 @@ func (p *partitionProducer) grabCnx() error {
}
res, err := p.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, 
id, pb.BaseCommand_PRODUCER, cmdProducer)
if err != nil {
-   p.log.WithError(err).Error("Failed to create producer")
+   p.log.WithError(err).Error("Failed to create producer at send 
PRODUCER request")
return err
}
 
@@ -324,6 +324,7 @@ func (p *partitionProducer) reconnectToBroker() {
for maxRetry != 0 {
if p.getProducerState() != producerReady {
// Producer is already closing
+   p.log.I

[pulsar-client-go] branch master updated: Fix closed connection leak (#716)

2022-01-21 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
 new edf81af  Fix closed connection leak (#716)
edf81af is described below

commit edf81af225d1573848584a7e48f090a558a9c620
Author: billowqiu 
AuthorDate: Sat Jan 22 11:15:48 2022 +0800

Fix closed connection leak (#716)

Fix closed connection leak
---
 pulsar/internal/connection_pool.go | 1 +
 1 file changed, 1 insertion(+)

diff --git a/pulsar/internal/connection_pool.go 
b/pulsar/internal/connection_pool.go
index cadeed1..4787ba1 100644
--- a/pulsar/internal/connection_pool.go
+++ b/pulsar/internal/connection_pool.go
@@ -81,6 +81,7 @@ func (p *connectionPool) GetConnection(logicalAddr *url.URL, 
physicalAddr *url.U
// remove stale/failed connection
if conn.closed() {
delete(p.connections, key)
+   conn.Close()
p.log.Infof("Removed connection from pool key=%s 
logical_addr=%+v physical_addr=%+v",
key, conn.logicalAddr, conn.physicalAddr)
conn = nil // set to nil so we create a new one


[pulsar-client-go] branch master updated: Bump oauth2 package version to the latest in master (#715)

2022-01-20 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
 new ddacb92  Bump oauth2 package version to the latest in master (#715)
ddacb92 is described below

commit ddacb9201e621c58488951772a74cecf78f11f54
Author: Oleksandr Prokopovych <93676586+io...@users.noreply.github.com>
AuthorDate: Fri Jan 21 05:27:44 2022 +0200

Bump oauth2 package version to the latest in master (#715)

* bump oauth2 package to the latest in master

* bump oauth2 package to the latest in master
---
 go.mod | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/go.mod b/go.mod
index eacc490..c788991 100644
--- a/go.mod
+++ b/go.mod
@@ -5,7 +5,7 @@ go 1.15
 require (
github.com/AthenZ/athenz v1.10.39
github.com/DataDog/zstd v1.5.0
-   github.com/apache/pulsar-client-go/oauth2 
v0.0.0-20201120111947-b8bd55bc02bd
+   github.com/apache/pulsar-client-go/oauth2 
v0.0.0-20220120090717-25e59572242e
github.com/beefsack/go-rate v0.0.0-20180408011153-efa7637bb9b6
github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b
github.com/davecgh/go-spew v1.1.1


[pulsar-client-go] branch master updated: readme: add note about how to build and test (#714)

2022-01-20 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
 new f5676b9  readme: add note about how to build and test (#714)
f5676b9 is described below

commit f5676b940f6029cf0a34a612b1f843bf5bdac076
Author: Paul Gier 
AuthorDate: Thu Jan 20 03:07:49 2022 -0600

readme: add note about how to build and test (#714)

readme: add note about how to build and test
---
 README.md | 10 ++
 1 file changed, 10 insertions(+)

diff --git a/README.md b/README.md
index 89c17f2..5ce74d1 100644
--- a/README.md
+++ b/README.md
@@ -134,6 +134,16 @@ for reader.HasNext() {
 }
 ```
 
+## Build and Test
+
+Build the sources:
+
+go build ./pulsar
+
+Run the unit tests:
+
+./docker-ci.sh
+
 ## Contributing
 
 Contributions are welcomed and greatly appreciated. See 
[CONTRIBUTING.md](CONTRIBUTING.md) for details on submitting patches and the 
contribution workflow.


[pulsar-client-go] branch master updated: [issue 675] oauth2 use golang-jwt address CVE-2020-26160 (#713)

2022-01-20 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
 new 25e5957  [issue 675] oauth2 use golang-jwt address CVE-2020-26160 
(#713)
25e5957 is described below

commit 25e59572242edd2c4aac2a836773b6f124efb7fa
Author: ming 
AuthorDate: Thu Jan 20 04:07:17 2022 -0500

[issue 675] oauth2 use golang-jwt address CVE-2020-26160 (#713)

* oauth2 use golang-jwt address CVE-2020-26160

* set go 1.15 minimum version as required by golang-jwt
---
 .github/workflows/go.yml  | 2 +-
 .github/workflows/project.yml | 2 +-
 README.md | 2 +-
 docker-ci.sh  | 2 +-
 go.mod| 2 +-
 go.sum| 4 ++--
 oauth2/auth.go| 2 +-
 oauth2/go.mod | 4 ++--
 oauth2/go.sum | 4 ++--
 9 files changed, 12 insertions(+), 12 deletions(-)

diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml
index bc64a50..a3c4c60 100644
--- a/.github/workflows/go.yml
+++ b/.github/workflows/go.yml
@@ -6,7 +6,7 @@ jobs:
 runs-on: ubuntu-latest
 strategy:
   matrix:
-go-version: [1.13, 1.14, 1.15, 1.16, 1.17]
+go-version: [1.15, 1.16, 1.17]
 steps:
   - name: clean docker cache
 run: |
diff --git a/.github/workflows/project.yml b/.github/workflows/project.yml
index 15ecb98..810979a 100644
--- a/.github/workflows/project.yml
+++ b/.github/workflows/project.yml
@@ -6,7 +6,7 @@ jobs:
 runs-on: ubuntu-latest
 strategy:
   matrix:
-go-version: [1.13, 1.14, 1.15, 1.16, 1.17]
+go-version: [1.15, 1.16, 1.17]
 steps:
   - name: Set up Go
 uses: actions/setup-go@v1
diff --git a/README.md b/README.md
index 73bd982..89c17f2 100644
--- a/README.md
+++ b/README.md
@@ -36,7 +36,7 @@ CGo based library.
 
 ## Requirements
 
-- Go 1.13+
+- Go 1.15+
 
 ## Status
 
diff --git a/docker-ci.sh b/docker-ci.sh
index 200cc50..37f97e7 100755
--- a/docker-ci.sh
+++ b/docker-ci.sh
@@ -25,7 +25,7 @@ cd ${SRC_DIR}
 
 IMAGE_NAME=pulsar-client-go-test:latest
 
-GO_VERSION=${1:-1.13}
+GO_VERSION=${1:-1.16}
 docker rmi --force ${IMAGE_NAME} || true
 docker rmi --force apachepulsar/pulsar:latest || true
 docker build -t ${IMAGE_NAME} --build-arg GO_VERSION="golang:${GO_VERSION}" .
diff --git a/go.mod b/go.mod
index 29407fa..eacc490 100644
--- a/go.mod
+++ b/go.mod
@@ -1,6 +1,6 @@
 module github.com/apache/pulsar-client-go
 
-go 1.13
+go 1.15
 
 require (
github.com/AthenZ/athenz v1.10.39
diff --git a/go.sum b/go.sum
index ee4a2b3..b2fdce1 100644
--- a/go.sum
+++ b/go.sum
@@ -98,8 +98,6 @@ github.com/envoyproxy/go-control-plane 
v0.9.9-0.20201210154907-fd9021fe5dad/go.m
 github.com/envoyproxy/go-control-plane 
v0.9.9-0.20210217033140-668b12f5399d/go.mod 
h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk=
 github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod 
h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
 github.com/fatih/color v1.7.0/go.mod 
h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
-github.com/form3tech-oss/jwt-go v3.2.3+incompatible 
h1:7ZaBxOI7TMoYBfyA3cQHErNNyAWIKUMIwqxEtgHOs5c=
-github.com/form3tech-oss/jwt-go v3.2.3+incompatible/go.mod 
h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k=
 github.com/fsnotify/fsnotify v1.4.7/go.mod 
h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
 github.com/fsnotify/fsnotify v1.4.9 
h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
 github.com/fsnotify/fsnotify v1.4.9/go.mod 
h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
@@ -122,6 +120,8 @@ github.com/gogo/protobuf v1.1.1/go.mod 
h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7a
 github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
 github.com/gogo/protobuf v1.3.2/go.mod 
h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
 github.com/golang-jwt/jwt v3.2.1+incompatible/go.mod 
h1:8pz2t5EyA70fFQQSrl6XZXzqecmYZeUEB8OUGHkxJ+I=
+github.com/golang-jwt/jwt v3.2.2+incompatible 
h1:IfV12K8xAKAnZqdXVzCZ+TOjboZ2keLg81eXfW3O+oY=
+github.com/golang-jwt/jwt v3.2.2+incompatible/go.mod 
h1:8pz2t5EyA70fFQQSrl6XZXzqecmYZeUEB8OUGHkxJ+I=
 github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod 
h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
 github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod 
h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
 github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod 
h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
diff --git a/oauth2/auth.go b/oauth2/auth.go
index 0a3c73a..d44bd35 100644
--- a/oauth2/auth.go
+++ b/oauth2/auth.go
@@ -22,7 +22,7 @@ import (
"time"
 
"github.com/apache/pulsar-client-go/oauth2/clock"
-   "github.com/form3tech-oss/jwt-go"
+   "github.com/golang-jwt/jwt"
"golang.org/x/

[pulsar-client-go] branch master updated: Fix negative WaitGroup counter issue (#712)

2022-01-19 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
 new 90305e8  Fix negative WaitGroup counter issue (#712)
90305e8 is described below

commit 90305e85a1631cca42cfa840675f52db9c773162
Author: xiaolong ran 
AuthorDate: Thu Jan 20 12:39:45 2022 +0800

Fix negative WaitGroup counter issue (#712)

* Fix negative WaitGroup counter issue

Signed-off-by: xiaolongran 

* use cas

Signed-off-by: xiaolongran 

* Make sure the callback logic is atomic

Signed-off-by: xiaolongran 
---
 pulsar/consumer_partition.go |  6 ++---
 pulsar/producer_partition.go | 61 +++-
 2 files changed, 35 insertions(+), 32 deletions(-)

diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 7679a8c..9bd4a94 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -33,7 +33,7 @@ import (
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
"github.com/apache/pulsar-client-go/pulsar/log"
 
-   "go.uber.org/atomic"
+   uAtomic "go.uber.org/atomic"
 )
 
 var (
@@ -110,10 +110,10 @@ type partitionConsumer struct {
 
// this is needed for sending ConsumerMessage on the messageCh
parentConsumer Consumer
-   state  atomic.Int32
+   state  uAtomic.Int32
options*partitionConsumerOpts
 
-   conn atomic.Value
+   conn uAtomic.Value
 
topicstring
name string
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index e318b81..3f1e54b 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -33,7 +33,7 @@ import (
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
"github.com/apache/pulsar-client-go/pulsar/log"
 
-   ua "go.uber.org/atomic"
+   uAtomic "go.uber.org/atomic"
 )
 
 type producerState int32
@@ -60,12 +60,12 @@ var (
 var errTopicNotFount = "TopicNotFound"
 
 type partitionProducer struct {
-   state  ua.Int32
+   state  uAtomic.Int32
client *client
topic  string
loglog.Logger
 
-   conn atomic.Value
+   conn uAtomic.Value
 
options  *ProducerOptions
producerName string
@@ -675,7 +675,7 @@ func (p *partitionProducer) internalFlush(fr *flushRequest) 
{
 
pi, ok := p.pendingQueue.PeekLast().(*pendingItem)
if !ok {
-   fr.waitGroup.Done()
+   close(fr.doneCh)
return
}
 
@@ -688,7 +688,7 @@ func (p *partitionProducer) internalFlush(fr *flushRequest) 
{
// The last item in the queue has been completed while we were
// looking at it. It's safe at this point to assume that every
// message enqueued before Flush() was called are now persisted
-   fr.waitGroup.Done()
+   close(fr.doneCh)
return
}
 
@@ -696,7 +696,7 @@ func (p *partitionProducer) internalFlush(fr *flushRequest) 
{
msg: nil,
callback: func(id MessageID, message *ProducerMessage, e error) 
{
fr.err = e
-   fr.waitGroup.Done()
+   close(fr.doneCh)
},
}
 
@@ -704,19 +704,23 @@ func (p *partitionProducer) internalFlush(fr 
*flushRequest) {
 }
 
 func (p *partitionProducer) Send(ctx context.Context, msg *ProducerMessage) 
(MessageID, error) {
-   wg := sync.WaitGroup{}
-   wg.Add(1)
-
var err error
var msgID MessageID
 
+   // use atomic bool to avoid race
+   isDone := uAtomic.NewBool(false)
+   doneCh := make(chan struct{})
+
p.internalSendAsync(ctx, msg, func(ID MessageID, message 
*ProducerMessage, e error) {
-   err = e
-   msgID = ID
-   wg.Done()
+   if isDone.CAS(false, true) {
+   err = e
+   msgID = ID
+   close(doneCh)
+   }
}, true)
 
-   wg.Wait()
+   // wait for send request to finish
+   <-doneCh
return msgID, err
 }
 
@@ -828,7 +832,7 @@ func (p *partitionProducer) ReceivedSendReceipt(response 
*pb.CommandSendReceipt)
 }
 
 func (p *partitionProducer) internalClose(req *closeProducer) {
-   defer req.waitGroup.Done()
+   defer close(req.doneCh)
if !p.casProducerState(producerReady, producerClosing) {
return
}
@@ -863,14 +867,15 @@ func (p *partitionProducer) LastSequenceID() int64 {
 }
 
 func (p *partitionProducer) Flush

[pulsar-client-go] branch master updated: [DefaultRouter] fix unnecessary system clock reads due to races accessing router state (#694)

2022-01-17 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
 new a119bab  [DefaultRouter] fix unnecessary system clock reads due to 
races accessing router state (#694)
a119bab is described below

commit a119bab0f8598601c0eb7f0fcd97da7ab06700c7
Author: dferstay 
AuthorDate: Mon Jan 17 00:05:25 2022 -0800

[DefaultRouter] fix unnecessary system clock reads due to races accessing 
router state (#694)

Previously, we used atomic operations to read and update parts of the
default router state.  Unfortunately, the reads and updates could race
under concurrent calls leading to unnecessary clock reads and an
associated slowdown in performance.

Now, we use atomic addition to increment the message count and batch size.
This removes the race condition by ensuring that each go-routine will have
a unique messageCount, and hence only one will perform the clock read.

Furthermore, we use atomic compare-and-swap to ensure that partitions are
not skipped if multiple go-routines attempt to increment the partition
cursor.

Signed-off-by: Daniel Ferstay 

Co-authored-by: Daniel Ferstay 
---
 pulsar/default_router.go  | 50 +--
 pulsar/default_router_test.go | 13 +++
 2 files changed, 34 insertions(+), 29 deletions(-)

diff --git a/pulsar/default_router.go b/pulsar/default_router.go
index b5e24a6..6945ff1 100644
--- a/pulsar/default_router.go
+++ b/pulsar/default_router.go
@@ -18,7 +18,6 @@
 package pulsar
 
 import (
-   "math"
"math/rand"
"sync/atomic"
"time"
@@ -27,7 +26,7 @@ import (
 type defaultRouter struct {
currentPartitionCursor uint32
 
-   lastChangeTimestamp int64
+   lastBatchTimestamp  int64
msgCounter  uint32
cumulativeBatchSize uint32
 }
@@ -45,7 +44,7 @@ func NewDefaultRouter(
disableBatching bool) func(*ProducerMessage, uint32) int {
state := &defaultRouter{
currentPartitionCursor: rand.Uint32(),
-   lastChangeTimestamp:math.MinInt64,
+   lastBatchTimestamp: time.Now().UnixNano(),
}
 
readClockAfterNumMessages := uint32(maxBatchingMessages / 10)
@@ -75,37 +74,38 @@ func NewDefaultRouter(
// If there's no key, we do round-robin across partition, 
sticking with a given
// partition for a certain amount of messages or volume 
buffered or the max delay to batch is reached so that
// we ensure having a decent amount of batching of the messages.
-   // Note that it is possible that we skip more than one 
partition if multiple goroutines increment
-   // currentPartitionCursor at the same time. If that happens it 
shouldn't be a problem because we only want to
-   // spread the data on different partitions but not necessarily 
in a specific sequence.
var now int64
size := uint32(len(message.Payload))
-   previousMessageCount := atomic.LoadUint32(&state.msgCounter)
-   previousBatchingMaxSize := 
atomic.LoadUint32(&state.cumulativeBatchSize)
-   previousLastChange := 
atomic.LoadInt64(&state.lastChangeTimestamp)
+   partitionCursor := 
atomic.LoadUint32(&state.currentPartitionCursor)
+   messageCount := atomic.AddUint32(&state.msgCounter, 1)
+   batchSize := atomic.AddUint32(&state.cumulativeBatchSize, size)
 
-   messageCountReached := previousMessageCount >= 
uint32(maxBatchingMessages-1)
-   sizeReached := (size >= 
uint32(maxBatchingSize)-previousBatchingMaxSize)
+   // Note: use greater-than for the threshold check so that we 
don't route this message to a new partition
+   //   before a batch is complete.
+   messageCountReached := messageCount > 
uint32(maxBatchingMessages)
+   sizeReached := batchSize > uint32(maxBatchingSize)
durationReached := false
-   if readClockAfterNumMessages == 0 || 
previousMessageCount%readClockAfterNumMessages == 0 {
+   if readClockAfterNumMessages == 0 || 
messageCount%readClockAfterNumMessages == 0 {
now = time.Now().UnixNano()
-   durationReached = now-previousLastChange >= 
maxBatchingDelay.Nanoseconds()
+   lastBatchTime := 
atomic.LoadInt64(&state.lastBatchTimestamp)
+   durationReached = now-lastBatchTime > 
maxBatchingDelay.Nanoseconds()
}
if messageCountReached || s

[pulsar-client-go] branch master updated: Encryption failure test case fix (#708)

2022-01-16 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
 new 468bfd6  Encryption failure test case fix (#708)
468bfd6 is described below

commit 468bfd6bdcd45857e64fd499b98c7f30ec6f60f7
Author: Garule Prabhudas 
AuthorDate: Mon Jan 17 12:02:51 2022 +0530

Encryption failure test case fix (#708)

* test case to detect race condition in creation of producer/consumer

* fix for race condition in consumer/producer creation

* refactor

* restore test case

Co-authored-by: PGarule 
---
 pulsar/consumer_impl.go  | 30 +++
 pulsar/consumer_partition.go |  7 -
 pulsar/consumer_test.go  | 70 +++-
 pulsar/producer_impl.go  | 21 +
 pulsar/producer_partition.go | 21 -
 pulsar/reader_impl.go| 12 
 6 files changed, 132 insertions(+), 29 deletions(-)

diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index 00e0b76..2bd3ed5 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -25,6 +25,7 @@ import (
"sync"
"time"
 
+   "github.com/apache/pulsar-client-go/pulsar/crypto"
"github.com/apache/pulsar-client-go/pulsar/internal"
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
"github.com/apache/pulsar-client-go/pulsar/log"
@@ -156,6 +157,10 @@ func newConsumer(client *client, options ConsumerOptions) 
(Consumer, error) {
return nil, err
}
topic = tns[0].Name
+   err = addMessageCryptoIfMissing(client, &options, topic)
+   if err != nil {
+   return nil, err
+   }
return newInternalConsumer(client, options, topic, messageCh, 
dlq, rlq, false)
}
 
@@ -168,6 +173,11 @@ func newConsumer(client *client, options ConsumerOptions) 
(Consumer, error) {
}
options.Topics = distinct(options.Topics)
 
+   err = addMessageCryptoIfMissing(client, &options, 
options.Topics)
+   if err != nil {
+   return nil, err
+   }
+
return newMultiTopicConsumer(client, options, options.Topics, 
messageCh, dlq, rlq)
}
 
@@ -181,6 +191,12 @@ func newConsumer(client *client, options ConsumerOptions) 
(Consumer, error) {
if err != nil {
return nil, err
}
+
+   err = addMessageCryptoIfMissing(client, &options, tn.Name)
+   if err != nil {
+   return nil, err
+   }
+
return newRegexConsumer(client, options, tn, pattern, 
messageCh, dlq, rlq)
}
 
@@ -654,3 +670,17 @@ func (c *consumer) messageID(msgID MessageID) 
(trackingMessageID, bool) {
 
return mid, true
 }
+
+func addMessageCryptoIfMissing(client *client, options *ConsumerOptions, 
topics interface{}) error {
+   // decryption is enabled, use default messagecrypto if not provided
+   if options.Decryption != nil && options.Decryption.MessageCrypto == nil 
{
+   messageCrypto, err := crypto.NewDefaultMessageCrypto("decrypt",
+   false,
+   client.log.SubLogger(log.Fields{"topic": topics}))
+   if err != nil {
+   return err
+   }
+   options.Decryption.MessageCrypto = messageCrypto
+   }
+   return nil
+}
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 1d95c42..7679a8c 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -186,13 +186,6 @@ func newPartitionConsumer(parent Consumer, client *client, 
options *partitionCon
if pc.options.decryption == nil {
decryptor = cryptointernal.NewNoopDecryptor() // default to 
noopDecryptor
} else {
-   if options.decryption.MessageCrypto == nil {
-   messageCrypto, err := 
crypto.NewDefaultMessageCrypto("decrypt", false, pc.log)
-   if err != nil {
-   return nil, err
-   }
-   options.decryption.MessageCrypto = messageCrypto
-   }
decryptor = cryptointernal.NewConsumerDecryptor(
options.decryption.KeyReader,
options.decryption.MessageCrypto,
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 55823e4..cadd8e4 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -332,14 +332,77 @@ func TestPartiti

[pulsar-client-go] branch master updated (ff7a962 -> 2bcf7c7)

2022-01-11 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git.


from ff7a962  Revert "Use a separate gorutine to handle the logic of 
reconnect" (#700)
 add 2bcf7c7  Fix data race while accessing connection in partitionProducer 
(#701)

No new revisions were added by this update.

Summary of changes:
 pulsar/producer_partition.go | 46 +---
 1 file changed, 31 insertions(+), 15 deletions(-)


[pulsar-client-go] branch master updated: Revert "Use a separate gorutine to handle the logic of reconnect" (#700)

2022-01-06 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
 new ff7a962  Revert "Use a separate gorutine to handle the logic of 
reconnect" (#700)
ff7a962 is described below

commit ff7a962be6b41da2c318e6fe70da00545bc0bdd4
Author: xiaolong ran 
AuthorDate: Thu Jan 6 18:43:05 2022 +0800

Revert "Use a separate gorutine to handle the logic of reconnect" (#700)

* Revert "Use a separate gorutine to handle the logic of reconnect (#691)"

This reverts commit 39e13aced4cc35a63f5d6164e63c7f1638388c37.

* add closeCh for go rutine leak

Signed-off-by: xiaolongran 
---
 pulsar/producer_partition.go | 20 
 1 file changed, 12 insertions(+), 8 deletions(-)

diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index b517309..3d62758 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -77,8 +77,9 @@ type partitionProducer struct {
batchFlushTicker *time.Ticker
 
// Channel where app is posting messages to be published
-   connectClosedCh chan connectionClosed
eventsChan  chan interface{}
+   closeCh chan struct{}
+   connectClosedCh chan connectionClosed
 
publishSemaphore internal.Semaphore
pendingQueue internal.BlockingQueue
@@ -115,8 +116,9 @@ func newPartitionProducer(client *client, topic string, 
options *ProducerOptions
log:  logger,
options:  options,
producerID:   client.rpcClient.NewProducerID(),
+   eventsChan:   make(chan interface{}, maxPendingMessages),
connectClosedCh:  make(chan connectionClosed, 10),
-   eventsChan:   make(chan interface{}, maxPendingMessages+20),
+   closeCh:  make(chan struct{}),
batchFlushTicker: time.NewTicker(batchingMaxPublishDelay),
publishSemaphore: 
internal.NewSemaphore(int32(maxPendingMessages)),
pendingQueue: internal.NewBlockingQueue(maxPendingMessages),
@@ -369,13 +371,13 @@ func (p *partitionProducer) reconnectToBroker() {
 }
 
 func (p *partitionProducer) runEventsLoop() {
-
go func() {
-   for {
-   for range p.connectClosedCh {
-   p.log.Info("runEventsLoop will reconnect in 
producer")
-   p.reconnectToBroker()
-   }
+   select {
+   case <-p.closeCh:
+   return
+   case <-p.connectClosedCh:
+   p.log.Info("runEventsLoop will reconnect in producer")
+   p.reconnectToBroker()
}
}()
 
@@ -872,6 +874,8 @@ func (p *partitionProducer) internalClose(req 
*closeProducer) {
p.setProducerState(producerClosed)
p.cnx.UnregisterListener(p.producerID)
p.batchFlushTicker.Stop()
+
+   close(p.closeCh)
 }
 
 func (p *partitionProducer) LastSequenceID() int64 {


[pulsar-client-go] branch revert-691-xiaolong/split-eventsCh updated (accf726 -> 536d882)

2022-01-05 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a change to branch revert-691-xiaolong/split-eventsCh
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git.


from accf726  Revert "Use a separate gorutine to handle the logic of 
reconnect (#691)"
 add 536d882  add closeCh for go rutine leak

No new revisions were added by this update.

Summary of changes:
 pulsar/producer_partition.go | 16 ++--
 1 file changed, 14 insertions(+), 2 deletions(-)


[pulsar-client-go] branch revert-691-xiaolong/split-eventsCh created (now accf726)

2022-01-05 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a change to branch revert-691-xiaolong/split-eventsCh
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git.


  at accf726  Revert "Use a separate gorutine to handle the logic of 
reconnect (#691)"

This branch includes the following new commits:

 new accf726  Revert "Use a separate gorutine to handle the logic of 
reconnect (#691)"

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[pulsar-client-go] 01/01: Revert "Use a separate gorutine to handle the logic of reconnect (#691)"

2022-01-05 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch revert-691-xiaolong/split-eventsCh
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git

commit accf726bdf3972ac3b8b64c7fe1c0480959cc2de
Author: xiaolong ran 
AuthorDate: Thu Jan 6 10:37:43 2022 +0800

Revert "Use a separate gorutine to handle the logic of reconnect (#691)"

This reverts commit 39e13aced4cc35a63f5d6164e63c7f1638388c37.
---
 pulsar/producer_partition.go | 16 
 1 file changed, 4 insertions(+), 12 deletions(-)

diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index b517309..d67c0c0 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -77,8 +77,8 @@ type partitionProducer struct {
batchFlushTicker *time.Ticker
 
// Channel where app is posting messages to be published
-   connectClosedCh chan connectionClosed
eventsChan  chan interface{}
+   connectClosedCh chan connectionClosed
 
publishSemaphore internal.Semaphore
pendingQueue internal.BlockingQueue
@@ -115,8 +115,8 @@ func newPartitionProducer(client *client, topic string, 
options *ProducerOptions
log:  logger,
options:  options,
producerID:   client.rpcClient.NewProducerID(),
+   eventsChan:   make(chan interface{}, maxPendingMessages),
connectClosedCh:  make(chan connectionClosed, 10),
-   eventsChan:   make(chan interface{}, maxPendingMessages+20),
batchFlushTicker: time.NewTicker(batchingMaxPublishDelay),
publishSemaphore: 
internal.NewSemaphore(int32(maxPendingMessages)),
pendingQueue: internal.NewBlockingQueue(maxPendingMessages),
@@ -369,16 +369,6 @@ func (p *partitionProducer) reconnectToBroker() {
 }
 
 func (p *partitionProducer) runEventsLoop() {
-
-   go func() {
-   for {
-   for range p.connectClosedCh {
-   p.log.Info("runEventsLoop will reconnect in 
producer")
-   p.reconnectToBroker()
-   }
-   }
-   }()
-
for {
select {
case i := <-p.eventsChan:
@@ -391,6 +381,8 @@ func (p *partitionProducer) runEventsLoop() {
p.internalClose(v)
return
}
+   case <-p.connectClosedCh:
+   p.reconnectToBroker()
case <-p.batchFlushTicker.C:
if p.batchBuilder.IsMultiBatches() {
p.internalFlushCurrentBatches()


[pulsar-client-go] branch master updated (d5d4903 -> 39e13ac)

2021-12-27 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git.


from d5d4903  Provide lock-free access to partition consumer compression 
providers (#689)
 add 39e13ac  Use a separate gorutine to handle the logic of reconnect 
(#691)

No new revisions were added by this update.

Summary of changes:
 pulsar/producer_partition.go | 16 
 1 file changed, 12 insertions(+), 4 deletions(-)


[pulsar-client-go] branch master updated: Provide lock-free access to partition consumer compression providers (#689)

2021-12-25 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
 new d5d4903  Provide lock-free access to partition consumer compression 
providers (#689)
d5d4903 is described below

commit d5d49031c7da2aa7fa0b9338e63addc6a762281b
Author: dferstay 
AuthorDate: Sat Dec 25 19:11:32 2021 -0800

Provide lock-free access to partition consumer compression providers (#689)

The compression providers map in the partition consumer is a textbook case
for using go's lock-free sync.Map: the set of map entries is stable and
access is read-only.

On machines with 4 cores or greater, read contention on the sync.RWMutex
outweighs the cost of using a sync.Map.

Below is an old article on the subject, but it still holds true today:
https://medium.com/@deckarep/the-new-kid-in-town-gos-sync-map-de24a6bf7c2c

Signed-off-by: Daniel Ferstay 

Co-authored-by: Daniel Ferstay 
---
 pulsar/consumer_partition.go  | 42 ---
 pulsar/consumer_partition_test.go |  9 -
 2 files changed, 30 insertions(+), 21 deletions(-)

diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index ecde0bf..d438b87 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -144,8 +144,7 @@ type partitionConsumer struct {
 
log log.Logger
 
-   providersMutex   sync.RWMutex
-   compressionProviders map[pb.CompressionType]compression.Provider
+   compressionProviders sync.Map 
//map[pb.CompressionType]compression.Provider
metrics  *internal.LeveledMetrics
decryptorcryptointernal.Decryptor
 }
@@ -171,7 +170,7 @@ func newPartitionConsumer(parent Consumer, client *client, 
options *partitionCon
closeCh:  make(chan struct{}),
clearQueueCh: make(chan func(id trackingMessageID)),
clearMessageQueuesCh: make(chan chan struct{}),
-   compressionProviders: 
make(map[pb.CompressionType]compression.Provider),
+   compressionProviders: sync.Map{},
dlq:  dlq,
metrics:  metrics,
}
@@ -967,11 +966,15 @@ func (pc *partitionConsumer) internalClose(req 
*closeRequest) {
pc.log.Info("Closed consumer")
}
 
-   pc.providersMutex.Lock()
-   for _, provider := range pc.compressionProviders {
-   provider.Close()
-   }
-   pc.providersMutex.Unlock()
+   pc.compressionProviders.Range(func(_, v interface{}) bool {
+   if provider, ok := v.(compression.Provider); ok {
+   provider.Close()
+   } else {
+   err := fmt.Errorf("unexpected compression provider 
type: %T", v)
+   pc.log.WithError(err).Warn("Failed to close compression 
provider")
+   }
+   return true
+   })
 
pc.setConsumerState(consumerClosed)
pc._getConn().DeleteConsumeHandler(pc.consumerID)
@@ -1192,19 +1195,26 @@ func getPreviousMessage(mid trackingMessageID) 
trackingMessageID {
 }
 
 func (pc *partitionConsumer) Decompress(msgMeta *pb.MessageMetadata, payload 
internal.Buffer) (internal.Buffer, error) {
-   pc.providersMutex.RLock()
-   provider, ok := pc.compressionProviders[msgMeta.GetCompression()]
-   pc.providersMutex.RUnlock()
+   providerEntry, ok := 
pc.compressionProviders.Load(msgMeta.GetCompression())
if !ok {
-   var err error
-   if provider, err = 
pc.initializeCompressionProvider(msgMeta.GetCompression()); err != nil {
+   newProvider, err := 
pc.initializeCompressionProvider(msgMeta.GetCompression())
+   if err != nil {
pc.log.WithError(err).Error("Failed to decompress 
message.")
return nil, err
}
 
-   pc.providersMutex.Lock()
-   pc.compressionProviders[msgMeta.GetCompression()] = provider
-   pc.providersMutex.Unlock()
+   var loaded bool
+   providerEntry, loaded = 
pc.compressionProviders.LoadOrStore(msgMeta.GetCompression(), newProvider)
+   if loaded {
+   // another thread already loaded this provider, so 
close the one we just initialized
+   newProvider.Close()
+   }
+   }
+   provider, ok := providerEntry.(compression.Provider)
+   if !ok {
+   err := fmt.Errorf("unexpected compression provider type: %T", 
providerEntry)
+   pc.log.WithError(err).Error("Failed to decompress message.")
+   

[pulsar-client-go] branch master updated: set default go version to 1.13 in CI related files (#692)

2021-12-25 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
 new 310d480  set default go version to 1.13 in CI related files (#692)
310d480 is described below

commit 310d480485cd3e945ad991aade915f3d6a353115
Author: Eugene R 
AuthorDate: Sun Dec 26 05:10:36 2021 +0200

set default go version to 1.13 in CI related files (#692)

Set the default go version to 1.13 in the CI related files as per the 
minimum version requirements.
---
 Dockerfile   | 2 +-
 docker-ci.sh | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/Dockerfile b/Dockerfile
index 6a2fb29..401a246 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -17,7 +17,7 @@
 # under the License.
 #
 
-ARG GO_VERSION=golang:1.12
+ARG GO_VERSION=golang:1.13
 FROM apachepulsar/pulsar:latest as pulsar
 FROM $GO_VERSION as go
 
diff --git a/docker-ci.sh b/docker-ci.sh
index 74a57c1..200cc50 100755
--- a/docker-ci.sh
+++ b/docker-ci.sh
@@ -25,7 +25,7 @@ cd ${SRC_DIR}
 
 IMAGE_NAME=pulsar-client-go-test:latest
 
-GO_VERSION=${1:-1.12}
+GO_VERSION=${1:-1.13}
 docker rmi --force ${IMAGE_NAME} || true
 docker rmi --force apachepulsar/pulsar:latest || true
 docker build -t ${IMAGE_NAME} --build-arg GO_VERSION="golang:${GO_VERSION}" .


[pulsar-client-go] branch master updated: fix:add order key to message (#688)

2021-12-24 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
 new e03600f  fix:add order key to message (#688)
e03600f is described below

commit e03600f28701c248d4841aa02715e4be1064a76c
Author: Lei Zhiyuan 
AuthorDate: Fri Dec 24 17:34:57 2021 +0800

fix:add order key to message (#688)

Co-authored-by: zhiyuanlei 

fix:add order key to message
---
 pulsar/consumer_partition.go | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 0b4e9fe..ecde0bf 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -551,6 +551,7 @@ func (pc *partitionConsumer) MessageReceived(response 
*pb.CommandMessage, header
replicatedFrom:  
msgMeta.GetReplicatedFrom(),
redeliveryCount: 
response.GetRedeliveryCount(),
encryptionContext:   
createEncryptionContext(msgMeta),
+   orderingKey: 
string(msgMeta.OrderingKey),
},
}
pc.queueCh <- messages
@@ -622,6 +623,7 @@ func (pc *partitionConsumer) MessageReceived(response 
*pb.CommandMessage, header
replicationClusters: msgMeta.GetReplicateTo(),
replicatedFrom:  
msgMeta.GetReplicatedFrom(),
redeliveryCount: 
response.GetRedeliveryCount(),
+   orderingKey: string(smm.OrderingKey),
}
} else {
msg = &message{


[pulsar-client-go] branch master updated (2bbfb8e -> 067f805)

2021-12-24 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git.


from 2bbfb8e  [Issue 513] Correct apparent logic error in batchContainer's 
hasSpace() method (#678)
 add 067f805  Upgrade DataDog/zstd to v1.5.0 (#690)

No new revisions were added by this update.

Summary of changes:
 go.mod | 2 +-
 go.sum | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)


[pulsar-client-go] branch master updated: [Issue 513] Correct apparent logic error in batchContainer's hasSpace() method (#678)

2021-12-21 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
 new 2bbfb8e  [Issue 513] Correct apparent logic error in batchContainer's 
hasSpace() method (#678)
2bbfb8e is described below

commit 2bbfb8e4a66c63843c935a50971474abfa25cdf7
Author: Ben Schofield 
AuthorDate: Wed Dec 22 06:22:18 2021 +

[Issue 513] Correct apparent logic error in batchContainer's hasSpace() 
method (#678)

* Correct apparent logic error in batchContainer's hasSpace() method.

* Make the same change to keyBasedBatchContainer's hasSpace() method.

* Fix comment length to pass style checks.

* Allow TestMaxMessageSize() to run

* Add TestMaxBatchSize() to validate that this limit is respected. Based on 
the results of the test, change the < in hasSpace() to be a <=.

* Further correct logic in hasSpace() / IsFull()

* Remember to make the change in both places!

Co-authored-by: ben 
---
 pulsar/internal/batch_builder.go   |  7 ++---
 pulsar/internal/key_based_batch_builder.go |  7 ++---
 pulsar/producer_test.go| 42 --
 3 files changed, 48 insertions(+), 8 deletions(-)

diff --git a/pulsar/internal/batch_builder.go b/pulsar/internal/batch_builder.go
index 7e47304..9d18f26 100644
--- a/pulsar/internal/batch_builder.go
+++ b/pulsar/internal/batch_builder.go
@@ -150,14 +150,15 @@ func NewBatchBuilder(
return &bc, nil
 }
 
-// IsFull check if the size in the current batch exceeds the maximum size 
allowed by the batch
+// IsFull checks if the size in the current batch meets or exceeds the maximum 
size allowed by the batch
 func (bc *batchContainer) IsFull() bool {
-   return bc.numMessages >= bc.maxMessages || bc.buffer.ReadableBytes() > 
uint32(bc.maxBatchSize)
+   return bc.numMessages >= bc.maxMessages || bc.buffer.ReadableBytes() >= 
uint32(bc.maxBatchSize)
 }
 
+// hasSpace should return true if and only if the batch container can 
accommodate another message of length payload.
 func (bc *batchContainer) hasSpace(payload []byte) bool {
msgSize := uint32(len(payload))
-   return bc.numMessages+1 < bc.maxMessages || 
(bc.buffer.ReadableBytes()+msgSize) < uint32(bc.maxBatchSize)
+   return bc.numMessages+1 <= bc.maxMessages && 
bc.buffer.ReadableBytes()+msgSize <= uint32(bc.maxBatchSize)
 }
 
 // Add will add single message to batch.
diff --git a/pulsar/internal/key_based_batch_builder.go 
b/pulsar/internal/key_based_batch_builder.go
index 24d564b..d09138c 100644
--- a/pulsar/internal/key_based_batch_builder.go
+++ b/pulsar/internal/key_based_batch_builder.go
@@ -106,18 +106,19 @@ func NewKeyBasedBatchBuilder(
return bb, nil
 }
 
-// IsFull check if the size in the current batch exceeds the maximum size 
allowed by the batch
+// IsFull checks if the size in the current batch meets or exceeds the maximum 
size allowed by the batch
 func (bc *keyBasedBatchContainer) IsFull() bool {
-   return bc.numMessages >= bc.maxMessages || bc.buffer.ReadableBytes() > 
uint32(bc.maxBatchSize)
+   return bc.numMessages >= bc.maxMessages || bc.buffer.ReadableBytes() >= 
uint32(bc.maxBatchSize)
 }
 
 func (bc *keyBasedBatchContainer) IsMultiBatches() bool {
return true
 }
 
+// hasSpace should return true if and only if the batch container can 
accommodate another message of length payload.
 func (bc *keyBasedBatchContainer) hasSpace(payload []byte) bool {
msgSize := uint32(len(payload))
-   return bc.numMessages+1 < bc.maxMessages || 
(bc.buffer.ReadableBytes()+msgSize) < uint32(bc.maxBatchSize)
+   return bc.numMessages+1 <= bc.maxMessages && 
bc.buffer.ReadableBytes()+msgSize <= uint32(bc.maxBatchSize)
 }
 
 // Add will add single message to key-based batch with message key.
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index f914017..124c828 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -857,19 +857,57 @@ func TestDelayAbsolute(t *testing.T) {
canc()
 }
 
-func TestMaxMessageSize(t *testing.T) {
+func TestMaxBatchSize(t *testing.T) {
+   // Set to be < serverMaxMessageSize
+   batchMaxMessageSize := 512 * 1024
+
client, err := NewClient(ClientOptions{
URL: serviceURL,
})
assert.NoError(t, err)
defer client.Close()
+
producer, err := client.CreateProducer(ProducerOptions{
-   Topic: newTopicName(),
+   Topic:   newTopicName(),
+   BatchingMaxSize: uint(batchMaxMessageSize),
})
assert.NoError(t, err)
assert.NotNil(t, producer)
defer producer.Close()
+
+   for

[pulsar] branch master updated (c94c81c -> f69a11e)

2021-12-14 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


from c94c81c  fix wrong result for looking up a non-exist topic by rest api 
(#13055)
 add f69a11e  [Docs] Add docs of prometheus metrics for pulsar go client 
(#13271)

No new revisions were added by this update.

Summary of changes:
 site2/docs/client-libraries-go.md | 147 ++
 1 file changed, 147 insertions(+)


[pulsar-client-go] branch master updated: Add properties filed for batch (#683)

2021-12-09 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
 new d0d5d0a  Add properties filed for batch (#683)
d0d5d0a is described below

commit d0d5d0ae403717505f85b86c100fac6ff4d7dcf6
Author: xiaolong ran 
AuthorDate: Fri Dec 10 15:12:53 2021 +0800

Add properties filed for batch (#683)

Signed-off-by: xiaolongran 

### Motivation

Currently, when we disable batch in Producer, in `handleSend()` of 
`serverCnx.java`, the `msgMetadata.hasNumMessagesInBatch()` is **true** and 
`msgMetadata.getNumMessagesInBatch()` is **1**.

At this point, if we get the Properties object we set on the producer side 
on the broker side, the display is empty.

Go SDK set Properties:

```

// disable batch
producer, err := client.CreateProducer(pulsar.ProducerOptions{
Topic: "topic-1",
DisableBatching: true,
})

// set properties for every message
producer.Send(ctx, &pulsar.ProducerMessage{
Payload: []byte(fmt.Sprintf("hello-%d", i)),
Properties: map[string]string{
"key-1": "value-1",
},
});
```

Broker get message properties from entry metadata is null:

```
ByteBuf metadataAndPayload = entry.getDataBuffer();

MessageMetadata msgMetadata = 
Commands.peekMessageMetadata(metadataAndPayload, subscription.toString(), -1);
```

And `msgMetadata.getPropertiesCount() <= 0`.


### Modifications

Add properties filed in Add single message to batchContainer
---
 pulsar/internal/batch_builder.go | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/pulsar/internal/batch_builder.go b/pulsar/internal/batch_builder.go
index d08af53..7e47304 100644
--- a/pulsar/internal/batch_builder.go
+++ b/pulsar/internal/batch_builder.go
@@ -191,6 +191,7 @@ func (bc *batchContainer) Add(
bc.msgMetadata.ProducerName = &bc.producerName
bc.msgMetadata.ReplicateTo = replicateTo
bc.msgMetadata.PartitionKey = metadata.PartitionKey
+   bc.msgMetadata.Properties = metadata.Properties
 
if deliverAt.UnixNano() > 0 {
bc.msgMetadata.DeliverAtTime = 
proto.Int64(int64(TimestampMillis(deliverAt)))
@@ -211,6 +212,7 @@ func (bc *batchContainer) reset() {
bc.callbacks = []interface{}{}
bc.msgMetadata.ReplicateTo = nil
bc.msgMetadata.DeliverAtTime = nil
+   bc.msgMetadata.Properties = nil
 }
 
 // Flush all the messages buffered in the client and wait until all messages 
have been successfully persisted.


[pulsar] branch master updated (a89c6e4 -> 1220f84)

2021-12-07 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


from a89c6e4  fix(docs): incorrect command name and description for 
autorecovery (#12989)
 add 1220f84  fix:remove the loadbalance/bundle-data node (#13164)

No new revisions were added by this update.

Summary of changes:
 .../broker/resources/NamespaceResources.java   | 36 ++
 .../pulsar/broker/admin/impl/NamespacesBase.java   |  2 ++
 .../pulsar/broker/admin/impl/TenantsBase.java  |  2 ++
 .../apache/pulsar/broker/admin/AdminApiTest2.java  |  6 
 4 files changed, 46 insertions(+)


[pulsar] branch master updated (bce8a2e -> 86fe7d2)

2021-11-29 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


from bce8a2e  [Workflow] add guidelines for merging a PR (#12988)
 add 86fe7d2  Fix zk-node leak of admin path (#12972)

No new revisions were added by this update.

Summary of changes:
 .../pulsar/broker/resources/LocalPoliciesResources.java  | 16 
 .../pulsar/broker/resources/NamespaceResources.java  | 15 +++
 .../org/apache/pulsar/broker/admin/impl/TenantsBase.java |  4 
 .../org/apache/pulsar/broker/admin/AdminApiTest2.java|  4 
 4 files changed, 39 insertions(+)


[pulsar-client-go] branch master updated: Add subscription properties for ConsumerOptions (#671)

2021-11-28 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
 new 6385727  Add subscription properties for ConsumerOptions (#671)
6385727 is described below

commit 6385727f1a5e40e144f3239794135b5399a5a49e
Author: xiaolong ran 
AuthorDate: Mon Nov 29 12:39:49 2021 +0800

Add subscription properties for ConsumerOptions (#671)

Signed-off-by: xiaolongran 

### Motivation

In https://github.com/apache/pulsar/pull/12869, we introduce pluggable 
entry filter in Dispatcher, the pull request is the Go SDK implementation of 
this PIP


### Modifications

*Describe the modifications you've done.*

- Add subscription properties for ConsumerOptions
- Update `pulsarApi.proto` file
---
 pulsar/consumer.go   |6 +
 pulsar/consumer_impl.go  |2 +
 pulsar/consumer_partition.go |6 +
 pulsar/internal/pulsar_proto/PulsarApi.pb.go | 1609 +++---
 pulsar/internal/pulsar_proto/PulsarApi.proto |  112 +-
 5 files changed, 1274 insertions(+), 461 deletions(-)

diff --git a/pulsar/consumer.go b/pulsar/consumer.go
index cdb6887..a71a2d4 100644
--- a/pulsar/consumer.go
+++ b/pulsar/consumer.go
@@ -101,6 +101,12 @@ type ConsumerOptions struct {
// Those properties will be visible in the topic stats
Properties map[string]string
 
+   // SubscriptionProperties specify the subscription properties for this 
subscription.
+   //
+   // > Notice: SubscriptionProperties are immutable, and consumers under 
the same subscription will fail to create a
+   // > subscription if they use different properties.
+   SubscriptionProperties map[string]string
+
// Type specifies the subscription type to be used when subscribing to 
a topic.
// Default is `Exclusive`
Type SubscriptionType
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index 1bf75b5..00e0b76 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -297,6 +297,7 @@ func (c *consumer) internalTopicSubscribeToPartitions() 
error {
 
receiverQueueSize := c.options.ReceiverQueueSize
metadata := c.options.Properties
+   subProperties := c.options.SubscriptionProperties
 
startPartition := oldNumPartitions
partitionsToAdd := newNumPartitions - oldNumPartitions
@@ -333,6 +334,7 @@ func (c *consumer) internalTopicSubscribeToPartitions() 
error {
nackRedeliveryDelay:nackRedeliveryDelay,
nackBackoffPolicy:  
c.options.NackBackoffPolicy,
metadata:   metadata,
+   subProperties:  subProperties,
replicateSubscriptionState: 
c.options.ReplicateSubscriptionState,
startMessageID: trackingMessageID{},
subscriptionMode:   durable,
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index c8e5d9f..0b4e9fe 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -91,6 +91,7 @@ type partitionConsumerOpts struct {
nackRedeliveryDelaytime.Duration
nackBackoffPolicy  NackBackoffPolicy
metadata   map[string]string
+   subProperties  map[string]string
replicateSubscriptionState bool
startMessageID trackingMessageID
startMessageIDInclusivebool
@@ -1058,6 +1059,7 @@ func (pc *partitionConsumer) grabConn() error {
PriorityLevel:  nil,
Durable:
proto.Bool(pc.options.subscriptionMode == durable),
Metadata:   
internal.ConvertFromStringMap(pc.options.metadata),
+   SubscriptionProperties: 
internal.ConvertFromStringMap(pc.options.subProperties),
ReadCompacted:  
proto.Bool(pc.options.readCompacted),
Schema: pbSchema,
InitialPosition:initialPosition.Enum(),
@@ -1075,6 +1077,10 @@ func (pc *partitionConsumer) grabConn() error {
cmdSubscribe.Metadata = toKeyValues(pc.options.metadata)
}
 
+   if len(pc.options.subProperties) > 0 {
+   cmdSubscribe.SubscriptionProperties = 
toKeyValues(pc.options.subProperties)
+   }
+
// force topic creation is enabled by default so
// we only need to set the flag when disabling it
if pc.options.disableForceTopicCreation {
diff --git a/pulsar/internal/pulsar_proto/PulsarApi.pb.go 
b/pulsar/internal/pulsar_proto/PulsarA

[pulsar] branch master updated (5e25184 -> bef937b)

2021-11-28 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


from 5e25184  [docs] [ISSUE 11805] Add topic lookup metrics (#11927)
 add bef937b  Add subscription properties for SubscriptionStats (#12979)

No new revisions were added by this update.

Summary of changes:
 .../service/persistent/PersistentSubscription.java |  1 +
 .../broker/admin/CreateSubscriptionTest.java   | 39 ++
 .../common/policies/data/SubscriptionStats.java|  3 ++
 .../policies/data/stats/SubscriptionStatsImpl.java |  7 
 4 files changed, 50 insertions(+)


[pulsar] branch master updated (60f5475 -> f68bec4)

2021-11-17 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


from 60f5475  Improve exception info for invaild time-related option 
(#12828)
 add f68bec4  Fix znode leakage caused by deleting tenant (#12711)

No new revisions were added by this update.

Summary of changes:
 .../pulsar/broker/resources/BaseResources.java |  72 +++
 .../broker/resources/LocalPoliciesResources.java   |   4 +
 .../broker/resources/NamespaceResources.java   |  70 +--
 .../pulsar/broker/admin/impl/NamespacesBase.java   | 100 +++--
 .../pulsar/broker/admin/impl/TenantsBase.java  |  69 ++
 .../apache/pulsar/broker/admin/AdminApiTest2.java  |  90 +++
 6 files changed, 273 insertions(+), 132 deletions(-)


[pulsar] branch master updated (f9d16ca -> f44f6a0)

2021-11-08 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


from f9d16ca  [Doc] Add explanations for setting geo-replication at topic 
level (#12633)
 add f44f6a0  Add docs for nack backoff policy (#12660)

No new revisions were added by this update.

Summary of changes:
 site2/docs/client-libraries-java.md | 20 
 site2/docs/concepts-messaging.md| 15 +++
 2 files changed, 35 insertions(+)


[pulsar-client-go] branch master updated: remove unused dependency (#661)

2021-11-07 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
 new fe3b7c4  remove unused dependency (#661)
fe3b7c4 is described below

commit fe3b7c4e445b3de42974ca692574229ad9099a45
Author: Eugene R 
AuthorDate: Mon Nov 8 06:42:48 2021 +0200

remove unused dependency (#661)


Remove unused dependency in `oauth2` module.

### Verifying this change

- [x] Make sure that the change passes the CI checks.


This change is a trivial rework / code cleanup without any test coverage.
---
 oauth2/go.mod | 1 -
 oauth2/go.sum | 2 --
 2 files changed, 3 deletions(-)

diff --git a/oauth2/go.mod b/oauth2/go.mod
index 89d5015..091477d 100644
--- a/oauth2/go.mod
+++ b/oauth2/go.mod
@@ -4,7 +4,6 @@ go 1.13
 
 require (
github.com/99designs/keyring v1.1.6
-   github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/form3tech-oss/jwt-go v3.2.3+incompatible
github.com/onsi/ginkgo v1.14.0
github.com/onsi/gomega v1.10.1
diff --git a/oauth2/go.sum b/oauth2/go.sum
index 48ffe44..dad3d35 100644
--- a/oauth2/go.sum
+++ b/oauth2/go.sum
@@ -6,8 +6,6 @@ github.com/danieljoos/wincred v1.0.2/go.mod 
h1:SnuYRW9lp1oJrZX/dXJqr0cPK5gYXqx3E
 github.com/davecgh/go-spew v1.1.0/go.mod 
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/davecgh/go-spew v1.1.1 
h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/davecgh/go-spew v1.1.1/go.mod 
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
-github.com/dgrijalva/jwt-go v3.2.0+incompatible 
h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM=
-github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod 
h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
 github.com/dvsekhvalnov/jose2go v0.0.0-20200901110807-248326c1351b 
h1:HBah4D48ypg3J7Np4N+HY/ZR76fx3HEUGxDU6Uk39oQ=
 github.com/dvsekhvalnov/jose2go v0.0.0-20200901110807-248326c1351b/go.mod 
h1:7BvyPhdbLxMXIYTFPLsyJRFMsKmOZnQmzh6Gb+uquuM=
 github.com/form3tech-oss/jwt-go v3.2.3+incompatible 
h1:7ZaBxOI7TMoYBfyA3cQHErNNyAWIKUMIwqxEtgHOs5c=


[pulsar-client-go] branch master updated: Support nack backoff policy for SDK (#660)

2021-11-07 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
 new 567263f  Support nack backoff policy for SDK (#660)
567263f is described below

commit 567263ff5b077e9a737f96c9523646225502f141
Author: xiaolong ran 
AuthorDate: Mon Nov 8 12:42:03 2021 +0800

Support nack backoff policy for SDK (#660)

* Support nack backoff policy for SDK

Signed-off-by: xiaolongran 

* add test case and fix some logic

Signed-off-by: xiaolongran 

* fix action ciu

Signed-off-by: xiaolongran 

* fix action ci

Signed-off-by: xiaolongran 

* fix some logic

Signed-off-by: xiaolongran 

* fix data race

Signed-off-by: xiaolongran 

* fix data race

Signed-off-by: xiaolongran 

* fix a little

Signed-off-by: xiaolongran 

* fix data race

Signed-off-by: xiaolongran 

* fix data race

Signed-off-by: xiaolongran 

* fix some comments

Signed-off-by: xiaolongran 

* fix test case

Signed-off-by: xiaolongran 
---
 pulsar/consumer.go |  11 +++
 pulsar/consumer_impl.go|  20 +
 pulsar/consumer_multitopic.go  |  16 
 pulsar/consumer_partition.go   |   8 +-
 pulsar/consumer_regex.go   |  16 
 pulsar/impl_message.go |   7 ++
 pulsar/negative_acks_tracker.go|  44 +-
 pulsar/negative_acks_tracker_test.go   | 156 -
 pulsar/negative_backoff_policy.go  |  46 ++
 pulsar/negative_backoff_policy_test.go |  34 +++
 10 files changed, 351 insertions(+), 7 deletions(-)

diff --git a/pulsar/consumer.go b/pulsar/consumer.go
index c9fbc0d..5127955 100644
--- a/pulsar/consumer.go
+++ b/pulsar/consumer.go
@@ -158,6 +158,17 @@ type ConsumerOptions struct {
 
// Decryption decryption related fields to decrypt the encrypted message
Decryption *MessageDecryptionInfo
+
+   // If enabled, the default implementation of NackBackoffPolicy will be 
used to calculate the delay time of
+   // nack backoff, Default: false.
+   EnableDefaultNackBackoffPolicy bool
+
+   // NackBackoffPolicy is a redelivery backoff mechanism which we can 
achieve redelivery with different
+   // delays according to the number of times the message is retried.
+   //
+   // > Notice: the NackBackoffPolicy will not work with 
`consumer.NackID(MessageID)`
+   // > because we are not able to get the redeliveryCount from the 
message ID.
+   NackBackoffPolicy NackBackoffPolicy
 }
 
 // Consumer is an interface that abstracts behavior of Pulsar's consumer
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index 6aef497..1bf75b5 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -35,6 +35,7 @@ const defaultNackRedeliveryDelay = 1 * time.Minute
 type acker interface {
AckID(id trackingMessageID)
NackID(id trackingMessageID)
+   NackMsg(msg Message)
 }
 
 type consumer struct {
@@ -87,6 +88,10 @@ func newConsumer(client *client, options ConsumerOptions) 
(Consumer, error) {
}
}
 
+   if options.NackBackoffPolicy == nil && 
options.EnableDefaultNackBackoffPolicy {
+   options.NackBackoffPolicy = new(defaultNackBackoffPolicy)
+   }
+
// did the user pass in a message channel?
messageCh := options.MessageChannel
if options.MessageChannel == nil {
@@ -326,6 +331,7 @@ func (c *consumer) internalTopicSubscribeToPartitions() 
error {
partitionIdx:   idx,
receiverQueueSize:  receiverQueueSize,
nackRedeliveryDelay:nackRedeliveryDelay,
+   nackBackoffPolicy:  
c.options.NackBackoffPolicy,
metadata:   metadata,
replicateSubscriptionState: 
c.options.ReplicateSubscriptionState,
startMessageID: trackingMessageID{},
@@ -489,6 +495,20 @@ func (c *consumer) ReconsumeLater(msg Message, delay 
time.Duration) {
 }
 
 func (c *consumer) Nack(msg Message) {
+   if c.options.EnableDefaultNackBackoffPolicy || 
c.options.NackBackoffPolicy != nil {
+   mid, ok := c.messageID(msg.ID())
+   if !ok {
+   return
+   }
+
+   if mid.consumer != nil {
+   mid.Nack()
+   return
+   }
+   c.consumers[mid.partitionIdx].NackMsg(msg)
+   return
+   }
+
c.NackID(msg.ID()

[pulsar] branch master updated: Add negative ack redelivery backoff. (#12566)

2021-11-03 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new d79cd04  Add negative ack redelivery backoff. (#12566)
d79cd04 is described below

commit d79cd0479eabebef2ce72eca1330af103115f67f
Author: hanmz 
AuthorDate: Thu Nov 4 14:15:17 2021 +0800

Add negative ack redelivery backoff. (#12566)

### Motivation

Add negative ack redelivery backoff.


### Modifications

- add new `NegativeAckBackoff` interface
- expose `egativeAckRedeliveryBackoff` in ConsumerBulider
- add unit test case
---
 .../pulsar/client/impl/NegativeAcksTest.java   | 102 +
 .../pulsar/client/api/ConsumerConfiguration.java   |  17 
 .../apache/pulsar/client/api/ConsumerBuilder.java  |  14 +++
 .../client/api/NegativeAckRedeliveryBackoff.java   |  40 
 .../pulsar/client/impl/ConsumerBuilderImpl.java|   8 ++
 .../apache/pulsar/client/impl/ConsumerImpl.java|   8 ++
 .../client/impl/MultiTopicsConsumerImpl.java   |  10 ++
 .../NegativeAckRedeliveryExponentialBackoff.java   |  94 +++
 .../pulsar/client/impl/NegativeAcksTracker.java|  46 +-
 .../impl/conf/ConsumerConfigurationData.java   |   4 +
 .../api/NegativeAckRedeliveryBackoffTest.java  |  55 +++
 .../client/impl/ConsumerBuilderImplTest.java   |   8 ++
 12 files changed, 405 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
index 5eb43af..638a969 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
@@ -31,6 +31,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.NegativeAckRedeliveryBackoff;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.Schema;
@@ -154,4 +155,105 @@ public class NegativeAcksTest extends 
ProducerConsumerBase {
 consumer.close();
 producer.close();
 }
+
+@DataProvider(name = "variationsBackoff")
+public static Object[][] variationsBackoff() {
+return new Object[][] {
+// batching / partitions / subscription-type / 
min-nack-time-ms/ max-nack-time-ms / ack-timeout
+{ false, false, SubscriptionType.Shared, 100, 1000, 0 },
+{ false, false, SubscriptionType.Failover, 100, 1000, 0 },
+{ false, true, SubscriptionType.Shared, 100, 1000, 0 },
+{ false, true, SubscriptionType.Failover, 100, 1000, 0 },
+{ true, false, SubscriptionType.Shared, 100, 1000, 0 },
+{ true, false, SubscriptionType.Failover, 100, 1000, 0 },
+{ true, true, SubscriptionType.Shared, 100, 1000, 0 },
+{ true, true, SubscriptionType.Failover, 100, 1000, 0 },
+
+{ false, false, SubscriptionType.Shared, 0, 1000, 0 },
+{ false, false, SubscriptionType.Failover, 0, 1000, 0 },
+{ false, true, SubscriptionType.Shared, 0, 1000, 0 },
+{ false, true, SubscriptionType.Failover, 0, 1000, 0 },
+{ true, false, SubscriptionType.Shared, 0, 1000, 0 },
+{ true, false, SubscriptionType.Failover, 0, 1000, 0 },
+{ true, true, SubscriptionType.Shared, 0, 1000, 0 },
+{ true, true, SubscriptionType.Failover, 0, 1000, 0 },
+
+{ false, false, SubscriptionType.Shared, 100, 1000, 1000 },
+{ false, false, SubscriptionType.Failover, 100, 1000, 1000 },
+{ false, true, SubscriptionType.Shared, 100, 1000, 1000 },
+{ false, true, SubscriptionType.Failover, 100, 1000, 1000 },
+{ true, false, SubscriptionType.Shared, 100, 1000, 1000 },
+{ true, false, SubscriptionType.Failover, 100, 1000, 1000 },
+{ true, true, SubscriptionType.Shared, 100, 1000, 1000 },
+{ true, true, SubscriptionType.Failover, 100, 1000, 1000 },
+
+{ false, false, SubscriptionType.Shared, 0, 1000, 1000 },
+{ false, false, SubscriptionType.Failover, 0, 1000, 1000 },
+{ false, true, SubscriptionType.Shared, 0, 1000, 1000 },
+{ false, true, SubscriptionType.Failover, 0, 1000, 1000 },
+{ true, false, SubscriptionType.Shared, 0, 1000, 1000 },
+{ true, false, SubscriptionType.Failove

[pulsar-client-go] branch master updated: fix issue 650, different handle sequence value (#651)

2021-11-03 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
 new d80a722  fix issue 650,different handle sequence value (#651)
d80a722 is described below

commit d80a722ac1ab197c7e8649efdeb1e16356cbb3bb
Author: baomingyu 
AuthorDate: Thu Nov 4 10:57:21 2021 +0800

fix issue 650,different handle sequence value (#651)

* fix issue 650,different handle sequence value

* add sequenceId equal check
---
 pulsar/producer_partition.go | 76 ++--
 1 file changed, 38 insertions(+), 38 deletions(-)

diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index b2b9273..d67c0c0 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -784,7 +784,7 @@ func (p *partitionProducer) ReceivedSendReceipt(response 
*pb.CommandSendReceipt)
return
}
 
-   if pi.sequenceID != response.GetSequenceId() {
+   if pi.sequenceID < response.GetSequenceId() {
// if we receive a receipt that is not the one expected, the 
state of the broker and the producer differs.
// At that point, it is better to close the connection to the 
broker to reconnect to a broker hopping it solves
// the state discrepancy.
@@ -792,49 +792,49 @@ func (p *partitionProducer) ReceivedSendReceipt(response 
*pb.CommandSendReceipt)
response.GetSequenceId(), pi.sequenceID)
p.cnx.Close()
return
-   }
-
-   // The ack was indeed for the expected item in the queue, we can remove 
it and trigger the callback
-   p.pendingQueue.Poll()
-
-   now := time.Now().UnixNano()
-
-   // lock the pending item while sending the requests
-   pi.Lock()
-   defer pi.Unlock()
-   p.metrics.PublishRPCLatency.Observe(float64(now-pi.sentAt.UnixNano()) / 
1.0e9)
-   for idx, i := range pi.sendRequests {
-   sr := i.(*sendRequest)
-   if sr.msg != nil {
-   atomic.StoreInt64(&p.lastSequenceID, 
int64(pi.sequenceID))
-   p.publishSemaphore.Release()
+   } else if pi.sequenceID == response.GetSequenceId() {
+   // The ack was indeed for the expected item in the queue, we 
can remove it and trigger the callback
+   p.pendingQueue.Poll()
+
+   now := time.Now().UnixNano()
+
+   // lock the pending item while sending the requests
+   pi.Lock()
+   defer pi.Unlock()
+   
p.metrics.PublishRPCLatency.Observe(float64(now-pi.sentAt.UnixNano()) / 1.0e9)
+   for idx, i := range pi.sendRequests {
+   sr := i.(*sendRequest)
+   if sr.msg != nil {
+   atomic.StoreInt64(&p.lastSequenceID, 
int64(pi.sequenceID))
+   p.publishSemaphore.Release()
+
+   
p.metrics.PublishLatency.Observe(float64(now-sr.publishTime.UnixNano()) / 1.0e9)
+   p.metrics.MessagesPublished.Inc()
+   p.metrics.MessagesPending.Dec()
+   payloadSize := float64(len(sr.msg.Payload))
+   p.metrics.BytesPublished.Add(payloadSize)
+   p.metrics.BytesPending.Sub(payloadSize)
+   }
 
-   
p.metrics.PublishLatency.Observe(float64(now-sr.publishTime.UnixNano()) / 1.0e9)
-   p.metrics.MessagesPublished.Inc()
-   p.metrics.MessagesPending.Dec()
-   payloadSize := float64(len(sr.msg.Payload))
-   p.metrics.BytesPublished.Add(payloadSize)
-   p.metrics.BytesPending.Sub(payloadSize)
-   }
+   if sr.callback != nil || len(p.options.Interceptors) > 
0 {
+   msgID := newMessageID(
+   int64(response.MessageId.GetLedgerId()),
+   int64(response.MessageId.GetEntryId()),
+   int32(idx),
+   p.partitionIdx,
+   )
 
-   if sr.callback != nil || len(p.options.Interceptors) > 0 {
-   msgID := newMessageID(
-   int64(response.MessageId.GetLedgerId()),
-   int64(response.MessageId.GetEntryId()),
-   int32(idx),
-   p.partitionIdx,
-   )
+   if sr.callback != nil {
+   sr.callback(m

[pulsar-client-go] branch master updated (6ffefa5 -> 972dc97)

2021-10-31 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git.


from 6ffefa5  Update change log for 0.7.0 release (#655)
 add 972dc97  Update version to 0.7.0 (#654)

No new revisions were added by this update.

Summary of changes:
 VERSION| 2 +-
 stable.txt | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)


[pulsar-client-go] branch master updated: Update change log for 0.7.0 release (#655)

2021-10-31 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
 new 6ffefa5  Update change log for 0.7.0 release (#655)
6ffefa5 is described below

commit 6ffefa5fe7ce12f3df7680282eae33ddf6d777a5
Author: cckellogg 
AuthorDate: Sun Oct 31 19:35:58 2021 -0700

Update change log for 0.7.0 release (#655)

Update change log for 0.7.0 release
---
 CHANGELOG.md | 24 
 1 file changed, 24 insertions(+)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index ce91e22..2e43ddc 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -23,6 +23,30 @@
 
 All notable changes to this project will be documented in this file.
 
+[0.7.0] 2021-10-31
+
+## Feature
+* Encryption support for producer, see 
[PR-560](https://github.com/apache/pulsar-client-go/pull/560)
+* Decrytion support for consumer, see 
[PR-612](https://github.com/apache/pulsar-client-go/pull/612)
+* User-defined metric cardinality, see 
[PR-604](https://github.com/apache/pulsar-client-go/pull/604)
+* Better support for Azure AD OAuth 2.0, see 
[PR-630](https://github.com/apache/pulsar-client-go/pull/630), 
[PR-633](https://github.com/apache/pulsar-client-go/pull/633), 
[PR-634](https://github.com/apache/pulsar-client-go/pull/634)
+* Removed testing for go versions 1.11 and 1.12, see 
[PR-632](https://github.com/apache/pulsar-client-go/pull/632)
+* Add epoch to create producer to prevent a duplicate producer when broker is 
not available., see [PR-582] 
(https://github.com/apache/pulsar-client-go/pull/582)
+
+## Improve
+* Fix batch size limit validation, see 
[PR-528](https://github.com/apache/pulsar-client-go/pull/528)
+* Fix logic of command for sendError, see 
[PR-622](https://github.com/apache/pulsar-client-go/pull/622)
+* Drain connection requests channel without closing, see 
[PR-645](https://github.com/apache/pulsar-client-go/pull/645)
+* Fix ConsumersOpened counter not incremented when use multitopic or regexp 
consumer, see [PR-619](https://github.com/apache/pulsar-client-go/pull/619)
+* Fix reconnection logic when topic is deleted, see 
[PR-627](https://github.com/apache/pulsar-client-go/pull/627)
+* Fix panic when scale down partitions, see 
[PR-601](https://github.com/apache/pulsar-client-go/pull/601)
+* Fix missing metrics for topics by registration of existing collector, see 
[PR-600](https://github.com/apache/pulsar-client-go/pull/600)
+* Fix producer panic by oldProducers, see 
[PR-598](https://github.com/apache/pulsar-client-go/pull/598)
+* Fail pending messages when topic is terminated, see 
[PR-588](https://github.com/apache/pulsar-client-go/pull/588)
+* Fix handle send error panic, see 
[PR-576](https://github.com/apache/pulsar-client-go/pull/576)
+
+
+
 [0.6.0] 2021-07-21
 
 ## Feature


[pulsar-client-go] branch master updated (171ef57 -> 1bd2993)

2021-10-31 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git.


from 171ef57  Go version required should be 1.13, example produce log print 
error (#649)
 add 1bd2993  Update release docs with missing information (#656)

No new revisions were added by this update.

Summary of changes:
 docs/release-process.md | 23 +++
 1 file changed, 19 insertions(+), 4 deletions(-)


[pulsar-client-go] branch master updated: [issue 513] fix batch size limit validation (#528)

2021-10-25 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
 new c3922b8  [issue 513] fix batch size limit validation (#528)
c3922b8 is described below

commit c3922b82cd0e58b69670a28a96abdf88f61e0f91
Author: Ming 
AuthorDate: Mon Oct 25 07:56:54 2021 -0400

[issue 513] fix batch size limit validation (#528)

* fix batch size limit validation

* respect either of one batch size and number of messages limit

* correct hasSpace logic

* fix key-based batch builder
---
 pulsar/internal/batch_builder.go   | 4 ++--
 pulsar/internal/key_based_batch_builder.go | 4 ++--
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/pulsar/internal/batch_builder.go b/pulsar/internal/batch_builder.go
index 92d6249..d08af53 100644
--- a/pulsar/internal/batch_builder.go
+++ b/pulsar/internal/batch_builder.go
@@ -157,7 +157,7 @@ func (bc *batchContainer) IsFull() bool {
 
 func (bc *batchContainer) hasSpace(payload []byte) bool {
msgSize := uint32(len(payload))
-   return bc.numMessages > 0 && (bc.buffer.ReadableBytes()+msgSize) > 
uint32(bc.maxBatchSize)
+   return bc.numMessages+1 < bc.maxMessages || 
(bc.buffer.ReadableBytes()+msgSize) < uint32(bc.maxBatchSize)
 }
 
 // Add will add single message to batch.
@@ -174,7 +174,7 @@ func (bc *batchContainer) Add(
// There's already a message with cluster replication list. 
need to flush before next
// message can be sent
return false
-   } else if bc.hasSpace(payload) {
+   } else if !bc.hasSpace(payload) {
// The current batch is full. Producer has to call Flush() to
return false
}
diff --git a/pulsar/internal/key_based_batch_builder.go 
b/pulsar/internal/key_based_batch_builder.go
index 940aa9f..24d564b 100644
--- a/pulsar/internal/key_based_batch_builder.go
+++ b/pulsar/internal/key_based_batch_builder.go
@@ -117,7 +117,7 @@ func (bc *keyBasedBatchContainer) IsMultiBatches() bool {
 
 func (bc *keyBasedBatchContainer) hasSpace(payload []byte) bool {
msgSize := uint32(len(payload))
-   return bc.numMessages > 0 && (bc.buffer.ReadableBytes()+msgSize) > 
uint32(bc.maxBatchSize)
+   return bc.numMessages+1 < bc.maxMessages || 
(bc.buffer.ReadableBytes()+msgSize) < uint32(bc.maxBatchSize)
 }
 
 // Add will add single message to key-based batch with message key.
@@ -134,7 +134,7 @@ func (bc *keyBasedBatchContainer) Add(
// There's already a message with cluster replication list. 
need to flush before next
// message can be sent
return false
-   } else if bc.hasSpace(payload) {
+   } else if !bc.hasSpace(payload) {
// The current batch is full. Producer has to call Flush() to
return false
}


[pulsar-client-go] branch master updated: Fix minor api issue and comments. (#637)

2021-10-12 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
 new 4e55be0  Fix minor api issue and comments. (#637)
4e55be0 is described below

commit 4e55be0a363ef57e90a60a29803b578d0498c1d1
Author: cckellogg 
AuthorDate: Wed Oct 13 02:39:15 2021 -0400

Fix minor api issue and comments. (#637)

Fix some comment spelling mistakes and spelling on one api.
---
 pulsar/crypto/crypto_failure_action.go  |  2 +-
 pulsar/crypto/default_message_crypto.go | 12 ++--
 pulsar/crypto/encryption_key_Info.go|  8 
 pulsar/crypto/message_crypto.go | 12 
 pulsar/crypto/message_metadata.go   | 12 ++--
 pulsar/crypto/message_metadata_test.go  |  6 +++---
 6 files changed, 28 insertions(+), 24 deletions(-)

diff --git a/pulsar/crypto/crypto_failure_action.go 
b/pulsar/crypto/crypto_failure_action.go
index 891d740..0bdd289 100644
--- a/pulsar/crypto/crypto_failure_action.go
+++ b/pulsar/crypto/crypto_failure_action.go
@@ -21,7 +21,7 @@ const (
// ProducerCryptoFailureActionFail this is the default option to fail 
send if crypto operation fails.
ProducerCryptoFailureActionFail = iota
 
-   // ProducerCryptoFailureActionSend ingnore crypto failure and proceed 
with sending unencrypted message.
+   // ProducerCryptoFailureActionSend ignore crypto failure and proceed 
with sending unencrypted message.
ProducerCryptoFailureActionSend
 )
 
diff --git a/pulsar/crypto/default_message_crypto.go 
b/pulsar/crypto/default_message_crypto.go
index 93a0244..74e4592 100644
--- a/pulsar/crypto/default_message_crypto.go
+++ b/pulsar/crypto/default_message_crypto.go
@@ -34,7 +34,7 @@ import (
"github.com/apache/pulsar-client-go/pulsar/log"
 )
 
-// DefaultMessageCrypto implmentation of the interface MessageCryto
+// DefaultMessageCrypto implementation of the interface MessageCryto
 type DefaultMessageCrypto struct {
// data key which is used to encrypt/decrypt messages
dataKey []byte
@@ -134,7 +134,7 @@ func (d *DefaultMessageCrypto) RemoveKeyCipher(keyName 
string) bool {
return true
 }
 
-// Encrypt encrypt payload using encryption keys and add encrypted data key
+// Encrypt payload using encryption keys and add encrypted data key
 // to message metadata. Here data key is encrypted
 // using public key
 func (d *DefaultMessageCrypto) Encrypt(encKeys []string,
@@ -161,7 +161,7 @@ func (d *DefaultMessageCrypto) Encrypt(encKeys []string,
keyInfo, keyInfoOk := k.(*EncryptionKeyInfo)
 
if keyInfoOk {
-   msgMetadata.UpsertEncryptionkey(*keyInfo)
+   msgMetadata.UpsertEncryptionKey(*keyInfo)
} else {
d.logger.Error("failed to get EncryptionKeyInfo 
for key %v", keyName)
}
@@ -206,8 +206,8 @@ func (d *DefaultMessageCrypto) Encrypt(encKeys []string,
return gcm.Seal(nil, nonce, payload, nil), nil
 }
 
-// Decrypt decrypt the payload using decrypted data key.
-// Here data key is read from from the message
+// Decrypt the payload using decrypted data key.
+// Here data key is read from the message
 // metadata and  decrypted using private key.
 func (d *DefaultMessageCrypto) Decrypt(msgMetadata MessageMetadataSupplier,
payload []byte,
@@ -285,7 +285,7 @@ func (d *DefaultMessageCrypto) 
getKeyAndDecryptData(msgMetadata MessageMetadataS
return decryptedData, nil
}
} else {
-   // First time, entry wont be present in cache
+   // First time, entry won't be present in cache
d.logger.Debugf("%s Failed to decrypt data or data key 
is not in cache. Will attempt to refresh", d.logCtx)
}
}
diff --git a/pulsar/crypto/encryption_key_Info.go 
b/pulsar/crypto/encryption_key_Info.go
index 8418682..11dd444 100644
--- a/pulsar/crypto/encryption_key_Info.go
+++ b/pulsar/crypto/encryption_key_Info.go
@@ -24,7 +24,7 @@ type EncryptionKeyInfo struct {
name string
 }
 
-// NewEncryptionKeyInfo
+// NewEncryptionKeyInfo create a new EncryptionKeyInfo
 func NewEncryptionKeyInfo(name string, key []byte, metadata map[string]string) 
*EncryptionKeyInfo {
return &EncryptionKeyInfo{
metadata: metadata,
@@ -33,17 +33,17 @@ func NewEncryptionKeyInfo(name string, key []byte, metadata 
map[string]string) *
}
 }
 
-// GetKey get key
+// Name get the name of the key
 func (eci *EncryptionKeyInfo) Name() string {
return eci.name
 }
 
-// GetValue get value
+// Key get the key data
 func (eci *EncryptionKeyInfo) Key() []byte {

[pulsar-client-go] branch master updated: Fix logic of command for sendError (#622)

2021-10-10 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
 new 791d342   Fix logic of command for sendError (#622)
791d342 is described below

commit 791d342a98d0f0b4189913a5ea61547964d095c8
Author: xiaolong ran 
AuthorDate: Mon Oct 11 11:20:33 2021 +0800

 Fix logic of command for sendError (#622)

### Motivation



![image](https://user-images.githubusercontent.com/20965307/135020293-06cb72cc-5ed9-4bc5-a7ba-3909da57a8a6.png)


As shown in the figure above, the `ServerError` returned by the broker is 
`UnknownError` when the client receives it. In fact, we handled the wrong 
command here. Here we should deal with `CommandSendError` instead of 
`CommandError`. Correspondingly, we should deal with the `listener` map used to 
cache the producer instead of the corresponding `pendingRequest` map.
---
 pulsar/internal/connection.go | 45 +++
 1 file changed, 24 insertions(+), 21 deletions(-)

diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 0313e4e..163dcac 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -531,7 +531,7 @@ func (c *connection) internalReceivedCommand(cmd 
*pb.BaseCommand, headersAndPayl
c.handleResponseError(cmd.GetError())
 
case pb.BaseCommand_SEND_ERROR:
-   c.handleSendError(cmd.GetError())
+   c.handleSendError(cmd.GetSendError())
 
case pb.BaseCommand_CLOSE_PRODUCER:
c.handleCloseProducer(cmd.GetCloseProducer())
@@ -752,31 +752,29 @@ func (c *connection) handleAuthChallenge(authChallenge 
*pb.CommandAuthChallenge)
c.writeCommand(baseCommand(pb.BaseCommand_AUTH_RESPONSE, 
cmdAuthResponse))
 }
 
-func (c *connection) handleSendError(cmdError *pb.CommandError) {
-   c.log.Warnf("Received send error from server: [%v] : [%s]", 
cmdError.GetError(), cmdError.GetMessage())
+func (c *connection) handleSendError(sendError *pb.CommandSendError) {
+   c.log.Warnf("Received send error from server: [%v] : [%s]", 
sendError.GetError(), sendError.GetMessage())
 
-   requestID := cmdError.GetRequestId()
+   producerID := sendError.GetProducerId()
 
-   switch cmdError.GetError() {
+   switch sendError.GetError() {
case pb.ServerError_NotAllowedError:
-   request, ok := c.deletePendingRequest(requestID)
+   _, ok := c.deletePendingProducers(producerID)
if !ok {
c.log.Warnf("Received unexpected error response for 
request %d of type %s",
-   requestID, cmdError.GetError())
+   producerID, sendError.GetError())
return
}
 
-   errMsg := fmt.Sprintf("server error: %s: %s", 
cmdError.GetError(), cmdError.GetMessage())
-   request.callback(nil, errors.New(errMsg))
+   c.log.Warnf("server error: %s: %s", sendError.GetError(), 
sendError.GetMessage())
case pb.ServerError_TopicTerminatedError:
-   request, ok := c.deletePendingRequest(requestID)
+   _, ok := c.deletePendingProducers(producerID)
if !ok {
-   c.log.Warnf("Received unexpected error response for 
request %d of type %s",
-   requestID, cmdError.GetError())
+   c.log.Warnf("Received unexpected error response for 
producer %d of type %s",
+   producerID, sendError.GetError())
return
}
-   errMsg := fmt.Sprintf("server error: %s: %s", 
cmdError.GetError(), cmdError.GetMessage())
-   request.callback(nil, errors.New(errMsg))
+   c.log.Warnf("server error: %s: %s", sendError.GetError(), 
sendError.GetMessage())
default:
// By default, for transient error, let the reconnection logic
// to take place and re-establish the produce again
@@ -784,6 +782,17 @@ func (c *connection) handleSendError(cmdError 
*pb.CommandError) {
}
 }
 
+func (c *connection) deletePendingProducers(producerID uint64) 
(ConnectionListener, bool) {
+   c.listenersLock.Lock()
+   producer, ok := c.listeners[producerID]
+   if ok {
+   delete(c.listeners, producerID)
+   }
+   c.listenersLock.Unlock()
+
+   return producer, ok
+}
+
 func (c *connection) handleCloseConsumer(closeConsumer 
*pb.CommandCloseConsumer) {
consumerID := closeConsumer.GetConsumerId()
c.log.Infof("Broker notification of Closed consumer: %d", consumerID)
@

[pulsar-client-go] branch master updated: User-defined metric cardinality (#604)

2021-10-09 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
 new ce79489  User-defined metric cardinality (#604)
ce79489 is described below

commit ce794898bce8d0799e143b42833167252a3b6cd0
Author: Andy Walker 
AuthorDate: Sat Oct 9 05:03:58 2021 -0400

User-defined metric cardinality (#604)

* initial commit

* forgot "none"

* Satisfy the lint gods

Co-authored-by: xiaolongran 
---
 go.mod |  2 +
 go.sum |  2 -
 pulsar/client.go   | 15 ++
 pulsar/client_impl.go  |  8 ++-
 pulsar/consumer_impl.go|  4 +-
 pulsar/consumer_partition.go   |  4 +-
 pulsar/consumer_partition_test.go  |  6 +--
 pulsar/internal/lookup_service_test.go | 29 ++-
 pulsar/internal/metrics.go | 95 +-
 pulsar/producer_impl.go|  4 +-
 pulsar/producer_partition.go   |  4 +-
 pulsar/reader_impl.go  |  4 +-
 12 files changed, 111 insertions(+), 66 deletions(-)

diff --git a/go.mod b/go.mod
index 354f5b4..af7c571 100644
--- a/go.mod
+++ b/go.mod
@@ -15,6 +15,8 @@ require (
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/klauspost/compress v1.10.8
github.com/linkedin/goavro/v2 v2.9.8
+   github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // 
indirect
+   github.com/modern-go/reflect2 v1.0.1 // indirect
github.com/opentracing/opentracing-go v1.2.0
github.com/pierrec/lz4 v2.0.5+incompatible
github.com/pkg/errors v0.9.1
diff --git a/go.sum b/go.sum
index 8b372c7..85fba89 100644
--- a/go.sum
+++ b/go.sum
@@ -29,7 +29,6 @@ github.com/danieljoos/wincred v1.0.2/go.mod 
h1:SnuYRW9lp1oJrZX/dXJqr0cPK5gYXqx3E
 github.com/davecgh/go-spew v1.1.0/go.mod 
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/davecgh/go-spew v1.1.1 
h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/davecgh/go-spew v1.1.1/go.mod 
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
-github.com/dgrijalva/jwt-go v3.2.0+incompatible 
h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM=
 github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod 
h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
 github.com/dimfeld/httptreemux v5.0.1+incompatible 
h1:Qj3gVcDNoOthBAqftuD596rm4wg/adLLz5xh5CmpiCA=
 github.com/dimfeld/httptreemux v5.0.1+incompatible/go.mod 
h1:rbUlSV+CCpv/SuqUTP/8Bk2O3LyUV436/yaRGkhP6Z0=
@@ -159,7 +158,6 @@ github.com/stretchr/objx v0.2.0 
h1:Hbg2NidpLE8veEBkEZTL3CvlkUIVzuU9jDplZO54c48=
 github.com/stretchr/objx v0.2.0/go.mod 
h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE=
 github.com/stretchr/testify v1.2.2/go.mod 
h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
 github.com/stretchr/testify v1.3.0/go.mod 
h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
-github.com/stretchr/testify v1.4.0 
h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
 github.com/stretchr/testify v1.4.0/go.mod 
h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
 github.com/stretchr/testify v1.5.1 
h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4=
 github.com/stretchr/testify v1.5.1/go.mod 
h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
diff --git a/pulsar/client.go b/pulsar/client.go
index cc6fb3c..8ff152a 100644
--- a/pulsar/client.go
+++ b/pulsar/client.go
@@ -117,6 +117,10 @@ type ClientOptions struct {
// FIXME: use `logger` as internal field name instead of `log` as it's 
more idiomatic
Logger log.Logger
 
+   // Specify metric cardinality to the tenant, namespace or topic levels, 
or remove it completely.
+   // Default: MetricsCardinalityNamespace
+   MetricsCardinality MetricsCardinality
+
// Add custom labels to all the metrics reported by this client instance
CustomMetricsLabels map[string]string
 }
@@ -150,3 +154,14 @@ type Client interface {
// Close Closes the Client and free associated resources
Close()
 }
+
+// MetricsCardinality represents the specificty of labels on a per-metric basis
+type MetricsCardinality int
+
+const (
+   _   MetricsCardinality = iota
+   MetricsCardinalityNone // Do not add additional 
labels to metrics
+   MetricsCardinalityTenant   // Label metrics by 
tenant
+   MetricsCardinalityNamespace// Label metrics by 
tenant and namespace
+   MetricsCardinalityTopic// Label metrics by topic
+)
diff --git a/pulsar/client_impl.go b/pulsar/client_impl.go
index 7d2fcfd..5682927 100644
--- a/pulsar/client_impl.go
+++ b/pulsar/client_impl.go
@@ -107,11 +107,15 @@ func newClient(options ClientOptions)

[pulsar-client-go] branch master updated: Encryption support ext consumer (#612)

2021-10-09 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
 new ab96ad7  Encryption support ext consumer (#612)
ab96ad7 is described below

commit ab96ad7d84b7c53e3e6d241f68c24eb7d6fa0037
Author: Garule Prabhudas 
AuthorDate: Sat Oct 9 14:14:53 2021 +0530

Encryption support ext consumer (#612)

* add ability to encrypt messages
- use base crypto package for encryption

* fix typo

* lint fixes

* address review suggestions

* revert go mod

* remove encryption context
 - move it to Consumer MR

* try to fix check issues

* remove unused code

* consumer encryption/decryption changes

* remove embedded crypto struct

* remove embedded struct

* add comments

* merge conflict issues fix

* add noop decryptor

* lint fixes

* fix test case

* refactor and reader encryption changes

* refactor
- move decryptor creation to partition_producer.go
- update reader_impl
- update consumer_impl

* address review feedback

* Nack on decryption failure

* add comment on test case

Co-authored-by: PGarule 
---
 pulsar/consumer.go |   3 +
 pulsar/consumer_impl.go|   1 +
 pulsar/consumer_partition.go   | 108 ++-
 pulsar/consumer_partition_test.go  |   4 +
 pulsar/consumer_test.go| 847 -
 pulsar/encryption.go   |  12 +
 pulsar/impl_message.go |  23 +
 pulsar/internal/crypto/consumer_decryptor.go   |  60 ++
 .../crypto/decryptor.go}   |  23 +-
 .../crypto/noop_decryptor.go}  |  30 +-
 .../pulsartracing/message_carrier_util_test.go |   4 +
 pulsar/message.go  |   4 +
 pulsar/reader.go   |   3 +
 pulsar/reader_impl.go  |   1 +
 pulsar/reader_test.go  |  56 ++
 15 files changed, 1148 insertions(+), 31 deletions(-)

diff --git a/pulsar/consumer.go b/pulsar/consumer.go
index 1c52b29..c9fbc0d 100644
--- a/pulsar/consumer.go
+++ b/pulsar/consumer.go
@@ -155,6 +155,9 @@ type ConsumerOptions struct {
 
// MaxReconnectToBroker set the maximum retry number of 
reconnectToBroker. (default: ultimate)
MaxReconnectToBroker *uint
+
+   // Decryption decryption related fields to decrypt the encrypted message
+   Decryption *MessageDecryptionInfo
 }
 
 // Consumer is an interface that abstracts behavior of Pulsar's consumer
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index 6677062..232079b 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -335,6 +335,7 @@ func (c *consumer) internalTopicSubscribeToPartitions() 
error {
maxReconnectToBroker:   
c.options.MaxReconnectToBroker,
keySharedPolicy:
c.options.KeySharedPolicy,
schema: c.options.Schema,
+   decryption: 
c.options.Decryption,
}
cons, err := newPartitionConsumer(c, c.client, opts, 
c.messageCh, c.dlq, c.metrics)
ch <- ConsumerError{
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 5f74bcf..e691d14 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -26,8 +26,10 @@ import (
 
"github.com/gogo/protobuf/proto"
 
+   "github.com/apache/pulsar-client-go/pulsar/crypto"
"github.com/apache/pulsar-client-go/pulsar/internal"
"github.com/apache/pulsar-client-go/pulsar/internal/compression"
+   cryptointernal 
"github.com/apache/pulsar-client-go/pulsar/internal/crypto"
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
"github.com/apache/pulsar-client-go/pulsar/log"
 
@@ -98,6 +100,7 @@ type partitionConsumerOpts struct {
maxReconnectToBroker   *uint
keySharedPolicy*KeySharedPolicy
schema Schema
+   decryption *MessageDecryptionInfo
 }
 
 type partitionConsumer struct {
@@ -142,6 +145,7 @@ type partitionConsumer struct {
providersMutex   sync.RWMutex
compressionProviders map[pb.CompressionType]compression.Provider
metrics  *internal.TopicMetrics
+   decryptorcryptointernal.Decryptor
 }
 
 func newPart

[pulsar-client-go] branch master updated: [Issue 615] Fix retry on specified-partition topic (#616)

2021-10-09 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
 new e9629fb  [Issue 615] Fix retry on specified-partition topic (#616)
e9629fb is described below

commit e9629fb7851a078c8746940b45134b136c84a0a6
Author: Minzhang 
AuthorDate: Sat Oct 9 16:43:43 2021 +0800

[Issue 615] Fix retry on specified-partition topic (#616)

* fix retry on consume specified-partition topic  (#615)

* introduce variable for topic name

Co-authored-by: xiaolongran 
---
 pulsar/consumer_multitopic.go |  11 +++--
 pulsar/consumer_test.go   | 101 ++
 2 files changed, 109 insertions(+), 3 deletions(-)

diff --git a/pulsar/consumer_multitopic.go b/pulsar/consumer_multitopic.go
index faf8917..f689fae 100644
--- a/pulsar/consumer_multitopic.go
+++ b/pulsar/consumer_multitopic.go
@@ -149,11 +149,16 @@ func (c *multiTopicConsumer) ReconsumeLater(msg Message, 
delay time.Duration) {
return
}
 
-   fqdnTopic := internal.TopicNameWithoutPartitionPart(names[0])
+   tn := names[0]
+   fqdnTopic := internal.TopicNameWithoutPartitionPart(tn)
consumer, ok := c.consumers[fqdnTopic]
if !ok {
-   c.log.Warnf("consumer of topic %s not exist unexpectedly", 
msg.Topic())
-   return
+   // check to see if the topic with the partition part is in the 
consumers
+   // this can happen when the consumer is configured to consume 
from a specific partition
+   if consumer, ok = c.consumers[tn.Name]; !ok {
+   c.log.Warnf("consumer of topic %s not exist 
unexpectedly", msg.Topic())
+   return
+   }
}
consumer.ReconsumeLater(msg, delay)
 }
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 1811722..66587f9 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -1353,6 +1353,107 @@ func TestRLQMultiTopics(t *testing.T) {
assert.Nil(t, checkMsg)
 }
 
+func TestRLQSpecifiedPartitionTopic(t *testing.T) {
+   topic := newTopicName()
+   testURL := adminURL + "/" + "admin/v2/persistent/public/default/" + 
topic + "/partitions"
+   makeHTTPCall(t, http.MethodPut, testURL, "1")
+
+   normalTopic := "persistent://public/default/" + topic
+   partitionTopic := normalTopic + "-partition-0"
+
+   subName := fmt.Sprintf("sub01-%d", time.Now().Unix())
+   maxRedeliveries := 2
+   N := 100
+   ctx := context.Background()
+
+   client, err := NewClient(ClientOptions{URL: lookupURL})
+   assert.Nil(t, err)
+   defer client.Close()
+
+   // subscribe topic with partition
+   rlqConsumer, err := client.Subscribe(ConsumerOptions{
+   Topic:   partitionTopic,
+   SubscriptionName:subName,
+   Type:Shared,
+   SubscriptionInitialPosition: SubscriptionPositionEarliest,
+   DLQ: &DLQPolicy{MaxDeliveries: 
uint32(maxRedeliveries)},
+   RetryEnable: true,
+   NackRedeliveryDelay: 1 * time.Second,
+   })
+   assert.Nil(t, err)
+   defer rlqConsumer.Close()
+
+   // subscribe DLQ Topic
+   dlqConsumer, err := client.Subscribe(ConsumerOptions{
+   Topic:   subName + "-DLQ",
+   SubscriptionName:subName,
+   SubscriptionInitialPosition: SubscriptionPositionEarliest,
+   })
+   assert.Nil(t, err)
+   defer dlqConsumer.Close()
+
+   // create producer
+   producer, err := client.CreateProducer(ProducerOptions{Topic: 
normalTopic})
+   assert.Nil(t, err)
+   defer producer.Close()
+
+   // 1. Pre-produce N messages
+   for i := 0; i < N; i++ {
+   _, err = producer.Send(ctx, &ProducerMessage{Payload: 
[]byte(fmt.Sprintf("MSG_01_%d", i))})
+   assert.Nil(t, err)
+   }
+
+   // 2. Create consumer on the Retry Topics to reconsume N messages 
(maxRedeliveries+1) times
+   rlqReceived := 0
+   for rlqReceived < N*(maxRedeliveries+1) {
+   msg, err := rlqConsumer.Receive(ctx)
+   assert.Nil(t, err)
+   rlqConsumer.ReconsumeLater(msg, 1*time.Second)
+   rlqReceived++
+   }
+   fmt.Println("retry consumed:", rlqReceived) // 300
+
+   // No more messages on the Retry Topic
+   rlqCtx, rlqCancel := context.WithTimeout(context.Background(), 
500*time.Millisecond)
+   defer rlqCancel()
+   msg, err := rlqConsumer.

[pulsar-client-go] branch master updated (d336ff7 -> 44d6b16)

2021-10-09 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git.


from d336ff7  Remove mutitopic- and regexp consumer along with reader from 
client' handlers map when Close called. (#620)
 add 44d6b16  Increment ConsumersOpened counter on each new consumer 
instance. (#619)

No new revisions were added by this update.

Summary of changes:
 pulsar/consumer_impl.go | 12 ++--
 1 file changed, 2 insertions(+), 10 deletions(-)


[pulsar-client-go] branch master updated: Remove mutitopic- and regexp consumer along with reader from client' handlers map when Close called. (#620)

2021-10-09 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
 new d336ff7  Remove mutitopic- and regexp consumer along with reader from 
client' handlers map when Close called. (#620)
d336ff7 is described below

commit d336ff717c98b30d976dd1ebf9b1acf76e05c695
Author: PowerStateFailure <29687050+powerstatefail...@users.noreply.github.com>
AuthorDate: Sat Oct 9 12:30:57 2021 +0500

Remove mutitopic- and regexp consumer along with reader from client' 
handlers map when Close called. (#620)

This change fixes memory leak when frequently using short-living regexp- or 
multitopic consumers because there were not removed from client handler on Close

Co-authored-by: xiaolongran 
---
 pulsar/consumer_multitopic.go | 4 
 pulsar/consumer_regex.go  | 1 +
 pulsar/reader_impl.go | 3 +++
 3 files changed, 8 insertions(+)

diff --git a/pulsar/consumer_multitopic.go b/pulsar/consumer_multitopic.go
index dc4ad7b..faf8917 100644
--- a/pulsar/consumer_multitopic.go
+++ b/pulsar/consumer_multitopic.go
@@ -30,6 +30,8 @@ import (
 )
 
 type multiTopicConsumer struct {
+   client *client
+
options ConsumerOptions
 
consumerName string
@@ -48,6 +50,7 @@ type multiTopicConsumer struct {
 func newMultiTopicConsumer(client *client, options ConsumerOptions, topics 
[]string,
messageCh chan ConsumerMessage, dlq *dlqRouter, rlq *retryRouter) 
(Consumer, error) {
mtc := &multiTopicConsumer{
+   client:   client,
options:  options,
messageCh:messageCh,
consumers:make(map[string]Consumer, len(topics)),
@@ -186,6 +189,7 @@ func (c *multiTopicConsumer) Close() {
}
wg.Wait()
close(c.closeCh)
+   c.client.handlers.Del(c)
c.dlq.close()
c.rlq.close()
})
diff --git a/pulsar/consumer_regex.go b/pulsar/consumer_regex.go
index 9e0c125..2f46c48 100644
--- a/pulsar/consumer_regex.go
+++ b/pulsar/consumer_regex.go
@@ -217,6 +217,7 @@ func (c *regexConsumer) Close() {
}(con)
}
wg.Wait()
+   c.client.handlers.Del(c)
c.dlq.close()
c.rlq.close()
})
diff --git a/pulsar/reader_impl.go b/pulsar/reader_impl.go
index a019d9c..9983286 100644
--- a/pulsar/reader_impl.go
+++ b/pulsar/reader_impl.go
@@ -33,6 +33,7 @@ const (
 
 type reader struct {
sync.Mutex
+   client  *client
pc  *partitionConsumer
messageCh   chan ConsumerMessage
lastMessageInBroker trackingMessageID
@@ -91,6 +92,7 @@ func newReader(client *client, options ReaderOptions) 
(Reader, error) {
}
 
reader := &reader{
+   client:client,
messageCh: make(chan ConsumerMessage),
log:   client.log.SubLogger(log.Fields{"topic": 
options.Topic}),
metrics:   client.metrics.GetTopicMetrics(options.Topic),
@@ -174,6 +176,7 @@ func (r *reader) hasMoreMessages() bool {
 
 func (r *reader) Close() {
r.pc.Close()
+   r.client.handlers.Del(r)
r.metrics.ReadersClosed.Inc()
 }
 


[pulsar-client-go] branch master updated: fix TestNamespaceTopicsNamespaceDoesNotExit (#625)

2021-09-28 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
 new b0f328e  fix TestNamespaceTopicsNamespaceDoesNotExit (#625)
b0f328e is described below

commit b0f328e358d45e822efac164b11dc493bbecfcaa
Author: Rui Fu 
AuthorDate: Tue Sep 28 22:31:38 2021 +0800

fix TestNamespaceTopicsNamespaceDoesNotExit (#625)

fix TestNamespaceTopicsNamespaceDoesNotExit error after pulsar release 2.8.1
ref: apache/pulsar#11172
---
 pulsar/client_impl_test.go | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/pulsar/client_impl_test.go b/pulsar/client_impl_test.go
index b1d128a..e9e94af 100644
--- a/pulsar/client_impl_test.go
+++ b/pulsar/client_impl_test.go
@@ -383,7 +383,7 @@ func TestNamespaceTopicsNamespaceDoesNotExit(t *testing.T) {
 
// fetch from namespace that does not exist
name := generateRandomName()
-   topics, err := 
ci.lookupService.GetTopicsOfNamespace(fmt.Sprintf("%s/%s", name, name), 
internal.Persistent)
+   topics, err := 
ci.lookupService.GetTopicsOfNamespace(fmt.Sprintf("public/%s", name), 
internal.Persistent)
assert.Nil(t, err)
assert.Equal(t, 0, len(topics))
 }
@@ -401,7 +401,7 @@ func TestNamespaceTopicsNamespaceDoesNotExitWebURL(t 
*testing.T) {
 
// fetch from namespace that does not exist
name := generateRandomName()
-   topics, err := 
ci.lookupService.GetTopicsOfNamespace(fmt.Sprintf("%s/%s", name, name), 
internal.Persistent)
+   topics, err := 
ci.lookupService.GetTopicsOfNamespace(fmt.Sprintf("public/%s", name), 
internal.Persistent)
assert.NotNil(t, err)
assert.Equal(t, 0, len(topics))
 }


[pulsar] branch master updated (b079c1e -> f784379)

2021-09-01 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


from b079c1e  [Schema] Schema compatibility strategy in broker level. 
(#11856)
 add f784379  [pulsar-functions-go] sync to the latest function proto 
(#11853)

No new revisions were added by this update.

Summary of changes:
 pulsar-function-go/pb/Function.pb.go  | 469 +++---
 pulsar-function-go/pb/InstanceCommunication.pb.go |  68 ++--
 pulsar-function-go/pb/Request.pb.go   |   2 +-
 pulsar-function-go/pb/doc.go  |   4 +-
 4 files changed, 271 insertions(+), 272 deletions(-)


[pulsar-client-go] branch master updated: fix: retry assertion (#599)

2021-08-29 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
 new 6837964  fix: retry assertion (#599)
6837964 is described below

commit 6837964a473b29a1fb9ca7a264fdf44fa7d376dc
Author: cimura <35636173+cim...@users.noreply.github.com>
AuthorDate: Mon Aug 30 11:48:42 2021 +0900

fix: retry assertion (#599)

Co-authored-by: kimura 

### Motivation

in `consumer_regex_test.go`, sometimes consumers take more than 2 seconds 
(waiting time) subscribing, and assertions fail.

### Modifications

instead of waiting and asserting just once, repeat it a specified number of 
times so that consumers have enough time to subscribe.
---
 pulsar/consumer_regex_test.go | 27 +++
 pulsar/test_helper.go | 12 
 2 files changed, 27 insertions(+), 12 deletions(-)

diff --git a/pulsar/consumer_regex_test.go b/pulsar/consumer_regex_test.go
index 228fc2d..9cb600f 100644
--- a/pulsar/consumer_regex_test.go
+++ b/pulsar/consumer_regex_test.go
@@ -177,10 +177,11 @@ func runRegexConsumerDiscoverPatternAll(t *testing.T, c 
Client, namespace string
t.Fatal(err)
}
rc.discover()
-   time.Sleep(2000 * time.Millisecond)
-
-   consumers = cloneConsumers(rc)
-   assert.Equal(t, 1, len(consumers))
+   retryAssert(t, 5, 2000, func() {
+   consumers = cloneConsumers(rc)
+   }, func(x assert.TestingT) bool {
+   return assert.Equal(x, 1, len(consumers))
+   })
 }
 
 func runRegexConsumerDiscoverPatternFoo(t *testing.T, c Client, namespace 
string) {
@@ -216,10 +217,11 @@ func runRegexConsumerDiscoverPatternFoo(t *testing.T, c 
Client, namespace string
defer deleteTopic(myTopic)
 
rc.discover()
-   time.Sleep(2000 * time.Millisecond)
-
-   consumers = cloneConsumers(rc)
-   assert.Equal(t, 0, len(consumers))
+   retryAssert(t, 5, 2000, func() {
+   consumers = cloneConsumers(rc)
+   }, func(x assert.TestingT) bool {
+   return assert.Equal(x, 0, len(consumers))
+   })
 
// create a topic not in the regex
fooTopic := namespace + "/foo-topic"
@@ -229,10 +231,11 @@ func runRegexConsumerDiscoverPatternFoo(t *testing.T, c 
Client, namespace string
}
 
rc.discover()
-   time.Sleep(2000 * time.Millisecond)
-
-   consumers = cloneConsumers(rc)
-   assert.Equal(t, 1, len(consumers))
+   retryAssert(t, 5, 2000, func() {
+   consumers = cloneConsumers(rc)
+   }, func(x assert.TestingT) bool {
+   return assert.Equal(x, 1, len(consumers))
+   })
 }
 
 func TestRegexConsumer(t *testing.T) {
diff --git a/pulsar/test_helper.go b/pulsar/test_helper.go
index 273169a..d6a4f00 100644
--- a/pulsar/test_helper.go
+++ b/pulsar/test_helper.go
@@ -30,6 +30,7 @@ import (
"time"
 
"github.com/apache/pulsar-client-go/pulsar/internal"
+   "github.com/stretchr/testify/assert"
 
pkgerrors "github.com/pkg/errors"
 )
@@ -165,3 +166,14 @@ func topicPath(topic string) string {
}
return tn.Name
 }
+
+func retryAssert(t assert.TestingT, times int, milliseconds int, update 
func(), assert func(assert.TestingT) bool) {
+   for i := 0; i < times; i++ {
+   time.Sleep(time.Duration(milliseconds) * time.Millisecond)
+   update()
+   if assert(nil) {
+   break
+   }
+   }
+   assert(t)
+}


[pulsar-client-go] branch master updated: Fix panic when scale down partitions (#601)

2021-08-26 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
 new b684151  Fix panic when scale down partitions (#601)
b684151 is described below

commit b6841513379ea9ca503d1e350c5f93198fc2b03f
Author: xiaolong ran 
AuthorDate: Thu Aug 26 16:51:20 2021 +0800

Fix panic when scale down partitions (#601)

Signed-off-by: xiaolongran 



### Motivation

When the program is running, if the business is forced to delete certain 
sub partitions, the following error message will be caused, that is, 
old_partitions is greater than new_partitions, it looks like it is doing scale 
down partitions, and the current code logic only deals with the scenario of 
scale up partitions , So if the user is forced to delete some sub partitions, 
the following error will be encountered:

```
level=info msg="[Changed number of partitions in topic]" new_partitions=1 
old_partitions=20 topic="persistent://pulsar-xxx//g"
```

```
panic: runtime error: index out of range [1] with length 1

goroutine 166288 [running]:

github.com/apache/pulsar-client-go/pulsar.(*producer).internalCreatePartitionsProducers(0xc0070aa6e0,
 0x0, 0x0)
github.com/apache/pulsar-client-go/pulsar/producer_impl.go:194 
+0x785

github.com/apache/pulsar-client-go/pulsar.(*producer).runBackgroundPartitionDiscovery.func1(0xc004167cd0,
 0xc00559f5c0, 0xc006af6dc0, 0xc0070aa6e0)
   github.com/apache/pulsar-client-go/pulsar/producer_impl.go:152 +0xce
created by 
github.com/apache/pulsar-client-go/pulsar.(*producer).runBackgroundPartitionDiscovery
   github.com/apache/pulsar-client-go/pulsar/producer_impl.go:144 +0xcd
```

### Modifications

Increase the processing logic of scale down partition
---
 pulsar/consumer_impl.go | 30 ++
 pulsar/producer_impl.go | 27 +++
 2 files changed, 33 insertions(+), 24 deletions(-)

diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index ec7ad7d..78ae0d7 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -28,7 +28,6 @@ import (
"github.com/apache/pulsar-client-go/pulsar/internal"
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
"github.com/apache/pulsar-client-go/pulsar/log"
-   "github.com/pkg/errors"
 )
 
 const defaultNackRedeliveryDelay = 1 * time.Minute
@@ -258,22 +257,16 @@ func (c *consumer) internalTopicSubscribeToPartitions() 
error {
 
c.Lock()
defer c.Unlock()
+
oldConsumers := c.consumers
+   oldNumPartitions = len(oldConsumers)
 
if oldConsumers != nil {
-   oldNumPartitions = len(oldConsumers)
if oldNumPartitions == newNumPartitions {
c.log.Debug("Number of partitions in topic has not 
changed")
return nil
}
 
-   if oldNumPartitions > newNumPartitions {
-   c.log.WithField("old_partitions", oldNumPartitions).
-   WithField("new_partitions", newNumPartitions).
-   Error("Does not support scaling down operations 
on topic partitions")
-   return errors.New("Does not support scaling down 
operations on topic partitions")
-   }
-
c.log.WithField("old_partitions", oldNumPartitions).
WithField("new_partitions", newNumPartitions).
Info("Changed number of partitions in topic")
@@ -281,7 +274,9 @@ func (c *consumer) internalTopicSubscribeToPartitions() 
error {
 
c.consumers = make([]*partitionConsumer, newNumPartitions)
 
-   if oldConsumers != nil {
+   // When for some reason (eg: forced deletion of sub partition) causes 
oldNumPartitions> newNumPartitions,
+   // we need to rebuild the cache of new consumers, otherwise the array 
will be out of bounds.
+   if oldConsumers != nil && oldNumPartitions < newNumPartitions {
// Copy over the existing consumer instances
for i := 0; i < oldNumPartitions; i++ {
c.consumers[i] = oldConsumers[i]
@@ -297,12 +292,19 @@ func (c *consumer) internalTopicSubscribeToPartitions() 
error {
receiverQueueSize := c.options.ReceiverQueueSize
metadata := c.options.Properties
 
+   startPartition := oldNumPartitions
partitionsToAdd := newNumPartitions - oldNumPartitions
+
+   if partitionsToAdd < 0 {
+   partitionsToAdd = newNumPartitions

[pulsar-client-go] branch master updated: Fix producer panic by oldProducers (#598)

2021-08-24 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
 new baaf68d  Fix producer panic by oldProducers (#598)
baaf68d is described below

commit baaf68d89bc82ab83b714c0327ea51d58d9bd37f
Author: xiaolong ran 
AuthorDate: Wed Aug 25 14:54:31 2021 +0800

Fix producer panic by oldProducers (#598)

* Fix producer panic by oldProducers

Signed-off-by: xiaolongran 

* fix comments

Signed-off-by: xiaolongran 

* fix comments

Signed-off-by: xiaolongran 

* fix ci error

Signed-off-by: xiaolongran 
---
 pulsar/consumer_impl.go | 16 +---
 pulsar/producer_impl.go | 17 ++---
 2 files changed, 27 insertions(+), 6 deletions(-)

diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index b7bc607..ec7ad7d 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -28,6 +28,7 @@ import (
"github.com/apache/pulsar-client-go/pulsar/internal"
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
"github.com/apache/pulsar-client-go/pulsar/log"
+   "github.com/pkg/errors"
 )
 
 const defaultNackRedeliveryDelay = 1 * time.Minute
@@ -266,6 +267,13 @@ func (c *consumer) internalTopicSubscribeToPartitions() 
error {
return nil
}
 
+   if oldNumPartitions > newNumPartitions {
+   c.log.WithField("old_partitions", oldNumPartitions).
+   WithField("new_partitions", newNumPartitions).
+   Error("Does not support scaling down operations 
on topic partitions")
+   return errors.New("Does not support scaling down 
operations on topic partitions")
+   }
+
c.log.WithField("old_partitions", oldNumPartitions).
WithField("new_partitions", newNumPartitions).
Info("Changed number of partitions in topic")
@@ -273,9 +281,11 @@ func (c *consumer) internalTopicSubscribeToPartitions() 
error {
 
c.consumers = make([]*partitionConsumer, newNumPartitions)
 
-   // Copy over the existing consumer instances
-   for i := 0; i < oldNumPartitions; i++ {
-   c.consumers[i] = oldConsumers[i]
+   if oldConsumers != nil {
+   // Copy over the existing consumer instances
+   for i := 0; i < oldNumPartitions; i++ {
+   c.consumers[i] = oldConsumers[i]
+   }
}
 
type ConsumerError struct {
diff --git a/pulsar/producer_impl.go b/pulsar/producer_impl.go
index 1ffd24c..adf9b14 100644
--- a/pulsar/producer_impl.go
+++ b/pulsar/producer_impl.go
@@ -26,6 +26,7 @@ import (
 
"github.com/apache/pulsar-client-go/pulsar/internal"
"github.com/apache/pulsar-client-go/pulsar/log"
+   "github.com/pkg/errors"
 )
 
 const (
@@ -182,16 +183,26 @@ func (p *producer) internalCreatePartitionsProducers() 
error {
return nil
}
 
+   if oldNumPartitions > newNumPartitions {
+   p.log.WithField("old_partitions", oldNumPartitions).
+   WithField("new_partitions", newNumPartitions).
+   Error("Does not support scaling down operations 
on topic partitions")
+   return errors.New("Does not support scaling down 
operations on topic partitions")
+   }
+
p.log.WithField("old_partitions", oldNumPartitions).
WithField("new_partitions", newNumPartitions).
Info("Changed number of partitions in topic")
+
}
 
p.producers = make([]Producer, newNumPartitions)
 
-   // Copy over the existing consumer instances
-   for i := 0; i < oldNumPartitions; i++ {
-   p.producers[i] = oldProducers[i]
+   if oldProducers != nil {
+   // Copy over the existing consumer instances
+   for i := 0; i < oldNumPartitions; i++ {
+   p.producers[i] = oldProducers[i]
+   }
}
 
type ProducerError struct {


[pulsar] branch master updated (9577b84 -> b8dce10)

2021-08-16 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


from 9577b84  [Issue 11496][C++] Allow partitioned producers to start 
lazily (#11570)
 add b8dce10  Replace bookkeper with bookkeeper in bin/bookkeeper (#11675)

No new revisions were added by this update.

Summary of changes:
 bin/bookkeeper| 2 +-
 .../java/org/apache/pulsar/broker/storage/ManagedLedgerStorage.java   | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)


[pulsar] branch master updated (191dc62 -> 720214c)

2021-07-29 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


from 191dc62  Deep copy the tenants to avoid concurrent sort exception 
(#11463)
 add 720214c  [Go Functions] Upgrade go client version to 0.6.0 (#11477)

No new revisions were added by this update.

Summary of changes:
 pulsar-function-go/go.mod |  5 +++--
 pulsar-function-go/go.sum | 13 +
 pulsar-function-go/pf/mockMessage_test.go | 16 
 3 files changed, 32 insertions(+), 2 deletions(-)


[pulsar-client-go] branch master updated: [issue 574] Fix handle send error panic (#576)

2021-07-27 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
 new 29414db  [issue 574] Fix handle send error panic (#576)
29414db is described below

commit 29414db801a747672e6c0de62d0498d664e70d41
Author: cckellogg 
AuthorDate: Tue Jul 27 23:25:40 2021 -0700

[issue 574] Fix handle send error panic (#576)

Fix handle send error panic
---
 pulsar/internal/connection.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 9873ec8..147de3f 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -752,7 +752,7 @@ func (c *connection) handleSendError(cmdError 
*pb.CommandError) {
 
requestID := cmdError.GetRequestId()
 
-   switch *cmdError.Error {
+   switch cmdError.GetError() {
case pb.ServerError_NotAllowedError:
request, ok := c.deletePendingRequest(requestID)
if !ok {


[pulsar-client-go] annotated tag v0.6.0 updated (5e88b01 -> 6d3fa80)

2021-07-25 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a change to annotated tag v0.6.0
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git.


*** WARNING: tag v0.6.0 was modified! ***

from 5e88b01  (commit)
  to 6d3fa80  (tag)
 tagging 5e88b015e9aa2c3fd993208c35e47cf1852be07d (commit)
 replaces v0.4.0
  by xiaolongran
  on Mon Jul 26 10:54:43 2021 +0800

- Log -
Release v0.6.0
-BEGIN PGP SIGNATURE-

iQJDBAABCAAtFiEE4nvv6z2Aqlp0sXEpAHfMO00BOOYFAmD+I/MPHHJ4bEBhcGFj
aGUub3JnAAoJEAB3zDtNATjmdOkP/RGinh47MM7iqdTJSM48tQrPjedoCCvg56qM
EraH2YoQnJeZicuOcA+PMr6aeEs7M1gwwv76oBOrSYwK6V6eLbf8Oy8xJk5W/Y/Q
UcyEAD1TBbC3ecd4COmcUsIlNkMcddhblwmoHYH5iSWifxYvw6aJj5Qvbwfp711I
15AR9l6QStOyqDAHfFnRSYCAbIHKrDH33lKsOsBh/RtncWoAvN1RtEl6oa/3iI3L
58+KexNGpT8snSetwW3dX0RdWHtLxuL0Q3O+gLtU6zi/+hn7nDs4u12IXbWZQKEr
btUDhQwvXXTshr2lD7G0m5GJg4ofqq3XUnV27sgokiui6vTuTpuzgj9RApxuL/Qx
nXJ0O44gejdDckJgEHPU1E5W13DQmwKQGa7+pZ9oUjd5j9PKc/VwGIP/N29xw+jP
mh5EC+OAKmMZweMhjdtex0667leuyxAV4aS1W7xzLsdb4iGdBiaheioD/VM65olP
ZUPe8Eh0K0imJTaesVnHEznanzjh7lidUera2VZRq3CcMgqIhIi173eTPDrSl1Nn
fEckeWoxHXB+97JteaexzJahGuLDkg4CGa9StuS4Xl6WLDk8uUsQQkoQPvn4qXOE
y0l056/pR8ZTomx6Z/RikFkHPnF3lSg6NzyFKdq2VlOcXxzoE7zppyvW6W0x8Veo
LTypzQAL
=q65A
-END PGP SIGNATURE-
---


No new revisions were added by this update.

Summary of changes:


[pulsar-client-go] branch master updated: Update change log for 0.6.0 release (#570)

2021-07-25 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
 new 1cd38b3  Update change log for 0.6.0 release (#570)
1cd38b3 is described below

commit 1cd38b3c96661ed2c61fc55ea56c217a016c0f83
Author: xiaolong ran 
AuthorDate: Mon Jul 26 10:52:47 2021 +0800

Update change log for 0.6.0 release (#570)

Signed-off-by: xiaolongran 

Update change log for 0.6.0 release
---
 CHANGELOG.md | 37 +
 1 file changed, 37 insertions(+)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 64b901f..ce91e22 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -23,6 +23,43 @@
 
 All notable changes to this project will be documented in this file.
 
+[0.6.0] 2021-07-21
+
+## Feature
+
+* Make PartitionsAutoDiscoveryInterval configurable, see 
[PR-514](https://github.com/apache/pulsar-client-go/pull/514).
+* Always check connection close channell, before attempting to put requests, 
see [PR-521](https://github.com/apache/pulsar-client-go/pull/521).
+* Add `LedgerId,EntryId,BatchIdx,PartitionIdx` func for MessageId interface, 
see [PR-529](https://github.com/apache/pulsar-client-go/pull/529).
+* Add DisableReplication to Producer Message, see 
[PR-543](https://github.com/apache/pulsar-client-go/pull/543).
+* Updating comments to conform to golang comment specification, see 
[PR-532](https://github.com/apache/pulsar-client-go/pull/532).
+* Producer respects Context passed to Send() and SendAsync() when applying 
backpressure, see [PR-534](https://github.com/apache/pulsar-client-go/pull/534).
+* Simplify connection close logic, see 
[PR-559](https://github.com/apache/pulsar-client-go/pull/559).
+* Add open tracing to pulsar go client, see 
[PR-518](https://github.com/apache/pulsar-client-go/pull/518).
+* Update proto file, see 
[PR-562](https://github.com/apache/pulsar-client-go/pull/562).
+* Add send error logic for connection, see 
[PR-566](https://github.com/apache/pulsar-client-go/pull/566).
+* Add license file for depend on libs, see 
[PR-567](https://github.com/apache/pulsar-client-go/pull/567).
+
+## Improve
+
+* Update jwt-go dependency to resolve vulnerabilities, see 
[PR-524](https://github.com/apache/pulsar-client-go/pull/524).
+* Fixed Athenz repository name, see 
[PR-522](https://github.com/apache/pulsar-client-go/pull/522).
+* Fix reader latest position, see 
[PR-525](https://github.com/apache/pulsar-client-go/pull/525).
+* Fix timeout guarantee for RequestOnCnx, see 
[PR-492](https://github.com/apache/pulsar-client-go/pull/492).
+* Fix nil pointer error with GetPartitionedTopicMetadata, see 
[PR-536](https://github.com/apache/pulsar-client-go/pull/536).
+* Release locks before calling producer consumer response callbacks, see 
[PR-542](https://github.com/apache/pulsar-client-go/pull/542).
+* Fix lookup service not implemented GetTopicsOfNamespace, see 
[PR-541](https://github.com/apache/pulsar-client-go/pull/541).
+* Regenerate the certs to work with Pulsar 2.8.0 and Java 11, see 
[PR-548](https://github.com/apache/pulsar-client-go/pull/548).
+* Fix race condition when resend pendingItems, see 
[PR-551](https://github.com/apache/pulsar-client-go/pull/551).
+* Fix data race while accessing connection in partitionConsumer, see 
[PR-535](https://github.com/apache/pulsar-client-go/pull/535).
+* Fix channel data race, see 
[PR-558](https://github.com/apache/pulsar-client-go/pull/558).
+* Fix write to closed channel panic() in internal/connection during connection 
close, see [PR-539](https://github.com/apache/pulsar-client-go/pull/539).
+* Fix possible race condition in connection pool, see 
[PR-561](https://github.com/apache/pulsar-client-go/pull/561).
+* Fix default connection timeout, see 
[PR-563](https://github.com/apache/pulsar-client-go/pull/563).
+* Add lock for compressionProviders to fix data race problem, see 
[PR-533](https://github.com/apache/pulsar-client-go/pull/533).
+* Fix send goroutine blocked, see 
[PR-530](https://github.com/apache/pulsar-client-go/pull/530).
+
+
+
 [0.5.0] 2021-05-14
 
 ## Feature


[pulsar-client-go] branch master updated: update version to 0.6.0 (#572)

2021-07-25 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
 new 5e37108  update version to 0.6.0 (#572)
5e37108 is described below

commit 5e37108cd168e424961dd374afa1ef2dace29f15
Author: xiaolong ran 
AuthorDate: Mon Jul 26 10:52:30 2021 +0800

update version to 0.6.0 (#572)

Signed-off-by: xiaolongran 

update version to 0.6.0
---
 VERSION| 2 +-
 stable.txt | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/VERSION b/VERSION
index 8d1f998..cf25392 100644
--- a/VERSION
+++ b/VERSION
@@ -1,3 +1,3 @@
 // This version number refers to the currently released version number
 // Please fix the version when release.
-v0.5.0
+v0.6.0
diff --git a/stable.txt b/stable.txt
index 3270adc..71c317e 100644
--- a/stable.txt
+++ b/stable.txt
@@ -1,3 +1,3 @@
 // This version number refers to the current stable version, generally is 
`VERSION - 1`.
 // Please fix the version when release.
-v0.5.0
+v0.6.0


[pulsar-client-go] branch master updated: [issue 490] Add error log when schema encode failed. (#571)

2021-07-21 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
 new bbee640  [issue 490] Add error log when schema encode failed. (#571)
bbee640 is described below

commit bbee6401ac34ae1d8ca5f08e8990418c69aa52e5
Author: Zhiqiang Li 
AuthorDate: Wed Jul 21 18:41:03 2021 +0800

[issue 490] Add error log when schema encode failed. (#571)

* Add error log when schema encode failed.

* format code
---
 pulsar/producer_partition.go | 1 +
 1 file changed, 1 insertion(+)

diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 7e83bfa..abec4fc 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -356,6 +356,7 @@ func (p *partitionProducer) internalSend(request 
*sendRequest) {
if p.options.Schema != nil {
schemaPayload, err = p.options.Schema.Encode(msg.Value)
if err != nil {
+   p.log.WithError(err).Errorf("Schema encode message 
failed %s", msg.Value)
return
}
}


[pulsar-client-go] branch master updated: Add producer check state before send msg. (#569)

2021-07-21 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
 new 1dfb8fd  Add producer check state before send msg. (#569)
1dfb8fd is described below

commit 1dfb8fdffb97d0c4f4845cc668bed485324daf5c
Author: Zhiqiang Li 
AuthorDate: Wed Jul 21 17:43:23 2021 +0800

Add producer check state before send msg. (#569)

Add producer state check before send msg.
---
 pulsar/error.go  |  4 
 pulsar/producer_partition.go |  7 +++
 pulsar/producer_test.go  | 30 ++
 3 files changed, 41 insertions(+)

diff --git a/pulsar/error.go b/pulsar/error.go
index 60a832b..f433bfc 100644
--- a/pulsar/error.go
+++ b/pulsar/error.go
@@ -99,6 +99,8 @@ const (
AddToBatchFailed
// SeekFailed seek failed
SeekFailed
+   // ProducerClosed means producer already been closed
+   ProducerClosed
 )
 
 // Error implement error interface, composed of two parts: msg and result.
@@ -201,6 +203,8 @@ func getResultStr(r Result) string {
return "AddToBatchFailed"
case SeekFailed:
return "SeekFailed"
+   case ProducerClosed:
+   return "ProducerClosed"
default:
return fmt.Sprintf("Result(%d)", r)
}
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 8b3d33d..7e83bfa 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -50,6 +50,7 @@ var (
errSendQueueIsFull = newError(ProducerQueueIsFull, "producer send queue 
is full")
errContextExpired  = newError(TimeoutError, "message send context 
expired")
errMessageTooLarge = newError(MessageTooBig, "message size exceeds 
MaxMessageSize")
+   errProducerClosed  = newError(ProducerClosed, "producer already been 
closed")
 
buffersPool sync.Pool
 )
@@ -658,6 +659,12 @@ func (p *partitionProducer) SendAsync(ctx context.Context, 
msg *ProducerMessage,
 
 func (p *partitionProducer) internalSendAsync(ctx context.Context, msg 
*ProducerMessage,
callback func(MessageID, *ProducerMessage, error), flushImmediately 
bool) {
+   if p.getProducerState() != producerReady {
+   // Producer is closing
+   callback(nil, msg, errProducerClosed)
+   return
+   }
+
sr := &sendRequest{
ctx:  ctx,
msg:  msg,
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 7c3dbd7..bbe8028 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -1097,3 +1097,33 @@ func TestProducerWithInterceptors(t *testing.T) {
assert.Equal(t, 10, metric.sendn)
assert.Equal(t, 10, metric.ackn)
 }
+
+func TestProducerSendAfterClose(t *testing.T) {
+   client, err := NewClient(ClientOptions{
+   URL: serviceURL,
+   })
+   assert.NoError(t, err)
+   defer client.Close()
+
+   producer, err := client.CreateProducer(ProducerOptions{
+   Topic: newTopicName(),
+   })
+
+   assert.NoError(t, err)
+   assert.NotNil(t, producer)
+   defer producer.Close()
+
+   ID, err := producer.Send(context.Background(), &ProducerMessage{
+   Payload: []byte("hello"),
+   })
+
+   assert.NoError(t, err)
+   assert.NotNil(t, ID)
+
+   producer.Close()
+   ID, err = producer.Send(context.Background(), &ProducerMessage{
+   Payload: []byte("hello"),
+   })
+   assert.Nil(t, ID)
+   assert.Error(t, err)
+}


[pulsar-client-go] branch branch-0.6.0 created (now 5e88b01)

2021-07-21 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a change to branch branch-0.6.0
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git.


  at 5e88b01  Add license file for depend libs (#567)

No new revisions were added by this update.


[pulsar-client-go] annotated tag v0.6.0-candidate-1 updated (5e88b01 -> ad15726)

2021-07-21 Thread rxl
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a change to annotated tag v0.6.0-candidate-1
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git.


*** WARNING: tag v0.6.0-candidate-1 was modified! ***

from 5e88b01  (commit)
  to ad15726  (tag)
 tagging 5e88b015e9aa2c3fd993208c35e47cf1852be07d (commit)
 replaces v0.4.0
  by xiaolongran
  on Wed Jul 21 14:43:51 2021 +0800

- Log -
Release v0.6.0-candidate-1
-BEGIN PGP SIGNATURE-

iQJDBAABCAAtFiEE4nvv6z2Aqlp0sXEpAHfMO00BOOYFAmD3wicPHHJ4bEBhcGFj
aGUub3JnAAoJEAB3zDtNATjmOaYQAMqI62fJ11dinzy8dQdN27KhpEV2Q/dg3XuB
/eNrMyCQ5xxmb4b0vjVqj8x4OXB/eLKqYq57ntyIsp6G+rJWoP0Ca/ez+hMdwBDj
RFfCrCUAV9vTQZMno+dHFgwJJ+Q6FSlSHiqELAZ0kfS2IpWCHgPz29Xeltl0swLz
7mPEBIj5ORl9bzQ9oEMbAUPJycSv/LfUWUKdj2eQR3JSp96uHgXbY15gDUVTBDAI
sTC3dwD9vduFak0bCnhP3mvqHz7bS4JCSTP9Rvuo0G/j24Iz4xxtoTSGAsUFXReV
DFISSttjTwiXRc9QeQ/ZcY/uADre9QiYJqg26cfjRFYA2G54y39klKLCCCiUqefq
yqxSFdvF2XMkEEqjUkroiVHf/+rYF9XfVaOf4WaJ2xRHoFb0bipBFcE4AXJDhzV1
t/xg+OcNBgoLFr2zNXGVO7fip+ocRTnIkOhW7Uae2arztRJ7fnNr63qRPVqzmbGE
b6UPojT9aufFMZSmxAML67Ct7FeLFAGRNa3dYT1+24lfRvUlcX6ARFIH8iRFiwvI
IrWvgN35xoiEX1mrX//OXmlAd5speuKUa7Z8gZzvLAB4oeZA2zmOJASbV4DO0TcI
ADhX1i8CbGENX+3WlrFqgAmdMMXdbGT5w2Gx37Zu36is7Xez6d96m1WV8Exn6/NI
d80e/H77
=4XD9
-END PGP SIGNATURE-
---


No new revisions were added by this update.

Summary of changes:


svn commit: r48918 - in /dev/pulsar/pulsar-client-go-0.6.0-candidate-1: ./ apache-pulsar-client-go-0.6.0-src.tar.gz apache-pulsar-client-go-0.6.0-src.tar.gz.asc apache-pulsar-client-go-0.6.0-src.tar.g

2021-07-21 Thread rxl
Author: rxl
Date: Wed Jul 21 07:20:40 2021
New Revision: 48918

Log:
Staging artifacts and signature for Pulsar Client Go release 0.6.0-candidate-1

Added:
dev/pulsar/pulsar-client-go-0.6.0-candidate-1/

dev/pulsar/pulsar-client-go-0.6.0-candidate-1/apache-pulsar-client-go-0.6.0-src.tar.gz
   (with props)

dev/pulsar/pulsar-client-go-0.6.0-candidate-1/apache-pulsar-client-go-0.6.0-src.tar.gz.asc

dev/pulsar/pulsar-client-go-0.6.0-candidate-1/apache-pulsar-client-go-0.6.0-src.tar.gz.sha512

Added: 
dev/pulsar/pulsar-client-go-0.6.0-candidate-1/apache-pulsar-client-go-0.6.0-src.tar.gz
==
Binary file - no diff available.

Propchange: 
dev/pulsar/pulsar-client-go-0.6.0-candidate-1/apache-pulsar-client-go-0.6.0-src.tar.gz
--
svn:mime-type = application/octet-stream

Added: 
dev/pulsar/pulsar-client-go-0.6.0-candidate-1/apache-pulsar-client-go-0.6.0-src.tar.gz.asc
==
--- 
dev/pulsar/pulsar-client-go-0.6.0-candidate-1/apache-pulsar-client-go-0.6.0-src.tar.gz.asc
 (added)
+++ 
dev/pulsar/pulsar-client-go-0.6.0-candidate-1/apache-pulsar-client-go-0.6.0-src.tar.gz.asc
 Wed Jul 21 07:20:40 2021
@@ -0,0 +1,17 @@
+-BEGIN PGP SIGNATURE-
+
+iQJEBAABCAAuFiEEciSp+kqNwHcTrdvU+q+T3I6QoRgFAmD3yK0QHHlvbmdAYXBh
+Y2hlLm9yZwAKCRD6r5PcjpChGHuBD/wN8eTxHrkFYGALUFgx6USucpOl5q0L8gxR
+X7NbOzi3VcXWBrF86AYVtRrUYeRw+nsHX6118PVgoG7HNG7jvVXE8JiqJFSXtCri
+hkl1zAH5FtOF9nh/k59QwH5fkjJ19vTH/k35MSCmV5VFTmSwlaz03tSYSCZECyNn
+IMPZraiJ2DoDEai/de/z7iE6DkYxoi9h7fOnkbRgu/eiqIgVTvvMcuPmgsapRZ2m
+B8wiJIjCcIfYURX5wVBIpaDAbsiwqQC0jAerGw26hxD3Wm+9WTBfnXHwPC71rmQ4
+oRl4Wx5/ztFMFxDuaOb3ECWzKRpH0Hgrbad0dEPqTfdF6ZKlYfEqarGDjbGzayzY
+Ayt1IgUy9Yq+73auR3Ig0MdD8402b3lkzD20UsmCtbAmC4uM71poNCcE4i0WXCE9
+vD3QydyxNA6eXcCPs1fxccBMqLb1wv7jUq/ckbpW8tn4cpQU6Jft3eZvIN2CfsBd
+hGuEgSc4KpH5bHgFgXmxhbRrK7T/TA1JsBSNbvA3mzWDGGNQxWZwZRWLWzrfB/3E
+0xi8wzyjZBRPfdzZQGyBLZT0c3FKI02p1mBlJY1UaqnNT4Q+2gyydoSXJlCXO2Dx
+7UQjPd++njxoCNgm0I856t6lmux0iSNCYsyrO1LKR81zInVlHF50UWh8U1s7+gA1
+qwaG5VdJYQ==
+=7qLp
+-END PGP SIGNATURE-

Added: 
dev/pulsar/pulsar-client-go-0.6.0-candidate-1/apache-pulsar-client-go-0.6.0-src.tar.gz.sha512
==
--- 
dev/pulsar/pulsar-client-go-0.6.0-candidate-1/apache-pulsar-client-go-0.6.0-src.tar.gz.sha512
 (added)
+++ 
dev/pulsar/pulsar-client-go-0.6.0-candidate-1/apache-pulsar-client-go-0.6.0-src.tar.gz.sha512
 Wed Jul 21 07:20:40 2021
@@ -0,0 +1 @@
+29e5a91dc77ce1873d93372e9e2b1c1863f66c0babd0a6152adaa3d55f431cff1f894865db6325f6610b4340211256ead5a52d4e7611706a97fae3d9e3b99651
  apache-pulsar-client-go-0.6.0-src.tar.gz




  1   2   3   4   5   6   7   >