bluecucumber1989-commits opened a new issue, #1461:
URL: https://github.com/apache/pulsar-client-go/issues/1461

   #### Expected behavior
   
   No messages are lost while a Consumer reconnects.
   
   #### Actual behavior
   
   If `reconnectToBroker` runs twice in a row while a Consumer is reconnecting,
   messages are lost. Specifically: messages received from the Broker during the
   first reconnection (e.g., M1, M2) are cleared by the second reconnection, so
   the user only ever sees the later messages (e.g., M3, M4).
   
   #### Steps to reproduce
   
   Prerequisites:
   1. The Consumer uses the Durable subscription mode (the default)
   2. The messages have not been acked yet
   3. The broker that owns the topic changes
   
   Trigger Conditions:
   1. The Broker sends `CommandCloseConsumer` to close the Consumer
   2. Before the first reconnection completes, the TCP connection breaks,
      triggering a second reconnection
   
   _**Possible race condition causing a duplicate reconnection**_:
   `handleCloseConsumer` calls `ConnectionClosed` before deleting the handler.
   If the TCP connection breaks before the handler is deleted, `Close` calls
   `ConnectionClosed` on the same handler a second time.
   
   ```
   // First call: Broker actively closes
   func (c *connection) handleCloseConsumer(closeConsumer *pb.CommandCloseConsumer) {
       consumerID := closeConsumer.GetConsumerId()
       c.log.Infof("Broker notification of Closed consumer: %d", consumerID)
   
       if consumer, ok := c.consumerHandler(consumerID); ok {
           consumer.ConnectionClosed(closeConsumer)  // First call
           c.DeleteConsumeHandler(consumerID)        // Delete handler afterwards
       }
   }
   
   // Second call: TCP connection breaks
   func (c *connection) Close() {
       c.closeOnce.Do(func() {
           listeners, consumerHandlers, cnx := c.closeAndEmptyObservers()
           
           // If DeleteConsumeHandler hasn't executed, snapshot still contains the consumer
           for _, handler := range consumerHandlers {
               handler.ConnectionClosed(nil)  // Second call
           }
       })
   } 
   ```
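
To make the suspected interleaving concrete, here is a minimal, single-threaded sketch in Go. It is not the pulsar-client-go implementation: `toyConn`, `closeCounter`, `notifyThenDelete`, `deleteThenNotify`, and `countCalls` are hypothetical names, and the TCP break is injected by hand between the two steps instead of being raced. It only illustrates that with the notify-then-delete order a handler can be notified twice, whereas removing the handler from the map before notifying it leaves a single notification. Whether reordering is the right fix for the real client is an open question.

```go
package main

import (
	"fmt"
	"sync"
)

// closeCounter is a hypothetical stand-in for a consumer handler.
// It only counts how often ConnectionClosed is invoked.
type closeCounter struct{ calls int }

func (h *closeCounter) ConnectionClosed() { h.calls++ }

// toyConn is a hypothetical, heavily simplified connection.
type toyConn struct {
	mu       sync.Mutex
	handlers map[uint64]*closeCounter
}

// lookup returns the handler without removing it.
func (c *toyConn) lookup(id uint64) (*closeCounter, bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	h, ok := c.handlers[id]
	return h, ok
}

// remove deletes the handler and returns it, in one critical section.
func (c *toyConn) remove(id uint64) (*closeCounter, bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	h, ok := c.handlers[id]
	delete(c.handlers, id)
	return h, ok
}

// closeAll models Close(): snapshot and empty the map, then notify everyone.
func (c *toyConn) closeAll() {
	c.mu.Lock()
	snapshot := c.handlers
	c.handlers = map[uint64]*closeCounter{}
	c.mu.Unlock()
	for _, h := range snapshot {
		h.ConnectionClosed()
	}
}

// notifyThenDelete mirrors the order shown in handleCloseConsumer above.
// tcpBreak is injected between the two steps to force the bad interleaving.
func notifyThenDelete(c *toyConn, id uint64, tcpBreak func()) {
	if h, ok := c.lookup(id); ok {
		h.ConnectionClosed()
		tcpBreak()
		c.remove(id)
	}
}

// deleteThenNotify is one possible alternative order (an assumption, not a
// confirmed fix): take the handler out of the map first, then notify it.
func deleteThenNotify(c *toyConn, id uint64, tcpBreak func()) {
	if h, ok := c.remove(id); ok {
		tcpBreak()
		h.ConnectionClosed()
	}
}

// countCalls runs one scenario and reports how often the handler was notified.
func countCalls(deleteFirst bool) int {
	h := &closeCounter{}
	c := &toyConn{handlers: map[uint64]*closeCounter{1: h}}
	if deleteFirst {
		deleteThenNotify(c, 1, c.closeAll)
	} else {
		notifyThenDelete(c, 1, c.closeAll)
	}
	return h.calls
}

func main() {
	fmt.Println("notify then delete:", countCalls(false)) // 2
	fmt.Println("delete then notify:", countCalls(true))  // 1
}
```

In this toy model each `ConnectionClosed` call stands for one reconnection attempt, so the count of 2 corresponds to the two consecutive reconnections described in the report.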
   
   _**Likely key problem:**_
   The second reconnection clears the local queue built up after the first one:
   `pc.clearReceiverQueue` clears `dispatcher.messages`, which would explain why
   M1 and M2 are dropped during the second reconnection.
   ```
   // pulsar/consumer_partition.go
   func (pc *partitionConsumer) grabConn() error {
       // ...
       if seekMsgID := pc.seekMessageID.get(); seekMsgID != nil {
           pc.startMessageID.set(seekMsgID)
           pc.seekMessageID.set(nil)
       } else {
           pc.startMessageID.set(pc.clearReceiverQueue())
       }
   
       // In Durable mode, the StartMessageId is not sent to the broker
       if pc.options.subscriptionMode != Durable {
           cmdSubscribe.StartMessageId = convertToMessageIDData(pc.startMessageID.get())
       }
       // ...
   } 
   ```
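
The effect of that branch can be modelled in a few lines of Go. This is a sketch under stated assumptions, not the real client: `toyConsumer`, `toySubscribe`, and their fields are hypothetical, and the toy `clearReceiverQueue` is assumed to discard the buffered messages and return the ID of the oldest one. It shows that after a Durable-mode reconnect the local queue is empty and the subscribe command carries no start position, so seeing M1 and M2 again would depend entirely on the broker redelivering them.

```go
package main

import "fmt"

// toyConsumer is a hypothetical model of the partition consumer's local state.
type toyConsumer struct {
	queue   []int64 // IDs of messages buffered locally, not yet seen by the user
	startID *int64  // position the client would like to resume from
}

// toySubscribe is a hypothetical stand-in for the subscribe command.
type toySubscribe struct {
	StartMessageID *int64 // nil means "not sent to the broker"
}

// clearReceiverQueue drops everything buffered locally and returns the ID of
// the oldest dropped message (assumed behaviour for this sketch). With an
// empty queue it keeps the previous start position.
func (pc *toyConsumer) clearReceiverQueue() *int64 {
	if len(pc.queue) == 0 {
		return pc.startID
	}
	first := pc.queue[0]
	pc.queue = nil
	return &first
}

// grabConn models only the branch quoted above: clear the queue, remember the
// start position, and send it to the broker only when not in Durable mode.
func (pc *toyConsumer) grabConn(durable bool) toySubscribe {
	pc.startID = pc.clearReceiverQueue()
	cmd := toySubscribe{}
	if !durable {
		cmd.StartMessageID = pc.startID
	}
	return cmd
}

func main() {
	// M1 and M2 arrived after the first reconnection and are still buffered.
	pc := &toyConsumer{queue: []int64{1, 2}}

	// Second reconnection in Durable mode.
	cmd := pc.grabConn(true)
	fmt.Println("buffered after reconnect:", len(pc.queue))
	fmt.Println("start position sent:", cmd.StartMessageID != nil)
}
```

With `durable` set to `false`, the same toy model sends the ID of M1 as the start position, which is why the report singles out the Durable mode.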
   
   
   #### System configuration
   Pulsar version: 4.1
   pulsar-client-go: 0.18.0
   

