+1

Thanks,
Zike

On Sat, Jan 29, 2022 at 12:30 PM guo jiwei <techno...@apache.org> wrote:
>
> Hi
>  The PIP link : https://github.com/apache/pulsar/issues/13989
>
> Regards
> Jiwei Guo (Tboy)
>
>
> On Sat, Jan 29, 2022 at 11:46 AM mattison chao <mattisonc...@gmail.com>
> wrote:
>
> > Hello everyone,
> >
> > I want to start a discussion about PIP-139 : Support Broker send command
> > to real close producer/consumer.
> >
> > This is the PIP document
> >
> > https://github.com/apache/pulsar/issues/13989 <
> > https://github.com/apache/pulsar/issues/13979>
> >
> > Please check it out and feel free to share your thoughts.
> >
> > Best,
> > Mattison
> >
> >
> > ———————— Pasted below for quoting convenience.
> >
> >
> >
> > Related pull request:  #13337
> > Authors: @Technoboy-  @mattisonchao
> >
> > ## Motivation
> >
> > Before we discuss this pip, I'd like to supplement some context to help
> > contributors who don't want to read the original pull request.
> >
> > > When there are no user-created topics under a namespace, it should be
> > > possible to delete the namespace. But currently the system topic still
> > > exists, and its reader/producer can auto-create the system topic again,
> > > which may cause the namespace deletion to fail.
> >
> > For this reason, we need to close the system topic reader/producer first,
> > then remove the system topic, and finally remove the namespace.
> >
> > Following this approach, we first tried to use ``terminate`` to solve the
> > problem. We then found that producers disconnect, but consumers stay
> > alive. So another PR, #13960, adds closing logic for consumers.
> >
> > After #13960, everything looked good, but another problem appeared: we
> > have to wait until consumers have consumed all messages before they get
> > ``reachedEndOfTopic`` (this can make terminating the topic take so long
> > that the operation times out). The relevant code is here:
> >
> >
> > https://github.com/apache/pulsar/blob/07ef9231db8b844586b9217ee2d59237eb9c54b7/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java#L102-L106
> >
> > In the #13337 case, we need to force close consumers immediately. So we
> > write this PIP to discuss another way to resolve this problem.
> >
> > ## Goal
> >
> > Add a new field (``allow_reconnect``) to the ``CommandCloseProducer`` and
> > ``CommandCloseConsumer`` commands so that the broker can close
> > producers/consumers immediately, without the client reconnecting.
> >
> > ## API Changes
> >
> > - Add ``allow_reconnect`` to ``CommandCloseProducer``;
> >
> > ```protobuf
> > // Before
> > message CommandCloseProducer {
> >     required uint64 producer_id = 1;
> >     required uint64 request_id = 2;
> > }
> >
> > // After
> > message CommandCloseProducer {
> >     required uint64 producer_id = 1;
> >     required uint64 request_id = 2;
> >     optional bool allow_reconnect = 3 [default = true];
> > }
> > ```
> >
> > - Add ``allow_reconnect`` to ``CommandCloseConsumer``
> > ```protobuf
> > // Before
> > message CommandCloseConsumer {
> >     required uint64 consumer_id = 1;
> >     required uint64 request_id = 2;
> > }
> >
> > // After
> > message CommandCloseConsumer {
> >     required uint64 consumer_id = 1;
> >     required uint64 request_id = 2;
> >     optional bool allow_reconnect = 3 [default = true];
> > }
> > ```
> >
> > ## Implementation
> >
> > ### ClientCnx - Producer:
> >
> > **Before**
> > ```java
> >     @Override
> >     protected void handleCloseProducer(CommandCloseProducer closeProducer) {
> >         log.info("[{}] Broker notification of Closed producer: {}",
> >                 remoteAddress, closeProducer.getProducerId());
> >         final long producerId = closeProducer.getProducerId();
> >         ProducerImpl<?> producer = producers.get(producerId);
> >         if (producer != null) {
> >             producer.connectionClosed(this);
> >         } else {
> >             log.warn("Producer with id {} not found while closing producer ",
> >                     producerId);
> >         }
> >     }
> > ```
> > **After**
> > ```java
> >     @Override
> >     protected void handleCloseProducer(CommandCloseProducer closeProducer) {
> >         log.info("[{}] Broker notification of Closed producer: {}",
> >                 remoteAddress, closeProducer.getProducerId());
> >         final long producerId = closeProducer.getProducerId();
> >         ProducerImpl<?> producer = producers.get(producerId);
> >         if (producer != null) {
> >             if (closeProducer.isAllowReconnect()) {
> >                 // Default behaviour: drop the connection and let the producer reconnect.
> >                 producer.connectionClosed(this);
> >             } else {
> >                 // Broker asked for a real close: shut the producer down for good.
> >                 producer.closeAsync();
> >             }
> >         } else {
> >             log.warn("Producer with id {} not found while closing producer ",
> >                     producerId);
> >         }
> >     }
> > ```
> > ### ClientCnx - Consumer:
> >
> > **Before**
> > ```java
> >     @Override
> >     protected void handleCloseConsumer(CommandCloseConsumer closeConsumer) {
> >         log.info("[{}] Broker notification of Closed consumer: {}",
> >                 remoteAddress, closeConsumer.getConsumerId());
> >         final long consumerId = closeConsumer.getConsumerId();
> >         ConsumerImpl<?> consumer = consumers.get(consumerId);
> >         if (consumer != null) {
> >             consumer.connectionClosed(this);
> >         } else {
> >             log.warn("Consumer with id {} not found while closing consumer ",
> >                     consumerId);
> >         }
> >     }
> > ```
> > **After**
> > ```java
> >     @Override
> >     protected void handleCloseConsumer(CommandCloseConsumer closeConsumer) {
> >         log.info("[{}] Broker notification of Closed consumer: {}",
> >                 remoteAddress, closeConsumer.getConsumerId());
> >         final long consumerId = closeConsumer.getConsumerId();
> >         ConsumerImpl<?> consumer = consumers.get(consumerId);
> >         if (consumer != null) {
> >             if (closeConsumer.isAllowReconnect()) {
> >                 // Default behaviour: drop the connection and let the consumer reconnect.
> >                 consumer.connectionClosed(this);
> >             } else {
> >                 // Broker asked for a real close: shut the consumer down for good.
> >                 consumer.closeAsync();
> >             }
> >         } else {
> >             log.warn("Consumer with id {} not found while closing consumer ",
> >                     consumerId);
> >         }
> >     }
> > ```
> >
> >
> > ## Rejected Alternatives
> >
> > none.
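
A minimal sketch of how a client might interpret ``allow_reconnect``, assuming an unset field (for example, a close command from a broker that predates this change) falls back to the proto default of true. `AllowReconnectSketch`, `CloseCommand` and `handleClose` are hypothetical stand-ins, not Pulsar classes; the returned strings stand for the `connectionClosed(this)` and `closeAsync()` calls in the handlers quoted above.

```java
/** Hypothetical model of the proposed client-side decision; not Pulsar code. */
public class AllowReconnectSketch {

    /** Simplified stand-in for CommandCloseProducer / CommandCloseConsumer. */
    public static final class CloseCommand {
        // null models "field not present on the wire" (assumption for illustration).
        private final Boolean allowReconnect;

        public CloseCommand(Boolean allowReconnect) {
            this.allowReconnect = allowReconnect;
        }

        /** Mirrors `optional bool allow_reconnect = 3 [default = true]`. */
        public boolean isAllowReconnect() {
            return allowReconnect == null || allowReconnect;
        }
    }

    /** Returns the action the client would take for a given close command. */
    public static String handleClose(CloseCommand cmd) {
        if (cmd.isAllowReconnect()) {
            return "reconnect"; // stands for connectionClosed(this)
        }
        return "close";         // stands for closeAsync()
    }

    public static void main(String[] args) {
        System.out.println(handleClose(new CloseCommand(null)));  // field unset
        System.out.println(handleClose(new CloseCommand(true)));  // explicit true
        System.out.println(handleClose(new CloseCommand(false))); // real close
    }
}
```

Under these assumptions only an explicit `false` leads to a real close, so existing brokers and clients would keep today's reconnect behaviour.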



-- 
Zike Yang
