Copilot commented on code in PR #1500:
URL: https://github.com/apache/pulsar-client-go/pull/1500#discussion_r3271909299


##########
pulsar/consumer_impl.go:
##########
@@ -782,35 +771,29 @@ func (c *consumer) SeekByTime(time time.Time) error {
        return errs
 }
 
-func (c *consumer) findPartitionConsumer(msgID MessageID) (*partitionConsumer, 
error) {
-       c.Lock()
-       defer c.Unlock()
-       return c.unsafeFindPartitionConsumer(msgID)
-}
-
-// NOTE: This method must be called when c.Lock is held
-func (c *consumer) unsafeFindPartitionConsumer(msgID MessageID) 
(*partitionConsumer, error) {
+func findPartitionConsumer(consumers []*partitionConsumer, msgID MessageID) 
(*partitionConsumer, error) {
        partition := int(msgID.PartitionIdx())
-       if partition < 0 || partition >= len(c.consumers) {
-               c.log.Errorf("invalid partition index %d expected a partition 
between [0-%d]",
-                       partition, len(c.consumers))
+       if partition < 0 || partition >= len(consumers) {
                return nil, fmt.Errorf("invalid partition index %d expected a 
partition between [0-%d]",
-                       partition, len(c.consumers))
+                       partition, len(consumers))
+       }
+       return consumers[partition], nil
+}
+
+func (c *consumer) partitionConsumers() []*partitionConsumer {
+       v := c.consumers.Load()
+       if v == nil {
+               return nil
        }
-       return c.consumers[partition], nil
+       consumers := v.([]*partitionConsumer)
+       return append([]*partitionConsumer(nil), consumers...)

Review Comment:
   partitionConsumers() clones the slice on every call, which adds allocations 
and O(n) copying on hot paths (Ack/AckID/Nack/etc.). Since updates are already 
copy-on-write via atomic.Value, consider returning the stored slice directly 
(treating it as immutable, like producer.getProducers()), or providing a 
separate explicit snapshot/copy helper only where needed.
   



##########
pulsar/consumer_impl.go:
##########
@@ -782,35 +771,29 @@ func (c *consumer) SeekByTime(time time.Time) error {
        return errs
 }
 
-func (c *consumer) findPartitionConsumer(msgID MessageID) (*partitionConsumer, 
error) {
-       c.Lock()
-       defer c.Unlock()
-       return c.unsafeFindPartitionConsumer(msgID)
-}
-
-// NOTE: This method must be called when c.Lock is held
-func (c *consumer) unsafeFindPartitionConsumer(msgID MessageID) 
(*partitionConsumer, error) {
+func findPartitionConsumer(consumers []*partitionConsumer, msgID MessageID) 
(*partitionConsumer, error) {
        partition := int(msgID.PartitionIdx())
-       if partition < 0 || partition >= len(c.consumers) {
-               c.log.Errorf("invalid partition index %d expected a partition 
between [0-%d]",
-                       partition, len(c.consumers))
+       if partition < 0 || partition >= len(consumers) {
                return nil, fmt.Errorf("invalid partition index %d expected a 
partition between [0-%d]",
-                       partition, len(c.consumers))
+                       partition, len(consumers))
+       }
+       return consumers[partition], nil

Review Comment:
   The error message range is off by one: valid partition indices are 
0..len(consumers)-1, but the message prints "[0-%d]" with len(consumers). 
Adjust the upper bound to len(consumers)-1 (and consider including the actual 
len for clarity).



##########
pulsar/consumer_test.go:
##########
@@ -5927,3 +5929,380 @@ func TestSelectConnectionForSameConsumer(t *testing.T) {
                        "The consumer uses a different connection when 
reconnecting")
        }
 }
+
+func 
TestInternalTopicSubscribeToPartitionsDoesNotBlockExistingPartitionLookup(t 
*testing.T) {
+       lookupURL, err := url.Parse("pulsar://localhost:6650")
+       require.NoError(t, err)
+
+       allowSubscribe := make(chan struct{})
+       subscribeStarted := make(chan struct{})
+       var releaseSubscribe sync.Once
+
+       logger := slog.New(slog.NewJSONHandler(os.Stdout, 
&slog.HandlerOptions{Level: slog.LevelInfo}))
+       slog.SetDefault(logger)

Review Comment:
   This test sets a process-wide default slog logger via slog.SetDefault, which 
can affect logging behavior of other tests running in the same package/process 
and make failures harder to interpret. Prefer using a local logger instance 
(and passing it where needed) or restoring the previous default in a defer.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to