gkhnoztrk commented on issue #1276: URL: https://github.com/apache/pulsar-client-go/issues/1276#issuecomment-2319748533
The expected behavior with EnableZeroQueueConsumer set to true should be as follows:

- 50 tasks were placed in the queue.
- Each consumer was limited to processing 5 tasks concurrently.
- 4 consumers were opened sequentially.

The distribution should have been:

- Consumer 1: Tasks 1-5
- Consumer 2: Tasks 6-10
- Consumer 3: Tasks 11-15
- Consumer 4: Tasks 16-20

However, the actual behavior observed was:

- Consumer 1: Processed tasks 1-5, but incorrectly buffered tasks 6-20
- Consumer 2: Processed tasks 21-25, but incorrectly buffered tasks 26-40
- (Similar pattern for other consumers)

When Consumer 1 was closed, it released all its tasks, including those in the buffer, which Consumer 4 then took over.

This behavior contradicts the purpose of EnableZeroQueueConsumer. With this setting enabled, consumers should only fetch tasks as they're ready to process them, without pre-fetching or buffering additional tasks. The current implementation appears to be ignoring this setting, leading to unnecessary buffering and potential issues with task distribution and processing.
test code :

```
func consumer3() {
	client, err := pulsar.NewClient(pulsar.ClientOptions{
		URL: "pulsar://localhost:6650",
		Logger: plog.NewLoggerWithLogrus(&logrus.Logger{}),
	})
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	schema := pulsar.NewAvroSchema(common.AvroSchema, nil)
	consumer, err := client.Subscribe(pulsar.ConsumerOptions{
		Topic: "test1",
		SubscriptionName: "my-shared-subscription",
		Type: pulsar.Shared,
		Schema: schema,
		SubscriptionInitialPosition: pulsar.SubscriptionPositionEarliest,
		EnableZeroQueueConsumer: true,
		MaxPendingChunkedMessage: 1,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer consumer.Close()

	accountWorker := common.NewAccountListener(5)
	fmt.Println("waiting..")
	for accountWorker.WaitReady() {
		msg, err := consumer.Receive(context.Background())
		if err != nil {
			log.Printf("Error receiving message: %v", err)
			continue
		}
		account := &common.Account{}
		err = msg.GetSchemaValue(account)
		if err != nil {
			log.Printf("Error decoding message: %v", err)
			consumer.Nack(msg)
			continue
		}
		fmt.Println("Consumed : " + strconv.Itoa(account.ID))
		accountWorker.AddAccount(consumer, msg, account)
	}
}
```

```
const AvroSchema = ` { "type": "record", "name": "Account", "fields": [ {"name": "ID", "type": "int"} ] }`

type AccountWrapper struct {
	Account *Account
	Consumer pulsar.Consumer
	Message pulsar.Message
}

type Account struct {
	ID int `avro:"ID"`
}

type AccountListener struct {
	Limit int
	Accounts map[int]*AccountWrapper
	lock *sync.Mutex
}

func NewAccountListener(limit int) *AccountListener {
	return &AccountListener{
		Limit: limit,
		Accounts: make(map[int]*AccountWrapper),
		lock: &sync.Mutex{},
	}
}

func (l *AccountListener) WaitReady() bool {
	for l.checkLimit() == false {
		time.Sleep(time.Millisecond * 100)
	}
	return true
}

func (l *AccountListener) IsReady() bool {
	l.lock.Lock()
	defer l.lock.Unlock()
	return len(l.Accounts) < l.Limit
}

func (l *AccountListener) checkLimit() bool {
	l.lock.Lock()
	defer l.lock.Unlock()
	return len(l.Accounts) < l.Limit
}

func (l *AccountListener) AddAccount(consumer pulsar.Consumer, message pulsar.Message, account *Account) {
	l.lock.Lock()
	defer l.lock.Unlock()
	l.Accounts[account.ID] = &AccountWrapper{
		Account: account,
		Message: message,
		Consumer: consumer,
	}
	go l.Process(l.Accounts[account.ID])
}

func (l *AccountListener) RemoveAccount(account *Account) {
	l.lock.Lock()
	defer l.lock.Unlock()
	delete(l.Accounts, account.ID)
}

func (l *AccountListener) Process(wrapper *AccountWrapper) {
	// long process simulate
	time.Sleep(time.Minute * 5)
	// finish
	fmt.Println("< : " + strconv.Itoa(wrapper.Account.ID))
	wrapper.Consumer.Ack(wrapper.Message)
	// success - remove queue
	l.RemoveAccount(wrapper.Account)
}
```

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
