+1

Sent from my iPhone
> On Jan 29, 2022, at 12:24, PengHui Li <peng...@apache.org> wrote:
>
> +1
>
> Penghui
>
>> On Sat, Jan 29, 2022 at 11:46 AM mattison chao <mattisonc...@gmail.com>
>> wrote:
>>
>> Hello everyone,
>>
>> I want to start a discussion about PIP-139: Support Broker send command
>> to real close producer/consumer.
>>
>> This is the PIP document:
>>
>> https://github.com/apache/pulsar/issues/13989 <
>> https://github.com/apache/pulsar/issues/13979>
>>
>> Please check it out and feel free to share your thoughts.
>>
>> Best,
>> Mattison
>>
>>
>> -------- Pasted below for quoting convenience.
>>
>>
>> Related pull request: #13337
>> Authors: @Technoboy- @mattisonchao
>>
>> ## Motivation
>>
>> Before we discuss this PIP, I'd like to add some context for
>> contributors who have not read the original pull request.
>>
>>> When there are no user-created topics under a namespace, deleting the
>>> namespace should succeed. Currently, however, the system topic still
>>> exists, and its reader/producer can auto-create the system topic again,
>>> which may cause the namespace deletion to fail.
>>
>> For this reason, we need to close the system topic reader/producer first,
>> then remove the system topic, and finally remove the namespace.
>>
>> Following this approach, we first tried to use ``terminate`` to solve the
>> problem. We then found that producers disconnect but consumers stay alive,
>> so another PR, #13960, adds closing logic for consumers.
>>
>> After #13960 everything looks good, but another problem appears: we have
>> to wait until consumers have consumed all messages before they get
>> ``reachedEndOfTopic``, which can make terminating the topic take so long
>> that the operation times out. The relevant code is here:
>>
>> https://github.com/apache/pulsar/blob/07ef9231db8b844586b9217ee2d59237eb9c54b7/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java#L102-L106
>>
>> In the #13337 case, we need to force-close consumers immediately. So we
>> wrote this PIP to discuss another way to resolve this problem.
>>
>> ## Goal
>>
>> Add a new field (``allow_reconnect``) to the ``CommandCloseProducer`` /
>> ``CommandCloseConsumer`` commands so that the broker can close
>> producers/consumers immediately.
>>
>> ## API Changes
>>
>> - Add ``allow_reconnect`` to ``CommandCloseProducer``:
>>
>> **Before**
>>
>> ```protobuf
>> message CommandCloseProducer {
>>     required uint64 producer_id = 1;
>>     required uint64 request_id = 2;
>> }
>> ```
>>
>> **After**
>>
>> ```protobuf
>> message CommandCloseProducer {
>>     required uint64 producer_id = 1;
>>     required uint64 request_id = 2;
>>     optional bool allow_reconnect = 3 [default = true];
>> }
>> ```
>>
>> - Add ``allow_reconnect`` to ``CommandCloseConsumer``:
>>
>> **Before**
>>
>> ```protobuf
>> message CommandCloseConsumer {
>>     required uint64 consumer_id = 1;
>>     required uint64 request_id = 2;
>> }
>> ```
>>
>> **After**
>>
>> ```protobuf
>> message CommandCloseConsumer {
>>     required uint64 consumer_id = 1;
>>     required uint64 request_id = 2;
>>     optional bool allow_reconnect = 3 [default = true];
>> }
>> ```
>>
>> ## Implementation
>>
>> ### ClientCnx - Producer:
>>
>> **Before**
>> ```java
>> @Override
>> protected void handleCloseProducer(CommandCloseProducer closeProducer) {
>>     log.info("[{}] Broker notification of Closed producer: {}",
>>             remoteAddress, closeProducer.getProducerId());
>>     final long producerId = closeProducer.getProducerId();
>>     ProducerImpl<?> producer = producers.get(producerId);
>>     if (producer != null) {
>>         producer.connectionClosed(this);
>>     } else {
>>         log.warn("Producer with id {} not found while closing producer",
>>                 producerId);
>>     }
>> }
>> ```
>> **After**
>> ```java
>> @Override
>> protected void handleCloseProducer(CommandCloseProducer closeProducer) {
>>     log.info("[{}] Broker notification of Closed producer: {}",
>>             remoteAddress, closeProducer.getProducerId());
>>     final long producerId = closeProducer.getProducerId();
>>     ProducerImpl<?> producer = producers.get(producerId);
>>     if (producer != null) {
>>         if (closeProducer.isAllowReconnect()) {
>>             // Existing behavior: drop the connection and let the producer reconnect.
>>             producer.connectionClosed(this);
>>         } else {
>>             // New behavior: close the producer for good.
>>             producer.closeAsync();
>>         }
>>     } else {
>>         log.warn("Producer with id {} not found while closing producer",
>>                 producerId);
>>     }
>> }
>> ```
>> ### ClientCnx - Consumer:
>>
>> **Before**
>> ```java
>> @Override
>> protected void handleCloseConsumer(CommandCloseConsumer closeConsumer) {
>>     log.info("[{}] Broker notification of Closed consumer: {}",
>>             remoteAddress, closeConsumer.getConsumerId());
>>     final long consumerId = closeConsumer.getConsumerId();
>>     ConsumerImpl<?> consumer = consumers.get(consumerId);
>>     if (consumer != null) {
>>         consumer.connectionClosed(this);
>>     } else {
>>         log.warn("Consumer with id {} not found while closing consumer",
>>                 consumerId);
>>     }
>> }
>> ```
>> **After**
>> ```java
>> @Override
>> protected void handleCloseConsumer(CommandCloseConsumer closeConsumer) {
>>     log.info("[{}] Broker notification of Closed consumer: {}",
>>             remoteAddress, closeConsumer.getConsumerId());
>>     final long consumerId = closeConsumer.getConsumerId();
>>     ConsumerImpl<?> consumer = consumers.get(consumerId);
>>     if (consumer != null) {
>>         if (closeConsumer.isAllowReconnect()) {
>>             // Existing behavior: drop the connection and let the consumer reconnect.
>>             consumer.connectionClosed(this);
>>         } else {
>>             // New behavior: close the consumer for good.
>>             consumer.closeAsync();
>>         }
>>     } else {
>>         log.warn("Consumer with id {} not found while closing consumer",
>>                 consumerId);
>>     }
>> }
>> ```
>>
>>
>> ## Rejected Alternatives
>>
>> None.
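
For anyone who wants to try the proposed branching without a Pulsar checkout, here is a rough, self-contained sketch of the client-side decision described under Implementation. It is only an illustration under stated assumptions: `CloseDispatchSketch`, `CloseCommand`, `ClientHandle`, `RecordingHandle`, and `fromOldBroker` are hypothetical stand-ins invented for this example and are not Pulsar classes or methods. The one thing taken from the PIP is the rule itself: `allow_reconnect` defaults to `true`, so a command that does not set the field keeps today's reconnect behavior, and only an explicit `false` closes the producer/consumer for good.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch only: models the allow_reconnect branch with hypothetical types. */
final class CloseDispatchSketch {

    /** Hypothetical stand-in for CommandCloseProducer / CommandCloseConsumer. */
    static final class CloseCommand {
        final long id;
        final boolean allowReconnect;

        CloseCommand(long id, boolean allowReconnect) {
            this.id = id;
            this.allowReconnect = allowReconnect;
        }

        /** Models a command that omits the field: the proposed proto default is true. */
        static CloseCommand fromOldBroker(long id) {
            return new CloseCommand(id, true);
        }
    }

    /** Hypothetical stand-in for ProducerImpl / ConsumerImpl. */
    interface ClientHandle {
        void connectionClosed(); // current behavior: the client may reconnect
        void closeAsync();       // proposed behavior: the client is closed for good
    }

    /** Test double that records which action was taken. */
    static final class RecordingHandle implements ClientHandle {
        final List<String> actions = new ArrayList<>();

        @Override
        public void connectionClosed() {
            actions.add("reconnect");
        }

        @Override
        public void closeAsync() {
            actions.add("close");
        }
    }

    /** Same decision as the proposed handleCloseProducer / handleCloseConsumer. */
    static void handleClose(CloseCommand cmd, ClientHandle handle) {
        if (handle == null) {
            return; // unknown id: the real handlers only log a warning here
        }
        if (cmd.allowReconnect) {
            handle.connectionClosed();
        } else {
            handle.closeAsync();
        }
    }

    public static void main(String[] args) {
        RecordingHandle handle = new RecordingHandle();
        handleClose(CloseCommand.fromOldBroker(1L), handle);   // field absent
        handleClose(new CloseCommand(1L, false), handle);      // explicit false
        System.out.println(handle.actions); // prints [reconnect, close]
    }
}
```

The default of `true` is what keeps the change backward compatible in this sketch: a command without the field takes the existing `connectionClosed` path.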