RobertIndie commented on code in PR #1323:
URL: https://github.com/apache/pulsar-client-go/pull/1323#discussion_r1899512032
##########
pulsar/consumer_test.go:
##########
@@ -5038,3 +5038,31 @@ func TestClientVersion(t *testing.T) {
assert.True(t, strings.HasSuffix(publisher.ClientVersion,
"-test-client"))
}
+
+func TestSelectConnectionForSameConsumer(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: serviceURL,
+ MaxConnectionsPerBroker: 10,
+ })
+ assert.NoError(t, err)
+ defer client.Close()
+
+ topicName := newTopicName()
+
+ _consumer, err := client.Subscribe(ConsumerOptions{
+ Topic: topicName,
+ SubscriptionName: "sub-1",
+ Type: Shared,
+ })
+ assert.NoError(t, err)
+ defer _consumer.Close()
+
+ partitionConsumerImp := _consumer.(*consumer).consumers[0]
Review Comment:
```suggestion
partitionConsumerImpl := _consumer.(*consumer).consumers[0]
```
##########
pulsar/internal/connection_pool.go:
##########
@@ -84,9 +87,11 @@ func NewConnectionPool(
return p
}
-func (p *connectionPool) GetConnection(logicalAddr *url.URL, physicalAddr
*url.URL) (Connection, error) {
- p.log.WithField("logicalAddr", logicalAddr).WithField("physicalAddr",
physicalAddr).Debug("Getting pooled connection")
- key := p.getMapKey(logicalAddr, physicalAddr)
+func (p *connectionPool) GetConnection(logicalAddr *url.URL, physicalAddr
*url.URL,
+ keySuffix int32) (Connection, error) {
+ p.log.WithField("logicalAddr", logicalAddr).
+ WithField("physicalAddr", physicalAddr).Debug("Getting pooled
connection")
Review Comment:
```suggestion
p.log.WithField("logicalAddr", logicalAddr).
WithField("physicalAddr", physicalAddr).
WithField("keySuffix", keySuffix).Debug("Getting pooled
connection")
```
It would be better to add a new `keySuffix` field to help with debugging.
##########
pulsar/internal/connection_pool.go:
##########
@@ -141,6 +146,14 @@ func (p *connectionPool) GetConnections()
map[string]Connection {
return conns
}
+func (p *connectionPool) GenerateRoundRobinIndex() int32 {
+ cnt := atomic.AddInt32(&p.roundRobinCnt, 1)
+ if cnt < 0 {
+ cnt = -cnt
+ }
+ return cnt % p.maxConnectionsPerHost
+}
Review Comment:
```suggestion
func (p *connectionPool) GenerateRoundRobinIndex() int32 {
return atomic.AddUint32(&p.roundRobinCnt, 1) % p.maxConnectionsPerHost
}
```
`cnt = -cnt` is incorrect. I think you're trying to handle the situation
where `int32` overflow results in a negative number. However, if `cnt`
overflows to `math.MinInt32`, negating it yields `math.MinInt32` again, so
the result is still a negative number.
Could you make `roundRobinCnt` a `uint32` so that we can simplify this
method?
##########
pulsar/consumer_test.go:
##########
@@ -5038,3 +5038,31 @@ func TestClientVersion(t *testing.T) {
assert.True(t, strings.HasSuffix(publisher.ClientVersion,
"-test-client"))
}
+
+func TestSelectConnectionForSameConsumer(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: serviceURL,
+ MaxConnectionsPerBroker: 10,
+ })
+ assert.NoError(t, err)
+ defer client.Close()
+
+ topicName := newTopicName()
+
+ _consumer, err := client.Subscribe(ConsumerOptions{
+ Topic: topicName,
+ SubscriptionName: "sub-1",
+ Type: Shared,
+ })
+ assert.NoError(t, err)
+ defer _consumer.Close()
+
+ partitionConsumerImp := _consumer.(*consumer).consumers[0]
+ conn := partitionConsumerImp._getConn()
+
+ for i := 0; i < 5; i++ {
+ partitionConsumerImp.grabConn("")
Review Comment:
```suggestion
assert.NoError(t, partitionConsumerImp.grabConn(""))
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]