gkhnoztrk commented on issue #1276:
URL: https://github.com/apache/pulsar-client-go/issues/1276#issuecomment-2319748533

   
![Animation](https://github.com/user-attachments/assets/1f4fef57-5a44-4d5f-adf8-7e2a9056933a)
   
With `EnableZeroQueueConsumer` set to `true`, the expected behavior should be as follows:

- 50 tasks were placed in the queue.
- Each consumer was limited to processing 5 tasks concurrently.
- 4 consumers were opened sequentially.

The distribution should have been:

- Consumer 1: tasks 1-5
- Consumer 2: tasks 6-10
- Consumer 3: tasks 11-15
- Consumer 4: tasks 16-20

However, the actual behavior observed was:

- Consumer 1: processed tasks 1-5, but incorrectly buffered tasks 6-20
- Consumer 2: processed tasks 21-25, but incorrectly buffered tasks 26-40
- (similar pattern for the other consumers)

When Consumer 1 was closed, it released all of its tasks, including those in its buffer, which Consumer 4 then took over.
   
This behavior contradicts the purpose of `EnableZeroQueueConsumer`. With this setting enabled, a consumer should only fetch tasks as it is ready to process them, without pre-fetching or buffering additional ones. The current implementation appears to ignore the setting, leading to unnecessary buffering and problems with task distribution and processing.
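
For reference, here is a stripped-down sketch of how the four consumers in this scenario can be driven so the distribution is visible in the logs (the `runConsumer` helper, the one-second stagger, and the sleep that simulates work are illustrative only; the topic and subscription names are the same as in the test code below):

```go
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/apache/pulsar-client-go/pulsar"
)

// runConsumer opens one zero-queue consumer on the shared subscription and
// logs which consumer instance received which message.
func runConsumer(client pulsar.Client, idx int) {
	consumer, err := client.Subscribe(pulsar.ConsumerOptions{
		Topic:                   "test1",
		SubscriptionName:        "my-shared-subscription",
		Type:                    pulsar.Shared,
		EnableZeroQueueConsumer: true,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer consumer.Close()

	for {
		// With a true zero-queue consumer, each Receive should request exactly
		// one message from the broker; nothing should be prefetched locally.
		msg, err := consumer.Receive(context.Background())
		if err != nil {
			log.Printf("consumer %d receive error: %v", idx, err)
			return
		}
		fmt.Printf("consumer %d received %v\n", idx, msg.ID())
		time.Sleep(5 * time.Second) // simulate work before asking for the next message
		consumer.Ack(msg)
	}
}

func main() {
	client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// Open four consumers sequentially, as described above.
	for i := 1; i <= 4; i++ {
		go runConsumer(client, i)
		time.Sleep(time.Second)
	}
	select {} // keep the process alive while the consumers run
}
```

If the setting worked as expected, each consumer in this sketch should only receive its next message after finishing the previous one, rather than having a block of messages pushed to it up front.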
   
Test code:
   
```go
func consumer3() {
	client, err := pulsar.NewClient(pulsar.ClientOptions{
		URL:    "pulsar://localhost:6650",
		Logger: plog.NewLoggerWithLogrus(&logrus.Logger{}),
	})
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	schema := pulsar.NewAvroSchema(common.AvroSchema, nil)

	consumer, err := client.Subscribe(pulsar.ConsumerOptions{
		Topic:                       "test1",
		SubscriptionName:            "my-shared-subscription",
		Type:                        pulsar.Shared,
		Schema:                      schema,
		SubscriptionInitialPosition: pulsar.SubscriptionPositionEarliest,
		EnableZeroQueueConsumer:     true, // expect no client-side prefetching
		MaxPendingChunkedMessage:    1,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer consumer.Close()

	// Allow at most 5 tasks to be processed concurrently by this consumer.
	accountWorker := common.NewAccountListener(5)

	fmt.Println("waiting..")
	for accountWorker.WaitReady() {
		// WaitReady blocks until fewer than 5 tasks are in flight, so Receive
		// is only called when this consumer is ready for another message.
		msg, err := consumer.Receive(context.Background())
		if err != nil {
			log.Printf("Error receiving message: %v", err)
			continue
		}
		account := &common.Account{}
		err = msg.GetSchemaValue(account)
		if err != nil {
			log.Printf("Error decoding message: %v", err)
			consumer.Nack(msg)
			continue
		}
		fmt.Println("Consumed : " + strconv.Itoa(account.ID))
		accountWorker.AddAccount(consumer, msg, account)
	}
}
```
   
```go
const AvroSchema = `
{
  "type": "record",
  "name": "Account",
  "fields": [
    {"name": "ID", "type": "int"}
  ]
}`

// AccountWrapper ties a decoded Account to the message and consumer it came
// from, so it can be acked once processing finishes.
type AccountWrapper struct {
	Account  *Account
	Consumer pulsar.Consumer
	Message  pulsar.Message
}

type Account struct {
	ID int `avro:"ID"`
}

// AccountListener tracks the tasks currently in flight and caps them at Limit.
type AccountListener struct {
	Limit    int
	Accounts map[int]*AccountWrapper
	lock     *sync.Mutex
}

func NewAccountListener(limit int) *AccountListener {
	return &AccountListener{
		Limit:    limit,
		Accounts: make(map[int]*AccountWrapper),
		lock:     &sync.Mutex{},
	}
}

// WaitReady polls every 100ms until fewer than Limit tasks are in flight.
func (l *AccountListener) WaitReady() bool {
	for !l.checkLimit() {
		time.Sleep(time.Millisecond * 100)
	}
	return true
}

func (l *AccountListener) IsReady() bool {
	l.lock.Lock()
	defer l.lock.Unlock()
	return len(l.Accounts) < l.Limit
}

func (l *AccountListener) checkLimit() bool {
	l.lock.Lock()
	defer l.lock.Unlock()
	return len(l.Accounts) < l.Limit
}

// AddAccount registers a task as in flight and processes it in the background.
func (l *AccountListener) AddAccount(consumer pulsar.Consumer, message pulsar.Message, account *Account) {
	l.lock.Lock()
	defer l.lock.Unlock()
	l.Accounts[account.ID] = &AccountWrapper{
		Account:  account,
		Message:  message,
		Consumer: consumer,
	}
	go l.Process(l.Accounts[account.ID])
}

func (l *AccountListener) RemoveAccount(account *Account) {
	l.lock.Lock()
	defer l.lock.Unlock()
	delete(l.Accounts, account.ID)
}

// Process simulates a long-running task, then acks the message and frees the slot.
func (l *AccountListener) Process(wrapper *AccountWrapper) {
	// simulate a long-running task
	time.Sleep(time.Minute * 5)
	// finished
	fmt.Println("< : " + strconv.Itoa(wrapper.Account.ID))
	wrapper.Consumer.Ack(wrapper.Message) // success - remove from the queue
	l.RemoveAccount(wrapper.Account)
}
```
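
As a side note, the 5-task limit that `WaitReady` polls for could also be expressed with a buffered-channel semaphore, which makes the "only call Receive when ready" intent explicit. A minimal sketch under the same assumptions as the test code above (schema handling omitted, `process` is a stand-in for the 5-minute task):

```go
package main

import (
	"context"
	"log"
	"time"

	"github.com/apache/pulsar-client-go/pulsar"
)

// process stands in for the long-running task from the test code above.
func process(msg pulsar.Message) {
	time.Sleep(5 * time.Minute)
}

func main() {
	client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	consumer, err := client.Subscribe(pulsar.ConsumerOptions{
		Topic:                   "test1",
		SubscriptionName:        "my-shared-subscription",
		Type:                    pulsar.Shared,
		EnableZeroQueueConsumer: true,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer consumer.Close()

	sem := make(chan struct{}, 5) // at most 5 tasks in flight for this consumer
	for {
		sem <- struct{}{} // block until a slot is free; only then ask for a message
		msg, err := consumer.Receive(context.Background())
		if err != nil {
			<-sem
			log.Printf("Error receiving message: %v", err)
			continue
		}
		go func(m pulsar.Message) {
			defer func() { <-sem }() // free the slot when the task finishes
			process(m)
			consumer.Ack(m) // success: remove the message from the backlog
		}(msg)
	}
}
```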

