Hi,
The PIP link: https://github.com/apache/pulsar/issues/13989

Regards
Jiwei Guo (Tboy)


On Sat, Jan 29, 2022 at 11:46 AM mattison chao <mattisonc...@gmail.com>
wrote:

> Hello everyone,
>
> I want to start a discussion about PIP-139 : Support Broker send command
> to real close producer/consumer.
>
> This is the PIP document
>
> https://github.com/apache/pulsar/issues/13989
>
> Please check it out and feel free to share your thoughts.
>
> Best,
> Mattison
>
>
> ———————— Pasted below for quoting convenience.
>
>
>
> Related pull request: #13337
> Authors: @Technoboy-  @mattisonchao
>
> ## Motivation
>
> Before we discuss this PIP, I'd like to add some context to help
> contributors who don't want to read the original pull request.
>
> > When there are no user-created topics under a namespace, the namespace
> should be deletable. But currently the system topic still exists, and its
> reader/producer can auto-create the system topic again, which may cause the
> namespace deletion to fail.
>
> For this reason, we need to close the system topic reader/producer first,
> then remove the system topic, and finally remove the namespace.
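
The ordering above can be sketched as a chain of asynchronous steps. This is only an illustration, assuming each step completes a future: the class and method names (`NamespaceDeletionOrderSketch`, `closeSystemTopicClients`, `deleteSystemTopic`, `deleteNamespace`) are hypothetical placeholders, not broker APIs.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch: none of these methods exist in the broker under these names.
final class NamespaceDeletionOrderSketch {
    // Records the order in which the steps ran, so the sequence is observable.
    final List<String> steps = Collections.synchronizedList(new ArrayList<>());

    CompletableFuture<Void> closeSystemTopicClients() {
        steps.add("close system topic reader/producer");
        return CompletableFuture.completedFuture(null);
    }

    CompletableFuture<Void> deleteSystemTopic() {
        steps.add("delete system topic");
        return CompletableFuture.completedFuture(null);
    }

    CompletableFuture<Void> deleteNamespace() {
        steps.add("delete namespace");
        return CompletableFuture.completedFuture(null);
    }

    // Each step starts only after the previous one has completed.
    CompletableFuture<Void> deleteNamespaceInOrder() {
        return closeSystemTopicClients()
                .thenCompose(ignore -> deleteSystemTopic())
                .thenCompose(ignore -> deleteNamespace());
    }

    public static void main(String[] args) {
        NamespaceDeletionOrderSketch sketch = new NamespaceDeletionOrderSketch();
        sketch.deleteNamespaceInOrder().join();
        sketch.steps.forEach(System.out::println);
    }
}
```

If the first step leaves a reader or producer able to reconnect, the system topic can be auto-created again before the last step runs, which is the failure described in the quoted text.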
>
> Following this approach, we first wanted to use ``terminate`` to solve this
> problem. We then found that producers disconnect, but consumers stay
> alive. So another PR, #13960, adds closing logic for consumers.
>
> After #13960, everything looks good, but another problem appears: we
> need to wait until consumers have consumed all messages before they get
> ``reachedEndOfTopic`` (this can make terminating the topic take so long that
> the operation times out). The relevant code is here:
>
>
> https://github.com/apache/pulsar/blob/07ef9231db8b844586b9217ee2d59237eb9c54b7/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java#L102-L106
>
> In the #13337 case, we need to force-close consumers immediately. So we
> wrote this PIP to discuss another way to resolve this problem.
>
> ## Goal
>
> We can add a new field (`allow_reconnect`) to the commands
> ``CommandCloseProducer`` and ``CommandCloseConsumer`` so that the broker can
> close producers/consumers immediately, without the client reconnecting.
>
> ## API Changes
>
> - Add ``allow_reconnect`` to ``CommandCloseProducer``
>
> ```protobuf
> // Before
> message CommandCloseProducer {
>     required uint64 producer_id = 1;
>     required uint64 request_id = 2;
> }
>
> // After
> message CommandCloseProducer {
>     required uint64 producer_id = 1;
>     required uint64 request_id = 2;
>     optional bool allow_reconnect = 3 [default = true];
> }
> ```
>
> - Add ``allow_reconnect`` to ``CommandCloseConsumer``
> ```protobuf
> // Before
> message CommandCloseConsumer {
>     required uint64 consumer_id = 1;
>     required uint64 request_id = 2;
> }
>
> // After
> message CommandCloseConsumer {
>     required uint64 consumer_id = 1;
>     required uint64 request_id = 2;
>     optional bool allow_reconnect = 3 [default = true];
> }
> ```
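
Because the new field is `optional` with `[default = true]`, a close command that does not carry it (for example, one sent by a broker that predates this change) still reads as "reconnect allowed". A minimal sketch of that default, assuming a hand-written stand-in: `CloseProducerSketch` is hypothetical and only models the one field, it is not the generated protocol class.

```java
import java.util.Optional;

// Hypothetical stand-in for the generated CommandCloseProducer class;
// it models only the optional allow_reconnect field and its default.
final class CloseProducerSketch {
    private final long producerId;
    // Empty when the field was not present on the wire.
    private final Optional<Boolean> allowReconnect;

    CloseProducerSketch(long producerId, Optional<Boolean> allowReconnect) {
        this.producerId = producerId;
        this.allowReconnect = allowReconnect;
    }

    long getProducerId() {
        return producerId;
    }

    // Mirrors `[default = true]`: an unset field reads as true.
    boolean isAllowReconnect() {
        return allowReconnect.orElse(true);
    }

    public static void main(String[] args) {
        CloseProducerSketch fieldUnset = new CloseProducerSketch(1L, Optional.empty());
        CloseProducerSketch forcedClose = new CloseProducerSketch(2L, Optional.of(false));
        System.out.println(fieldUnset.isAllowReconnect());  // prints "true"
        System.out.println(forcedClose.isAllowReconnect()); // prints "false"
    }
}
```

With this default, existing behavior is unchanged unless the broker explicitly sends `allow_reconnect = false`.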
>
> ## Implementation
>
> ### ClientCnx - Producer:
>
> **Before**
> ```java
>     @Override
>     protected void handleCloseProducer(CommandCloseProducer closeProducer) {
>         log.info("[{}] Broker notification of Closed producer: {}",
>                 remoteAddress, closeProducer.getProducerId());
>         final long producerId = closeProducer.getProducerId();
>         ProducerImpl<?> producer = producers.get(producerId);
>         if (producer != null) {
>             producer.connectionClosed(this);
>         } else {
>             log.warn("Producer with id {} not found while closing producer", producerId);
>         }
>     }
> ```
> **After**
> ```java
>     @Override
>     protected void handleCloseProducer(CommandCloseProducer closeProducer) {
>         log.info("[{}] Broker notification of Closed producer: {}",
>                 remoteAddress, closeProducer.getProducerId());
>         final long producerId = closeProducer.getProducerId();
>         ProducerImpl<?> producer = producers.get(producerId);
>         if (producer != null) {
>             if (closeProducer.isAllowReconnect()) {
>                 // Existing behavior: the producer may reconnect.
>                 producer.connectionClosed(this);
>             } else {
>                 // allow_reconnect = false: close the producer for good.
>                 producer.closeAsync();
>             }
>         } else {
>             log.warn("Producer with id {} not found while closing producer", producerId);
>         }
>     }
> ```
> ### ClientCnx - Consumer:
>
> **Before**
> ```java
>     @Override
>     protected void handleCloseConsumer(CommandCloseConsumer closeConsumer) {
>         log.info("[{}] Broker notification of Closed consumer: {}",
>                 remoteAddress, closeConsumer.getConsumerId());
>         final long consumerId = closeConsumer.getConsumerId();
>         ConsumerImpl<?> consumer = consumers.get(consumerId);
>         if (consumer != null) {
>             consumer.connectionClosed(this);
>         } else {
>             log.warn("Consumer with id {} not found while closing consumer", consumerId);
>         }
>     }
> ```
> **After**
> ```java
>     @Override
>     protected void handleCloseConsumer(CommandCloseConsumer closeConsumer) {
>         log.info("[{}] Broker notification of Closed consumer: {}",
>                 remoteAddress, closeConsumer.getConsumerId());
>         final long consumerId = closeConsumer.getConsumerId();
>         ConsumerImpl<?> consumer = consumers.get(consumerId);
>         if (consumer != null) {
>             if (closeConsumer.isAllowReconnect()) {
>                 // Existing behavior: the consumer may reconnect.
>                 consumer.connectionClosed(this);
>             } else {
>                 // allow_reconnect = false: close the consumer for good.
>                 consumer.closeAsync();
>             }
>         } else {
>             log.warn("Consumer with id {} not found while closing consumer", consumerId);
>         }
>     }
> ```
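
Both handlers follow the same pattern, so the branching can be tried out in isolation. The sketch below is not client code: `Handle` and `RecordingHandle` are hypothetical stand-ins for `ProducerImpl`/`ConsumerImpl`, and `handleClose` takes the flag directly instead of a protocol command.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class CloseDispatchSketch {
    // Hypothetical stand-in for the two paths a producer/consumer can take.
    interface Handle {
        void connectionClosed(); // connection dropped, reconnect may follow
        void closeAsync();       // permanent close, no reconnect
    }

    // Records which path was taken so the behavior is observable.
    static final class RecordingHandle implements Handle {
        final List<String> calls = new ArrayList<>();

        @Override
        public void connectionClosed() {
            calls.add("reconnect");
        }

        @Override
        public void closeAsync() {
            calls.add("close");
        }
    }

    private final Map<Long, Handle> handles = new ConcurrentHashMap<>();

    void register(long id, Handle handle) {
        handles.put(id, handle);
    }

    // Returns false when no handle is registered under the given id.
    boolean handleClose(long id, boolean allowReconnect) {
        Handle handle = handles.get(id);
        if (handle == null) {
            return false;
        }
        if (allowReconnect) {
            handle.connectionClosed();
        } else {
            handle.closeAsync();
        }
        return true;
    }

    public static void main(String[] args) {
        CloseDispatchSketch cnx = new CloseDispatchSketch();
        RecordingHandle producer = new RecordingHandle();
        cnx.register(1L, producer);
        cnx.handleClose(1L, true);
        cnx.handleClose(1L, false);
        System.out.println(producer.calls); // prints "[reconnect, close]"
    }
}
```

The only behavioral change is the `allowReconnect == false` branch; an unknown id is still just reported, as in the handlers above.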
>
>
> ## Rejected Alternatives
>
> None.
