geniusjoe opened a new issue, #1421:
URL: https://github.com/apache/pulsar-client-go/issues/1421

   **Is your feature request related to a problem? Please describe.**
   Main PR: https://github.com/apache/pulsar-client-go/pull/1225 
   When we create a consumer with the `EnableZeroQueueConsumer` option enabled and 
subscribe to a single partition of a topic as follows, it returns an error: 
`ZeroQueueConsumer is not supported for partitioned topics`
   ```Go
        consumer, err := client.Subscribe(pulsar.ConsumerOptions{
                Topic: "public/default/test1-partition-0",
                SubscriptionName: "sub-test",
                Type:             pulsar.Shared,
                EnableZeroQueueConsumer: true,
        })
   ```
   
   But in the Java SDK, we can subscribe to either a non-partitioned topic or a specific 
partition of a topic, because it only checks the [partition 
number](https://github.com/apache/pulsar/blob/415c6fa74e908bad2da1e7f986185c6ef17cb9a1/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java#L610)
 in the topic metadata, regardless of topic type:
   ```Java
   // src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java#doSingleTopicSubscribeAsync
               if (metadata.partitions > 0) {
                   consumer = MultiTopicsConsumerImpl.createPartitionedConsumer(PulsarClientImpl.this, conf,
                           externalExecutorProvider, consumerSubscribedFuture, metadata.partitions, schema, interceptors);
               } else {
                   int partitionIndex = TopicName.getPartitionIndex(topic);
                   consumer = ConsumerImpl.newConsumerImpl(PulsarClientImpl.this, topic, conf, externalExecutorProvider,
                           partitionIndex, false, consumerSubscribedFuture, null, schema, interceptors,
                           true /* createTopicIfDoesNotExist */);
               }
   
   // src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#newConsumerImpl
           if (conf.getReceiverQueueSize() == 0) {
               return new ZeroQueueConsumerImpl<>(client, topic, conf, executorProvider, partitionIndex, hasParentConsumer,
                       subscribeFuture,
                       startMessageId, schema, interceptors,
                       createTopicIfDoesNotExist);
           } else {
               return new ConsumerImpl<>(client, topic, conf, executorProvider, partitionIndex, hasParentConsumer,
                       parentConsumerHasListener,
                       subscribeFuture, startMessageId,
                       startMessageRollbackDurationInSec /* rollback time in sec to start msgId */,
                       schema, interceptors, createTopicIfDoesNotExist);
           }
   ```
   The main reason we cannot subscribe to more than one partition with a zero queue 
was discussed in this 
[issue](https://github.com/apache/pulsar/issues/7280#issuecomment-791400497):
   > Since we don't know if a topic will there be any messages written, to 
avoid a consumer block to one topic(no more data are written)
   
   This means that when we subscribe to multiple partitions at the same time, we cannot 
tell which partition currently has messages, so the consumer does not know where to 
send the permit. But we can send a permit directly to a non-partitioned topic or to a 
single partition of a topic. So maybe we can stay consistent with the Java SDK and 
support a single topic partition as well.
   
   **Describe the solution you'd like**
   The key concept of zero-queue is: only when we call `consumer.Receive()` 
does the SDK increase the permit and call `flowIfNeed()` to fetch a message 
from the broker. So there are no messages in the pending queue, and we don't 
need to call `pc.internalFlow(initialPermits)` when the connection is first 
established in `consumer_partition.go#<-pc.connectedCh`. 
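
The permit-on-demand idea described above can be sketched with a toy model. This is only an illustration under stated assumptions, not the pulsar-client-go implementation: `fakeBroker`, `zeroQueueConsumer`, and their methods are hypothetical names, and the broker is reduced to an in-memory backlog that dispatches one message per permit.

```go
package main

import "fmt"

// fakeBroker is a hypothetical stand-in for a broker: it dispatches a
// message only when the consumer has granted a permit.
type fakeBroker struct {
	backlog []string
	permits int
	sent    int // total number of messages dispatched so far
}

// flow grants n permits and returns the messages dispatched as a result.
func (b *fakeBroker) flow(n int) []string {
	b.permits += n
	var out []string
	for b.permits > 0 && len(b.backlog) > 0 {
		out = append(out, b.backlog[0])
		b.backlog = b.backlog[1:]
		b.permits--
		b.sent++
	}
	return out
}

// zeroQueueConsumer (hypothetical) keeps no local queue: it sends exactly
// one permit per Receive call and sends no permits when it connects.
type zeroQueueConsumer struct {
	broker *fakeBroker
}

// Receive requests a single message. In this toy model ok is false when the
// backlog is empty; a real Receive would block until a message arrives.
func (c *zeroQueueConsumer) Receive() (msg string, ok bool) {
	msgs := c.broker.flow(1)
	if len(msgs) == 0 {
		return "", false
	}
	return msgs[0], true
}

func main() {
	b := &fakeBroker{backlog: []string{"m1", "m2", "m3"}}
	c := &zeroQueueConsumer{broker: b}
	fmt.Println("dispatched before Receive:", b.sent)
	msg, _ := c.Receive()
	fmt.Println("received:", msg, "dispatched:", b.sent)
}
```

In this model nothing is dispatched until `Receive` is called, which is why no initial flow is needed on connect.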
   
   I think we may need to modify the zero-queue consumer as below:
   1. Remove the single-partition check in 
`pulsar/consumer_impl.go#newInternalConsumer`
   ```
        //if len(partitions) == 1 && options.EnableZeroQueueConsumer &&
        //      strings.Contains(partitions[0], utils.PARTITIONEDTOPICSUFFIX) {
        //      return nil, pkgerrors.New("ZeroQueueConsumer is not supported for partitioned topics")
        //}
   ```
   2. Create the partition consumer with the exact topic partition in 
`pulsar/consumer_zero_queue.go#newZeroConsumer`
   ```
        tn, err := internal.ParseTopicName(topic)
        if err != nil {
                return nil, err
        }
        opts := newPartitionConsumerOpts(zc.topic, zc.consumerName, 
tn.Partition, zc.options) 
   ```
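
To illustrate step 2, here is a minimal sketch of extracting a partition index from a topic name. It assumes the `-partition-N` suffix convention seen in the example topic above; `partitionIndex` and `partitionSuffix` are hypothetical names for this illustration and are not the `internal.ParseTopicName` implementation.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// partitionSuffix mirrors the "-partition-" naming seen in the issue's topic.
const partitionSuffix = "-partition-"

// partitionIndex is a hypothetical helper: it returns the partition index
// encoded in a topic name, or -1 if the name has no numeric partition suffix.
func partitionIndex(topic string) int {
	i := strings.LastIndex(topic, partitionSuffix)
	if i < 0 {
		return -1
	}
	n, err := strconv.Atoi(topic[i+len(partitionSuffix):])
	if err != nil || n < 0 {
		return -1
	}
	return n
}

func main() {
	topics := []string{
		"public/default/test1",
		"public/default/test1-partition-0",
	}
	for _, t := range topics {
		fmt.Printf("%s -> %d\n", t, partitionIndex(t))
	}
}
```

Passing the parsed index, rather than a fixed value, lets the partition consumer target the exact partition named in the subscription.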
   
   **Additional context**
   I think there is one corner case we need to discuss:
   In 
https://github.com/apache/pulsar-client-go/blob/8fb4613c2fc9ffd5119a6067770e71a7be807bcb/pulsar/consumer_partition.go#L1924
 the consumer triggers `pc.availablePermits.inc()` in 
`pulsar/consumer_partition.go#reconnectToBroker` and fetches a message from the 
broker:
   ```
                if err == nil {
                        // Successfully reconnected
                        pc.log.Info("Reconnected consumer to broker")
                        bo.Reset()
                        if pc.options.enableZeroQueueConsumer {
                                pc.log.Info("zeroQueueConsumer reconnect, reset availablePermits")
                                pc.availablePermits.inc()
                        }
                        return struct{}{}, nil
                }
   ```
   It's okay when a consumer calls `Receive()` before the topic is unloaded and then 
gets a message, because currently the consumer sends two permits and only the one sent 
on reconnect takes effect. But I'm not sure what will happen if the consumer calls 
`Receive()` after the topic is unloaded. Will both permits take effect and fetch two 
messages at the same time, and do we need to follow the [Java 
sdk](https://github.com/apache/pulsar/blob/415c6fa74e908bad2da1e7f986185c6ef17cb9a1/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java#L122)
 and save the consumer status?
   ```Java
                   // synchronized need to prevent race between connectionOpened and the check "msgCnx == cnx()"
                   synchronized (this) {
                       // if message received due to an old flow - discard it and wait for the message from the
                       // latest flow command
                       if (msgCnx == cnx()) {
                           waitingOnReceiveForZeroQueueSize = false;
                           break;
                       }
                   }
               } while (true);
   ```
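
For discussion, the Java check above could be mirrored in Go roughly as follows. This is a sketch under assumptions, not a proposed patch: `message`, `consumer`, `epoch`, `reconnect`, and `receive` are hypothetical names, and a connection is modeled as an integer epoch that increases on every reconnect.

```go
package main

import (
	"fmt"
	"sync"
)

// message is a hypothetical delivered message, tagged with the epoch of the
// connection it arrived on.
type message struct {
	payload string
	epoch   int
}

// consumer is a hypothetical zero-queue consumer that tracks the epoch of
// its current connection.
type consumer struct {
	mu       sync.Mutex
	epoch    int
	incoming chan message
}

// reconnect simulates a new connection being opened.
func (c *consumer) reconnect() {
	c.mu.Lock()
	c.epoch++
	c.mu.Unlock()
}

// receive returns the first message delivered on the current connection and
// discards anything that arrived because of a flow sent on an older one.
func (c *consumer) receive() message {
	for {
		m := <-c.incoming
		// The lock prevents a race between reconnect and the epoch check.
		c.mu.Lock()
		current := m.epoch == c.epoch
		c.mu.Unlock()
		if current {
			return m
		}
		// Stale message from an old flow: drop it and keep waiting.
	}
}

func main() {
	c := &consumer{incoming: make(chan message, 2)}
	c.incoming <- message{payload: "old", epoch: 0} // answer to the pre-unload permit
	c.reconnect()
	c.incoming <- message{payload: "new", epoch: 1} // answer to the reconnect permit
	fmt.Println(c.receive().payload)
}
```

With a check like this, a second message triggered by an old permit would be dropped instead of being returned alongside the one from the latest flow.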

