> This is supposed to mean that the namespace should be able to be
> deleted, correct?

Yes. The background is that the user has no active topics left in the
namespace, so they want to delete it.

> However, I think
> we might still have a race condition that could make tenant or
> namespace deletion fail. Specifically, if a new producer or consumer
> creates a topic after the namespace deletion has started but
> before it is complete. Do you agree that the underlying race still exists?

Yes, this race still exists. I don't think it is a big problem, because the
user no longer wants to use this namespace. If it happens, the deletion
returns an error and the user has to delete the namespace again.
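
To illustrate the "delete it again" path, here is a minimal sketch of a retry loop. It assumes the deletion call simply throws when the namespace is not empty. `NamespaceDeleteRetry`, `NamespaceDeleter` and `deleteWithRetry` are hypothetical names used only for illustration; they are not Pulsar APIs.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper, not part of Pulsar: retries a namespace deletion that
// failed, e.g. because a topic was created while the deletion was running.
class NamespaceDeleteRetry {

    // Assumed stand-in for whatever call actually deletes the namespace.
    interface NamespaceDeleter {
        void delete(String namespace) throws Exception;
    }

    // Returns the number of attempts used; rethrows the last error if all fail.
    static int deleteWithRetry(NamespaceDeleter deleter, String namespace, int maxAttempts)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                deleter.delete(namespace);
                return attempt;
            } catch (Exception e) {
                last = e; // remember the failure and try again
            }
        }
        throw last;
    }

    public static void main(String[] args) throws Exception {
        // Simulated deleter: the first call fails as if a topic had just been created.
        AtomicInteger calls = new AtomicInteger();
        int attempts = deleteWithRetry(ns -> {
            if (calls.incrementAndGet() == 1) {
                throw new IllegalStateException("namespace " + ns + " is not empty");
            }
        }, "my-tenant/my-ns", 3);
        System.out.println("deleted after " + attempts + " attempt(s)");
    }
}
```

A real caller would probably also want a short delay between attempts; the sketch leaves that out to stay small.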

> What if we expand our usage of the "terminated" feature to apply to
> namespaces (and tenants)? Then, a terminated namespace can have
> bundles and topics can be deleted but not created (just as a terminated
> topic cannot have any new messages published to it). This would take
> care of all topic creation race conditions. We'd probably need to add
> new protobuf exceptions for this feature.


Solving this problem would require adding synchronization resources such as
a lock or a state flag. I think that cost outweighs the benefit, so I would
prefer not to do it.
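
For reference, this is roughly the kind of state such a guard would need. It is a minimal single-process sketch, not the proposed design: `NamespaceStateGuard` and all of its methods are hypothetical, and a real cluster would presumably have to share this state across brokers, which is the extra synchronization I would rather avoid.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical in-memory illustration of a "terminated" namespace state.
// All names here are assumptions; none of this is an existing Pulsar API.
class NamespaceStateGuard {
    private final Set<String> terminated = new HashSet<>();
    private final Map<String, Set<String>> topics = new HashMap<>();

    // Topic creation is rejected once the namespace has been terminated.
    synchronized boolean createTopic(String namespace, String topic) {
        if (terminated.contains(namespace)) {
            return false;
        }
        return topics.computeIfAbsent(namespace, k -> new HashSet<>()).add(topic);
    }

    // Marks the namespace as terminated; existing topics can still be deleted.
    synchronized void terminate(String namespace) {
        terminated.add(namespace);
    }

    synchronized boolean deleteTopic(String namespace, String topic) {
        Set<String> existing = topics.get(namespace);
        return existing != null && existing.remove(topic);
    }

    // Deletion succeeds only for a terminated namespace with no topics left.
    synchronized boolean deleteNamespace(String namespace) {
        Set<String> existing = topics.get(namespace);
        boolean empty = existing == null || existing.isEmpty();
        if (!terminated.contains(namespace) || !empty) {
            return false;
        }
        topics.remove(namespace);
        return true;
    }
}
```

Because creation checks the flag under the same lock that `terminate` takes, no topic can appear between termination and deletion in this sketch.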

Thanks for your suggestions, let me know what you think.

Best,
Mattison

> On Feb 1, 2022, at 2:26 PM, Michael Marshall <mmarsh...@apache.org> wrote:
> 
> This proposal identifies an important issue that we should definitely
> solve. I have some questions.
> 
>> When there are no user-created topics under a namespace,
>> Namespace should be deleted.
> 
> This is supposed to mean that the namespace should be able to be
> deleted, correct?
> 
>> For this reason, we need to close the system topic reader/producer
>> first, then remove the system topic. finally, remove the namespace.
> 
> I agree that expanding the protobuf CloseProducer and CloseConsumer
> commands could be valuable here. The expansion would ensure that
> producers and consumers don't attempt to reconnect. However, I think
> we might still have a race condition that could make tenant or
> namespace deletion fail. Specifically, if a new producer or consumer
> creates a topic after the namespace deletion has started but
> before it is complete. Do you agree that the underlying race still exists?
> 
> In my view, the fundamental problem here is that deleting certain Pulsar
> resources takes time and, in a distributed system, that means race
> conditions.
> 
> What if we expand our usage of the "terminated" feature to apply to
> namespaces (and tenants)? Then, a terminated namespace can have
> bundles and topics can be deleted but not created (just as a terminated
> topic cannot have any new messages published to it). This would take
> care of all topic creation race conditions. We'd probably need to add
> new protobuf exceptions for this feature.
> 
> Thanks,
> Michael
> 
> 
> On Sat, Jan 29, 2022 at 7:25 PM Zike Yang
> <zky...@streamnative.io.invalid> wrote:
>> 
>> +1
>> 
>> 
>> Thanks,
>> Zike
>> 
>> On Sat, Jan 29, 2022 at 12:30 PM guo jiwei <techno...@apache.org> wrote:
>>> 
>>> Hi
>>> The PIP link : https://github.com/apache/pulsar/issues/13989
>>> 
>>> Regards
>>> Jiwei Guo (Tboy)
>>> 
>>> 
>>> On Sat, Jan 29, 2022 at 11:46 AM mattison chao <mattisonc...@gmail.com>
>>> wrote:
>>> 
>>>> Hello everyone,
>>>> 
>>>> I want to start a discussion about PIP-139 : Support Broker send command
>>>> to real close producer/consumer.
>>>> 
>>>> This is the PIP document
>>>> 
>>>> https://github.com/apache/pulsar/issues/13989 <
>>>> https://github.com/apache/pulsar/issues/13979>
>>>> 
>>>> Please check it out and feel free to share your thoughts.
>>>> 
>>>> Best,
>>>> Mattison
>>>> 
>>>> 
>>>> ———————— Pasted below for quoting convenience.
>>>> 
>>>> 
>>>> 
>>>> Relation pull request:  #13337
>>>> Authors: @Technoboy-  @mattisonchao
>>>> 
>>>> ## Motivation
>>>> 
>>>> Before we discuss this pip, I'd like to supplement some context to help
>>>> contributors who don't want to read the original pull request.
>>>> 
>>>>> When there are no user-created topics under a namespace, Namespace
>>>> should be deleted. But currently, the system topic exists and the
>>>> reader/producer can auto-create the system topic, which may cause the
>>>> namespace deletion to fail.
>>>> 
>>>> For this reason, we need to close the system topic reader/producer first,
>>>> then remove the system topic, and finally remove the namespace.
>>>> 
>>>> Following this approach, we first tried to use ``terminate`` to solve this
>>>> problem. Then we found that producers disconnect, but consumers stay
>>>> alive. So another PR, #13960, adds closing logic for consumers.
>>>> 
>>>> After #13960, everything looks good, but another problem appears: we
>>>> need to wait until consumers have consumed all messages before they get
>>>> ``reachedEndOfTopic`` (this may make terminating the topic take so long
>>>> that the operation times out). The relevant code is here:
>>>> 
>>>> 
>>>> https://github.com/apache/pulsar/blob/07ef9231db8b844586b9217ee2d59237eb9c54b7/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java#L102-L106
>>>> 
>>>> In the #13337 case, we need to force close consumers immediately. So we
>>>> write this PIP to discuss another way to resolve this problem.
>>>> 
>>>> ## Goal
>>>> 
>>>> Add a new field (``allow_reconnect``) to the ``CommandCloseProducer`` and
>>>> ``CommandCloseConsumer`` commands so that the broker can close
>>>> producers/consumers immediately.
>>>> 
>>>> ## API Changes
>>>> 
>>>> - Add ``allow_reconnect`` to ``CommandCloseProducer``;
>>>> 
>>>> ```protobuf
>>>> // Before
>>>> message CommandCloseProducer {
>>>>    required uint64 producer_id = 1;
>>>>    required uint64 request_id = 2;
>>>> }
>>>> 
>>>> // After
>>>> message CommandCloseProducer {
>>>>    required uint64 producer_id = 1;
>>>>    required uint64 request_id = 2;
>>>>    optional bool allow_reconnect = 3 [default = true];
>>>> }
>>>> ```
>>>> 
>>>> - Add ``allow_reconnect`` to ``CommandCloseConsumer``
>>>> ```protobuf
>>>> // Before
>>>> message CommandCloseConsumer {
>>>>    required uint64 consumer_id = 1;
>>>>    required uint64 request_id = 2;
>>>> }
>>>> 
>>>> // After
>>>> message CommandCloseConsumer {
>>>>    required uint64 consumer_id = 1;
>>>>    required uint64 request_id = 2;
>>>>    optional bool allow_reconnect = 3 [default = true];
>>>> }
>>>> ```
>>>> 
>>>> ## Implementation
>>>> 
>>>> ### ClientCnx - Producer:
>>>> 
>>>> **Before**
>>>> ```java
>>>>    @Override
>>>>    protected void handleCloseProducer(CommandCloseProducer closeProducer) {
>>>>        log.info("[{}] Broker notification of Closed producer: {}",
>>>>                remoteAddress, closeProducer.getProducerId());
>>>>        final long producerId = closeProducer.getProducerId();
>>>>        ProducerImpl<?> producer = producers.get(producerId);
>>>>        if (producer != null) {
>>>>            producer.connectionClosed(this);
>>>>        } else {
>>>>            log.warn("Producer with id {} not found while closing producer ",
>>>>                    producerId);
>>>>        }
>>>>    }
>>>> ```
>>>> **After**
>>>> ```java
>>>>    @Override
>>>>    protected void handleCloseProducer(CommandCloseProducer closeProducer) {
>>>>        log.info("[{}] Broker notification of Closed producer: {}",
>>>>                remoteAddress, closeProducer.getProducerId());
>>>>        final long producerId = closeProducer.getProducerId();
>>>>        ProducerImpl<?> producer = producers.get(producerId);
>>>>        if (producer != null) {
>>>>            if (closeProducer.isAllowReconnect()) {
>>>>                // Default behavior: the producer may reconnect.
>>>>                producer.connectionClosed(this);
>>>>            } else {
>>>>                // The broker asked for a real close: do not reconnect.
>>>>                producer.closeAsync();
>>>>            }
>>>>        } else {
>>>>            log.warn("Producer with id {} not found while closing producer ",
>>>>                    producerId);
>>>>        }
>>>>    }
>>>> ```
>>>> ### ClientCnx - Consumer:
>>>> 
>>>> **Before**
>>>> ```java
>>>>    @Override
>>>>    protected void handleCloseConsumer(CommandCloseConsumer closeConsumer) {
>>>>        log.info("[{}] Broker notification of Closed consumer: {}",
>>>>                remoteAddress, closeConsumer.getConsumerId());
>>>>        final long consumerId = closeConsumer.getConsumerId();
>>>>        ConsumerImpl<?> consumer = consumers.get(consumerId);
>>>>        if (consumer != null) {
>>>>            consumer.connectionClosed(this);
>>>>        } else {
>>>>            log.warn("Consumer with id {} not found while closing consumer ",
>>>>                    consumerId);
>>>>        }
>>>>    }
>>>> ```
>>>> **After**
>>>> ```java
>>>>    @Override
>>>>    protected void handleCloseConsumer(CommandCloseConsumer closeConsumer) {
>>>>        log.info("[{}] Broker notification of Closed consumer: {}",
>>>>                remoteAddress, closeConsumer.getConsumerId());
>>>>        final long consumerId = closeConsumer.getConsumerId();
>>>>        ConsumerImpl<?> consumer = consumers.get(consumerId);
>>>>        if (consumer != null) {
>>>>            if (closeConsumer.isAllowReconnect()) {
>>>>                // Default behavior: the consumer may reconnect.
>>>>                consumer.connectionClosed(this);
>>>>            } else {
>>>>                // The broker asked for a real close: do not reconnect.
>>>>                consumer.closeAsync();
>>>>            }
>>>>        } else {
>>>>            log.warn("Consumer with id {} not found while closing consumer ",
>>>>                    consumerId);
>>>>        }
>>>>    }
>>>> ```
>>>> 
>>>> 
>>>> ## Rejected Alternatives
>>>> 
>>>> None.
>> 
>> 
>> 
>> --
>> Zike Yang
