danielchang-Z opened a new issue, #1426:
URL: https://github.com/apache/pulsar-client-go/issues/1426

   #### Expected behavior
   
   When an error occurs and a partition consumer is closed, subsequent partition-discovery retries or partition expansion should recreate the closed partition consumer, so that no partition is permanently skipped.
   
   #### Actual behavior
   
   Once a partition consumer is closed due to an error, it is never recreated, so messages on that partition are no longer consumed.
   
   #### Steps to reproduce
   
   1.   Create a consumer on a partitioned topic.
   2.   Trigger an error while partition consumers are being created, so that the error branch at [consumer_impl.go#L418](https://github.com/apache/pulsar-client-go/blob/v0.16.0/pulsar/consumer_impl.go#L418) is taken.
   3.   Observe that the partition consumers are closed.
   4.   Wait for the next partition-discovery retry, or trigger partition expansion.
   5.   Observe that the closed consumers are not recreated and that messages on those partitions are no longer consumed.
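
The steps above can be replayed without a broker in a small, self-contained Go model. This is a sketch, not the client's code: `partitionConsumer`, `consumer`, `subscribeToPartitions`, `failPartition`, and `openPartitions` are hypothetical stand-ins that mirror only the three code paths quoted in this issue (early return on an unchanged partition count, copying old instances, closing everything on error). Topic lookups, locking, and goroutines are deliberately left out.

```go
package main

import (
	"errors"
	"fmt"
)

// partitionConsumer is a hypothetical stand-in; only the closed flag matters here.
type partitionConsumer struct {
	partition int
	closed    bool
}

func (pc *partitionConsumer) Close() { pc.closed = true }

// consumer models only the consumers slice of the multi-partition consumer.
type consumer struct {
	consumers []*partitionConsumer
}

// subscribeToPartitions is a hypothetical model of internalTopicSubscribeToPartitions.
// failPartition simulates a creation failure for one partition (-1 means none).
func (c *consumer) subscribeToPartitions(newNum, failPartition int) error {
	oldNum := len(c.consumers)
	if c.consumers != nil && oldNum == newNum {
		return nil // early return: closed or nil slots are never inspected
	}
	oldConsumers := c.consumers
	c.consumers = make([]*partitionConsumer, newNum)
	if oldConsumers != nil && oldNum < newNum {
		copy(c.consumers, oldConsumers) // old instances reused even if closed
	}
	start := oldNum
	if start > newNum {
		start = 0 // shrink case: rebuild every slot
	}
	var err error
	for p := start; p < newNum; p++ {
		if p == failPartition {
			err = errors.New("simulated partition consumer creation failure")
			continue
		}
		c.consumers[p] = &partitionConsumer{partition: p}
	}
	if err != nil {
		// Same cleanup as the quoted snippet: close every consumer created so far.
		for _, pc := range c.consumers {
			if pc != nil {
				pc.Close()
			}
		}
		return err
	}
	return nil
}

// openPartitions counts slots that hold a consumer that is still open.
func (c *consumer) openPartitions() int {
	n := 0
	for _, pc := range c.consumers {
		if pc != nil && !pc.closed {
			n++
		}
	}
	return n
}

func main() {
	c := &consumer{}
	_ = c.subscribeToPartitions(2, -1)   // initial subscription succeeds
	err := c.subscribeToPartitions(3, 2) // expansion: partition 2 fails
	fmt.Println("expansion error:", err)
	for tick := 0; tick < 3; tick++ {
		// As in runBackgroundPartitionDiscovery, the error is discarded, and
		// the unchanged partition count hits the early return every time.
		_ = c.subscribeToPartitions(3, -1)
	}
	fmt.Println("open partition consumers:", c.openPartitions()) // prints 0
}
```

In this model, a single failure during expansion leaves all three partitions without an open consumer, and later discovery ticks never notice.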
   
   #### System configuration
   **Pulsar version** (pulsar-client-go): 0.14.0, 0.15.1, 0.16.0
   
   #### Relevant Code Snippets with Bug Notes
   
   - [consumer_impl.go#L309-L333](https://github.com/apache/pulsar-client-go/blob/v0.16.0/pulsar/consumer_impl.go#L309-L333) ignores the error returned by `internalTopicSubscribeToPartitions`, which it calls on every ticker tick:
   ```go
   func (c *consumer) runBackgroundPartitionDiscovery(period time.Duration) (cancel func()) {
        var wg sync.WaitGroup
        stopDiscoveryCh := make(chan struct{})
        ticker := time.NewTicker(period)
   
        wg.Add(1)
        go func() {
                defer wg.Done()
                for {
                        select {
                        case <-stopDiscoveryCh:
                                return
                        case <-ticker.C:
                                c.log.Debug("Auto discovering new partitions")
                                 c.internalTopicSubscribeToPartitions() // returned error is ignored
                        }
                }
        }()
   
        return func() {
                ticker.Stop()
                close(stopDiscoveryCh)
                wg.Wait()
        }
   }
   ```
   
   - [consumer_impl.go#L418-L427](https://github.com/apache/pulsar-client-go/blob/v0.16.0/pulsar/consumer_impl.go#L418-L427) closes all partition consumers when it encounters an error:
   ```go
   if err != nil {
        // Since there were some failures,
        // cleanup all the partitions that succeeded in creating the consumer
        for _, c := range c.consumers {
                if c != nil {
                        c.Close()
                }
        }
        return err
   }
   ```
   
   - [consumer_impl.go#L351-L354](https://github.com/apache/pulsar-client-go/blob/v0.16.0/pulsar/consumer_impl.go#L351-L354) returns early when the number of partitions has not changed, even though the old consumers have already been closed:
   ```go
   if oldNumPartitions == newNumPartitions {
        c.log.Debug("Number of partitions in topic has not changed")
        return nil
   }
   ```
   
   - [consumer_impl.go#L363-L370](https://github.com/apache/pulsar-client-go/blob/v0.16.0/pulsar/consumer_impl.go#L363-L370) copies the existing consumer instances into the new slice without checking whether they are closed, so closed consumers are reused:
   ```go
   // When for some reason (eg: forced deletion of sub partition) causes oldNumPartitions> newNumPartitions,
   // we need to rebuild the cache of new consumers, otherwise the array will be out of bounds.
   if oldConsumers != nil && oldNumPartitions < newNumPartitions {
        // Copy over the existing consumer instances
        for i := 0; i < oldNumPartitions; i++ {
                c.consumers[i] = oldConsumers[i]
        }
   }
   ```
   
   

