massakam opened a new issue, #1330:
URL: https://github.com/apache/pulsar-client-go/issues/1330
#### Expected behavior
omitted
#### Actual behavior
Zero queue consumer has been supported since v0.13.0, but there seem to be
some buggy behaviors.
- Immediately after creating a consumer, I got the following error log:
```
ERRO[0000] unable to send initial permits to broker consumerID=1
error="invalid number of permits requested: 0" name=dufbb subscription=sub1
topic="persistent://pulsar/test/t1"
```
- If I registered `MessageChannel` when creating a consumer, its
`availablePermits` was 0. Naturally, even if messages were published to the
topic, it was not able to receive any of them.
- Consumer that receive messages using `Receive()` rather than
`MessageChannel` worked. However, if the connected topic was unloaded or the
broker was restarted, `availablePermits` became 0 and no messages could be
received thereafter.
#### Steps to reproduce
I ran the following code:
consumer_with_message_channel.go
```go
func main() {
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar://localhost:6650",
})
if err != nil {
log.Fatal(err)
}
defer client.Close()
channel := make(chan pulsar.ConsumerMessage)
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: "persistent://pulsar/test/t1",
SubscriptionName: "sub1",
ReceiverQueueSize: 0,
EnableZeroQueueConsumer: true,
MessageChannel: channel,
})
if err != nil {
log.Fatal(err)
}
defer consumer.Close()
for out := range channel {
msg := out.Message
fmt.Printf("Received: %s (%s)\n", string(msg.Payload()),
msg.ID().String())
consumer.Ack(msg)
}
}
```
consumer_without_message_channel.go
```go
func main() {
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar://localhost:6650",
})
if err != nil {
log.Fatal(err)
}
defer client.Close()
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: "persistent://pulsar/test/t1",
SubscriptionName: "sub1",
ReceiverQueueSize: 0,
EnableZeroQueueConsumer: true,
})
if err != nil {
log.Fatal(err)
}
defer consumer.Close()
for i := 0; i < 100; i++ {
msg, err := consumer.Receive(context.Background())
if err == nil {
fmt.Printf("Received: %s (%s)\n", string(msg.Payload()),
msg.ID().String())
consumer.Ack(msg)
} else {
log.Fatal(err)
}
}
}
```
#### System configuration
**Pulsar version**: v0.14.0
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]