This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 8e908733 [fix] Return an error when AckCumulative on a Shared/KeyShared subscription (#1217)
8e908733 is described below

commit 8e9087339d0d004231d3f8fc5f823e561840182b
Author: Zike Yang <[email protected]>
AuthorDate: Tue May 14 17:20:31 2024 +0800

    [fix] Return an error when AckCumulative on a Shared/KeyShared subscription (#1217)
    
    ### Motivation
    
    The consumer should return an error when AckCumulative is called on a Shared/KeyShared subscription.
---
 pulsar/consumer_partition.go | 14 +++++++++++++-
 pulsar/consumer_test.go      | 35 +++++++++++++++++++++++++++++++++++
 2 files changed, 48 insertions(+), 1 deletion(-)

diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index dc01e692..f752afbc 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -20,7 +20,6 @@ package pulsar
 import (
        "container/list"
        "encoding/hex"
-       "errors"
        "fmt"
        "math"
        "strings"
@@ -36,6 +35,7 @@ import (
        pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
        "github.com/apache/pulsar-client-go/pulsar/log"
        "github.com/bits-and-blooms/bitset"
+       "github.com/pkg/errors"
 
        uAtomic "go.uber.org/atomic"
 )
@@ -50,6 +50,10 @@ const (
        consumerClosed
 )
 
+var (
+       ErrInvalidAck = errors.New("invalid ack")
+)
+
 func (s consumerState) String() string {
        switch s {
        case consumerInit:
@@ -686,12 +690,20 @@ func (pc *partitionConsumer) AckIDWithResponseCumulative(msgID MessageID) error
        return pc.internalAckIDCumulative(msgID, true)
 }
 
+func (pc *partitionConsumer) isAllowAckCumulative() bool {
+       return pc.options.subscriptionType != Shared && pc.options.subscriptionType != KeyShared
+}
+
 func (pc *partitionConsumer) internalAckIDCumulative(msgID MessageID, withResponse bool) error {
        if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing {
                pc.log.WithField("state", state).Error("Failed to ack by closing or closed consumer")
                return errors.New("consumer state is closed")
        }
 
+       if !pc.isAllowAckCumulative() {
+               return errors.Wrap(ErrInvalidAck, "cumulative ack is not allowed for the Shared/KeyShared subscription type")
+       }
+
        // chunk message id will be converted to tracking message id
        trackingID := toTrackingMessageID(msgID)
        if trackingID == nil {
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 4120ba4b..00f48cae 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -4392,3 +4392,38 @@ func TestMultiConsumerMemoryLimit(t *testing.T) {
                return assert.Equal(t, pc2PrevQueueSize/2, pc2.currentQueueSize.Load())
        })
 }
+
+func TestConsumerAckCumulativeOnSharedSubShouldFailed(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+       assert.Nil(t, err)
+       defer client.Close()
+
+       topic := newTopicName()
+       consumer, err := client.Subscribe(ConsumerOptions{
+               Topic:            topic,
+               SubscriptionName: "my-sub",
+               Type:             Shared,
+       })
+       assert.Nil(t, err)
+       defer consumer.Close()
+
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic: topic,
+       })
+       assert.Nil(t, err)
+       defer producer.Close()
+
+       _, err = producer.Send(context.Background(), &ProducerMessage{
+               Payload: []byte("hello"),
+       })
+       assert.Nil(t, err)
+
+       msg, err := consumer.Receive(context.Background())
+       assert.Nil(t, err)
+
+       err = consumer.AckIDCumulative(msg.ID())
+       assert.NotNil(t, err)
+       assert.ErrorIs(t, err, ErrInvalidAck)
+}

Reply via email to