+1

发自我的iPhone

> 在 2022年1月29日,12:24,PengHui Li <peng...@apache.org> 写道:
> 
> +1
> 
> Penghui
> 
>> On Sat, Jan 29, 2022 at 11:46 AM mattison chao <mattisonc...@gmail.com>
>> wrote:
>> 
>> Hello everyone,
>> 
>> I want to start a discussion about PIP-139 : Support Broker send command
>> to real close producer/consumer.
>> 
>> This is the PIP document
>> 
>> https://github.com/apache/pulsar/issues/13989 <
>> https://github.com/apache/pulsar/issues/13979>
>> 
>> Please check it out and feel free to share your thoughts.
>> 
>> Best,
>> Mattison
>> 
>> 
>> ———————— Pasted below for quoting convenience.
>> 
>> 
>> 
>> Relation pull request:  #13337
>> Authors: @Technoboy-  @mattisonchao
>> 
>> ## Motivation
>> 
>> Before we discuss this pip, I'd like to supplement some context to help
>> contributors who don't want to read the original pull request.
>> 
>>> When there are no user-created topics under a namespace, Namespace
>> should be deleted. But currently, the system topic existed and the
>> reader/producer could auto-create the system which may cause the namespace
>> deletion to fail.
>> 
>> For this reason, we need to close the system topic reader/producer first,
>> then remove the system topic. finally, remove the namespace.
>> 
>> Following this way, we first want to use ``terminate`` to solve this
>> problem. then we found producers can disconnect, but consumers are still
>> alive. So, another PR #13960 wants to add consumers' closing logic.
>> 
>> After #13960, all things look good, but another problem appears. that is
>> we need to wait until consumers completely consume all messages (this may
>> make terminate topic so long and the operation timeout)then get
>> ``reachedEndOfTopic``. the relative code here :
>> 
>> 
>> https://github.com/apache/pulsar/blob/07ef9231db8b844586b9217ee2d59237eb9c54b7/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java#L102-L106
>> 
>> In the #13337 case, we need to force close consumers immediately. So we
>> write this PIP to discuss another way to resolve this problem.
>> 
>> ## Goal
>> 
>> We can add a new field(`allow_reconnect`) in command
>> ``CommandCloseProducer``/ ``CommandCloseConsumer`` to close
>> producer/consumers immediately.
>> 
>> ## API Changes
>> 
>> - Add ``allow_reconnect`` to ``CommandCloseProducer``;
>> 
>> ```java
>> **Before**
>> 
>> message CommandCloseProducer {
>>    required uint64 producer_id = 1;
>>    required uint64 request_id = 2;
>> }
>> 
>> **After**
>> message CommandCloseProducer {
>>    required uint64 producer_id = 1;
>>    required uint64 request_id = 2;
>>    optional bool allow_reconnect = 3 [default = true];
>> }
>> ```
>> 
>> - Add ``allow_reconnect`` to ``CommandCloseConsumer``
>> ```java
>> **Before**
>> 
>> message CommandCloseConsumer {
>>    required uint64 consumer_id = 1;
>>    required uint64 request_id = 2;
>> }
>> 
>> **After**
>> message CommandCloseConsumer {
>>    required uint64 consumer_id = 1;
>>    required uint64 request_id = 2;
>>    optional bool allow_reconnect = 3 [default = true];
>> }
>> ```
>> 
>> ## Implementation
>> 
>> ### ClientCnx - Producer:
>> 
>> **Before**
>> ```java
>>    @Override
>>    protected void handleCloseProducer(CommandCloseProducer closeProducer)
>> {
>>        log.info("[{}] Broker notification of Closed producer: {}",
>> remoteAddress, closeProducer.getProducerId());
>>        final long producerId = closeProducer.getProducerId();
>>        ProducerImpl<?> producer = producers.get(producerId);
>>        if (producer != null) {
>>            producer.connectionClosed(this);
>>        } else {
>>            log.warn("Producer with id {} not found while closing producer
>> ", producerId);
>>        }
>>    }
>> ```
>> After:
>> ```java
>>   @Override
>>    protected void handleCloseProducer(CommandCloseProducer closeProducer)
>> {
>>        log.info("[{}] Broker notification of Closed producer: {}",
>> remoteAddress, closeProducer.getProducerId());
>>        final long producerId = closeProducer.getProducerId();
>>        ProducerImpl<?> producer = producers.get(producerId);
>>        if (producer != null) {
>>            if (closeProducer.isAllowReconnect) {
>>                producer.connectionClosed(this);
>>            } else {
>>                producer.closeAsync();
>>            }
>>        } else {
>>            log.warn("Producer with id {} not found while closing producer
>> ", producerId);
>>        }
>>    }
>> ```
>> ### ClientCnx - Consumer:
>> 
>> **Before**
>> ```java
>>    @Override
>>    protected void handleCloseConsumer(CommandCloseConsumer closeConsumer)
>> {
>>        log.info("[{}] Broker notification of Closed consumer: {}",
>> remoteAddress, closeConsumer.getConsumerId());
>>        final long consumerId = closeConsumer.getConsumerId();
>>        ConsumerImpl<?> consumer = consumers.get(consumerId);
>>        if (consumer != null) {
>>            consumer.connectionClosed(this);
>>        } else {
>>            log.warn("Consumer with id {} not found while closing consumer
>> ", consumerId);
>>        }
>>    }
>> ```
>> **After**
>> ```java
>>    @Override
>>    protected void handleCloseConsumer(CommandCloseConsumer closeConsumer)
>> {
>>        log.info("[{}] Broker notification of Closed consumer: {}",
>> remoteAddress, closeConsumer.getConsumerId());
>>        final long consumerId = closeConsumer.getConsumerId();
>>        ConsumerImpl<?> consumer = consumers.get(consumerId);
>>        if (consumer != null) {
>>            if (closeConsumer.isAllowReconnect) {
>>                consumer.connectionClosed(this);
>>            } else {
>>                consumer.closeAsync();
>>            }
>>        } else {
>>            log.warn("Consumer with id {} not found while closing consumer
>> ", consumerId);
>>        }
>>    }
>> ```
>> 
>> 
>> ## Reject Alternatives
>> 
>> none.

Reply via email to