geniusjoe opened a new issue, #1421: URL: https://github.com/apache/pulsar-client-go/issues/1421
**Is your feature request related to a problem? Please describe.**

Main PR: https://github.com/apache/pulsar-client-go/pull/1225

When we create a consumer with the `EnableZeroQueueConsumer` option enabled and subscribe to a single partition of a topic, as follows, the client returns the error `ZeroQueueConsumer is not supported for partitioned topics`:

```Go
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
	Topic:                   "public/default/test1-partition-0",
	SubscriptionName:        "sub-test",
	Type:                    pulsar.Shared,
	EnableZeroQueueConsumer: true,
})
```

In the Java SDK, however, we can subscribe to either a non-partitioned topic or a specific partition of a topic, because it only checks the [partition number](https://github.com/apache/pulsar/blob/415c6fa74e908bad2da1e7f986185c6ef17cb9a1/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java#L610) regardless of the topic type:

```Java
// src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java#doSingleTopicSubscribeAsync
if (metadata.partitions > 0) {
    consumer = MultiTopicsConsumerImpl.createPartitionedConsumer(PulsarClientImpl.this, conf,
            externalExecutorProvider, consumerSubscribedFuture, metadata.partitions, schema, interceptors);
} else {
    int partitionIndex = TopicName.getPartitionIndex(topic);
    consumer = ConsumerImpl.newConsumerImpl(PulsarClientImpl.this, topic, conf, externalExecutorProvider,
            partitionIndex, false, consumerSubscribedFuture, null, schema, interceptors,
            true /* createTopicIfDoesNotExist */);
}

// src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#newConsumerImpl
if (conf.getReceiverQueueSize() == 0) {
    return new ZeroQueueConsumerImpl<>(client, topic, conf, executorProvider, partitionIndex,
            hasParentConsumer, subscribeFuture, startMessageId, schema, interceptors,
            createTopicIfDoesNotExist);
} else {
    return new ConsumerImpl<>(client, topic, conf, executorProvider, partitionIndex, hasParentConsumer,
            parentConsumerHasListener, subscribeFuture, startMessageId,
            startMessageRollbackDurationInSec /* rollback time in sec to start msgId */,
            schema, interceptors, createTopicIfDoesNotExist);
}
```

The main reason a zero-queue consumer cannot subscribe to more than one partition was discussed in this [issue](https://github.com/apache/pulsar/issues/7280#issuecomment-791400497):

> Since we don't know if a topic will there be any messages written, to avoid a consumer block to one topic(no more data are written)

In other words, when we subscribe to multiple partitions at the same time, we cannot tell which partition currently has messages, so the consumer does not know which partition to send a permit to. With a non-partitioned topic or a single partition of a topic there is only one possible target, so the permit can be sent to it directly. So maybe we can stay consistent with the Java SDK and support subscribing to a single partition of a topic as well.

**Describe the solution you'd like**

The key concept of zero-queue is that the SDK increases the permit count and calls `flowIfNeed()` to fetch a message from the broker only when we call `consumer.Receive()`. As a result, there are no messages in the pending queue, and we don't need to call `pc.internalFlow(initialPermits)` when the connection is first established in `consumer_partition.go#<-pc.connectedCh`.

I think we may need to modify the zero-queue consumer as follows:

1. Remove the single-partition check in `pulsar/consumer_impl.go#newInternalConsumer`:
```
//if len(partitions) == 1 && options.EnableZeroQueueConsumer &&
//	strings.Contains(partitions[0], utils.PARTITIONEDTOPICSUFFIX) {
//	return nil, pkgerrors.New("ZeroQueueConsumer is not supported for partitioned topics")
//}
```
2. Create the partition consumer with the exact topic partition in `pulsar/consumer_zero_queue.go#newZeroConsumer`:
```
tn, err := internal.ParseTopicName(topic)
if err != nil {
	return nil, err
}
opts := newPartitionConsumerOpts(zc.topic, zc.consumerName, tn.Partition, zc.options)
```

**Additional context**

I think there is one corner case we need to discuss. At https://github.com/apache/pulsar-client-go/blob/8fb4613c2fc9ffd5119a6067770e71a7be807bcb/pulsar/consumer_partition.go#L1924, the consumer calls `pc.availablePermits.inc()` in `pulsar/consumer_partition.go#reconnectToBroker` and fetches a message from the broker:

```
if err == nil {
	// Successfully reconnected
	pc.log.Info("Reconnected consumer to broker")
	bo.Reset()
	if pc.options.enableZeroQueueConsumer {
		pc.log.Info("zeroQueueConsumer reconnect, reset availablePermits")
		pc.availablePermits.inc()
	}
	return struct{}{}, nil
}
```

This is fine when the consumer calls `Receive()` before the topic is unloaded and then gets a message: the consumer sends two permits in total, but only the one sent on reconnect takes effect. However, I'm not sure what happens if the consumer calls `Receive()` after the topic is unloaded. Will both permits take effect and fetch two messages at the same time? If so, do we need to follow the [Java SDK](https://github.com/apache/pulsar/blob/415c6fa74e908bad2da1e7f986185c6ef17cb9a1/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java#L122) and track the consumer's connection state?

```
    // synchronized need to prevent race between connectionOpened and the check "msgCnx == cnx()"
    synchronized (this) {
        // if message received due to an old flow - discard it and wait for the message from the
        // latest flow command
        if (msgCnx == cnx()) {
            waitingOnReceiveForZeroQueueSize = false;
            break;
        }
    }
} while (true);
```

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
