This is an automated email from the ASF dual-hosted git repository.
zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 8e908733 [fix] Return an error when AckCumulative on a
Shared/KeyShared subscription (#1217)
8e908733 is described below
commit 8e9087339d0d004231d3f8fc5f823e561840182b
Author: Zike Yang <[email protected]>
AuthorDate: Tue May 14 17:20:31 2024 +0800
[fix] Return an error when AckCumulative on a Shared/KeyShared subscription
(#1217)
### Motivation
The consumer should return an error when AckCumulative is called on a Shared/KeyShared
subscription, because cumulative ack is not allowed for those subscription types.
---
pulsar/consumer_partition.go | 14 +++++++++++++-
pulsar/consumer_test.go | 35 +++++++++++++++++++++++++++++++++++
2 files changed, 48 insertions(+), 1 deletion(-)
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index dc01e692..f752afbc 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -20,7 +20,6 @@ package pulsar
import (
"container/list"
"encoding/hex"
- "errors"
"fmt"
"math"
"strings"
@@ -36,6 +35,7 @@ import (
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
"github.com/apache/pulsar-client-go/pulsar/log"
"github.com/bits-and-blooms/bitset"
+ "github.com/pkg/errors"
uAtomic "go.uber.org/atomic"
)
@@ -50,6 +50,10 @@ const (
consumerClosed
)
+var (
+ ErrInvalidAck = errors.New("invalid ack")
+)
+
func (s consumerState) String() string {
switch s {
case consumerInit:
@@ -686,12 +690,20 @@ func (pc *partitionConsumer) AckIDWithResponseCumulative(msgID MessageID) error
return pc.internalAckIDCumulative(msgID, true)
}
+func (pc *partitionConsumer) isAllowAckCumulative() bool {
+ return pc.options.subscriptionType != Shared && pc.options.subscriptionType != KeyShared
+}
+
func (pc *partitionConsumer) internalAckIDCumulative(msgID MessageID, withResponse bool) error {
if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing {
pc.log.WithField("state", state).Error("Failed to ack by closing or closed consumer")
return errors.New("consumer state is closed")
}
+ if !pc.isAllowAckCumulative() {
+ return errors.Wrap(ErrInvalidAck, "cumulative ack is not allowed for the Shared/KeyShared subscription type")
+ }
+
// chunk message id will be converted to tracking message id
trackingID := toTrackingMessageID(msgID)
if trackingID == nil {
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 4120ba4b..00f48cae 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -4392,3 +4392,38 @@ func TestMultiConsumerMemoryLimit(t *testing.T) {
return assert.Equal(t, pc2PrevQueueSize/2, pc2.currentQueueSize.Load())
})
}
+
+func TestConsumerAckCumulativeOnSharedSubShouldFailed(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: lookupURL,
+ })
+ assert.Nil(t, err)
+ defer client.Close()
+
+ topic := newTopicName()
+ consumer, err := client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ SubscriptionName: "my-sub",
+ Type: Shared,
+ })
+ assert.Nil(t, err)
+ defer consumer.Close()
+
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topic,
+ })
+ assert.Nil(t, err)
+ defer producer.Close()
+
+ _, err = producer.Send(context.Background(), &ProducerMessage{
+ Payload: []byte("hello"),
+ })
+ assert.Nil(t, err)
+
+ msg, err := consumer.Receive(context.Background())
+ assert.Nil(t, err)
+
+ err = consumer.AckIDCumulative(msg.ID())
+ assert.NotNil(t, err)
+ assert.ErrorIs(t, err, ErrInvalidAck)
+}