> The old producer/consumer should be closed after applying the changes from
> this proposal.

Penghui, are you suggesting that we implement the namespace/tenant
terminated logic after completing this PIP?

For the sake of discussion, if we implement the namespace terminated
logic first, we could fulfill the underlying requirements for this PIP
by returning a new non-retriable error response when a client tries to
connect a producer or a consumer to a topic in a namespace that is
"terminated". If we do add the "namespace terminated" feature, we'll
need to add a non-retriable exception for this case, anyway. The main
advantage here is that we'd only need one expansion of the protobuf
instead of two. The downside is that the protocol for connected
clients has a couple more roundtrips. The broker would disconnect
connected clients and then fail their reconnection attempt with a
non-retriable error.
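
To make the extra roundtrips concrete, here is a rough sketch in Java of
that flow. It is only an illustration under stated assumptions: the
`NAMESPACE_TERMINATED` result code, the toy `Broker` and `Client` classes,
and the `simulate` helper are all hypothetical and do not exist in Pulsar
today.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class NamespaceTerminatedSketch {

    /** Hypothetical result codes; NAMESPACE_TERMINATED is an assumption, not an existing Pulsar error. */
    enum ConnectResult { OK, NAMESPACE_TERMINATED }

    /** Toy broker that only tracks which namespaces have been terminated. */
    static class Broker {
        private final Set<String> terminated = new HashSet<>();

        void terminateNamespace(String namespace) {
            terminated.add(namespace);
        }

        /** Rejects connections to a terminated namespace with the non-retriable code. */
        ConnectResult connect(String namespace) {
            return terminated.contains(namespace)
                    ? ConnectResult.NAMESPACE_TERMINATED
                    : ConnectResult.OK;
        }
    }

    /** Toy client that reconnects once after a disconnect and gives up on a non-retriable error. */
    static class Client {
        final List<String> events = new ArrayList<>();
        boolean closed;

        void onDisconnected(Broker broker, String namespace) {
            events.add("disconnected");
            // Roundtrip 1: the client tries to reconnect, as it does today.
            ConnectResult result = broker.connect(namespace);
            events.add("reconnect:" + result);
            // Roundtrip 2: the broker's answer is non-retriable, so the client stops for good.
            if (result == ConnectResult.NAMESPACE_TERMINATED) {
                closed = true;
                events.add("closed");
            }
        }
    }

    /** Runs the flow: terminate the namespace, disconnect the client, observe the outcome. */
    static List<String> simulate() {
        Broker broker = new Broker();
        Client client = new Client();
        broker.terminateNamespace("tenant/ns");
        client.onDisconnected(broker, "tenant/ns");
        return client.events;
    }

    public static void main(String[] args) {
        System.out.println(simulate());
    }
}
```

Compared with an `allow_reconnect = false` close command, the client here
only learns that it must stop after one failed reconnection attempt.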

Thanks,
Michael

On Thu, Feb 24, 2022 at 7:11 AM PengHui Li <peng...@apache.org> wrote:
>
> > If we want to solve this problem, we need to add some sync resources like
> lock/state, I think it’s a harm for us, we don’t need to do that.
>
> I think we can move the namespace/tenant to an inactive state first, so
> that no new producer/consumer can connect to topics under the
> namespace/tenant.
>
> The old producer/consumer should be closed after applying the changes from
> this proposal.
>
> Thanks,
> Penghui
>
> On Tue, Feb 8, 2022 at 5:47 PM mattison chao <mattisonc...@gmail.com> wrote:
>
> > > This is supposed to mean that the namespace should be able to be
> > > deleted, correct?
> >
> > Yes, the main background is that the user doesn't have any active topics,
> > so they want to delete the namespace.
> >
> > > However, I think
> > > we might still have a race condition that could make tenant or
> > > namespace deletion fail. Specifically, if a new producer or consumer
> > > creates a topic after the namespace deletion has started but
> > > before it is complete. Do you agree that the underlying race still
> > > exists?
> >
> > Yes, this condition exists. I think it’s not a big problem because the
> > user doesn’t want to use this namespace anymore.
> > If this scenario appears, they will get an error and need to delete it
> > again.
> >
> > > What if we expand our usage of the "terminated" feature to apply to
> > > namespaces (and tenants)? Then, a terminated namespace can have
> > > bundles and topics can be deleted but not created (just as a terminated
> > > topic cannot have any new messages published to it). This would take
> > > care of all topic creation race conditions. We'd probably need to add
> > > new protobuf exceptions for this feature.
> >
> >
> > If we want to solve this problem, we need to add some sync resources like
> > lock/state, I think it’s a harm for us, we don’t need to do that.
> >
> > Thanks for your suggestions, let me know what you think.
> >
> > Best,
> > Mattison
> >
> > > On Feb 1, 2022, at 2:26 PM, Michael Marshall <mmarsh...@apache.org>
> > wrote:
> > >
> > > This proposal identifies an important issue that we should definitely
> > > solve. I have some questions.
> > >
> > >> When there are no user-created topics under a namespace,
> > >> Namespace should be deleted.
> > >
> > > This is supposed to mean that the namespace should be able to be
> > > deleted, correct?
> > >
> > >> For this reason, we need to close the system topic reader/producer
> > >> first, then remove the system topic. finally, remove the namespace.
> > >
> > > I agree that expanding the protobuf CloseProducer and CloseConsumer
> > > commands could be valuable here. The expansion would ensure that
> > > producers and consumers don't attempt to reconnect. However, I think
> > > we might still have a race condition that could make tenant or
> > > namespace deletion fail. Specifically, if a new producer or consumer
> > > creates a topic after the namespace deletion has started but
> > > before it is complete. Do you agree that the underlying race still
> > > > exists?
> > >
> > > In my view, the fundamental problem here is that deleting certain Pulsar
> > > resources takes time and, in a distributed system, that means race
> > > conditions.
> > >
> > > What if we expand our usage of the "terminated" feature to apply to
> > > namespaces (and tenants)? Then, a terminated namespace can have
> > > bundles and topics can be deleted but not created (just as a terminated
> > > topic cannot have any new messages published to it). This would take
> > > care of all topic creation race conditions. We'd probably need to add
> > > new protobuf exceptions for this feature.
> > >
> > > Thanks,
> > > Michael
> > >
> > >
> > > On Sat, Jan 29, 2022 at 7:25 PM Zike Yang
> > > <zky...@streamnative.io.invalid> wrote:
> > >>
> > >> +1
> > >>
> > >>
> > >> Thanks,
> > >> Zike
> > >>
> > >> On Sat, Jan 29, 2022 at 12:30 PM guo jiwei <techno...@apache.org>
> > wrote:
> > >>>
> > >>> Hi
> > >>> The PIP link : https://github.com/apache/pulsar/issues/13989
> > >>>
> > >>> Regards
> > >>> Jiwei Guo (Tboy)
> > >>>
> > >>>
> > >>> On Sat, Jan 29, 2022 at 11:46 AM mattison chao <mattisonc...@gmail.com
> > >
> > >>> wrote:
> > >>>
> > >>>> Hello everyone,
> > >>>>
> > >>>> I want to start a discussion about PIP-139 : Support Broker send
> > command
> > >>>> to real close producer/consumer.
> > >>>>
> > >>>> This is the PIP document
> > >>>>
> > >>>> https://github.com/apache/pulsar/issues/13989 <
> > >>>> https://github.com/apache/pulsar/issues/13979>
> > >>>>
> > >>>> Please check it out and feel free to share your thoughts.
> > >>>>
> > >>>> Best,
> > >>>> Mattison
> > >>>>
> > >>>>
> > >>>> ———————— Pasted below for quoting convenience.
> > >>>>
> > >>>>
> > >>>>
> > >>>> Relation pull request:  #13337
> > >>>> Authors: @Technoboy-  @mattisonchao
> > >>>>
> > >>>> ## Motivation
> > >>>>
> > >>>> Before we discuss this PIP, I'd like to add some context to help
> > >>>> contributors who don't want to read the original pull request.
> > >>>>
> > >>>>> When there are no user-created topics under a namespace, Namespace
> > >>>> should be deleted. But currently, the system topic exists and the
> > >>>> reader/producer can auto-create the system topic, which may cause the
> > >>>> namespace deletion to fail.
> > >>>>
> > >>>> For this reason, we need to close the system topic reader/producer
> > >>>> first, then remove the system topic. finally, remove the namespace.
> > >>>>
> > >>>> Following this approach, we first tried to use ``terminate`` to solve
> > >>>> this problem. We then found that producers disconnect, but consumers
> > >>>> stay alive. So another PR, #13960, aims to add the consumer closing
> > >>>> logic.
> > >>>>
> > >>>> After #13960 everything looks good, but another problem appears: we
> > >>>> need to wait until consumers have consumed all messages before they get
> > >>>> ``reachedEndOfTopic`` (this may make terminating the topic take so long
> > >>>> that the operation times out). The relevant code is here:
> > >>>>
> > >>>>
> > >>>>
> > https://github.com/apache/pulsar/blob/07ef9231db8b844586b9217ee2d59237eb9c54b7/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java#L102-L106
> > >>>>
> > >>>> In the #13337 case, we need to force close consumers immediately. So
> > >>>> we wrote this PIP to discuss another way to resolve this problem.
> > >>>>
> > >>>> ## Goal
> > >>>>
> > >>>> We can add a new field(`allow_reconnect`) in command
> > >>>> ``CommandCloseProducer``/ ``CommandCloseConsumer`` to close
> > >>>> producer/consumers immediately.
> > >>>>
> > >>>> ## API Changes
> > >>>>
> > >>>> - Add ``allow_reconnect`` to ``CommandCloseProducer``;
> > >>>>
> > >>>> **Before**
> > >>>>
> > >>>> ```protobuf
> > >>>> message CommandCloseProducer {
> > >>>>    required uint64 producer_id = 1;
> > >>>>    required uint64 request_id = 2;
> > >>>> }
> > >>>> ```
> > >>>>
> > >>>> **After**
> > >>>>
> > >>>> ```protobuf
> > >>>> message CommandCloseProducer {
> > >>>>    required uint64 producer_id = 1;
> > >>>>    required uint64 request_id = 2;
> > >>>>    optional bool allow_reconnect = 3 [default = true];
> > >>>> }
> > >>>> ```
> > >>>>
> > >>>> - Add ``allow_reconnect`` to ``CommandCloseConsumer``
> > >>>>
> > >>>> **Before**
> > >>>>
> > >>>> ```protobuf
> > >>>> message CommandCloseConsumer {
> > >>>>    required uint64 consumer_id = 1;
> > >>>>    required uint64 request_id = 2;
> > >>>> }
> > >>>> ```
> > >>>>
> > >>>> **After**
> > >>>>
> > >>>> ```protobuf
> > >>>> message CommandCloseConsumer {
> > >>>>    required uint64 consumer_id = 1;
> > >>>>    required uint64 request_id = 2;
> > >>>>    optional bool allow_reconnect = 3 [default = true];
> > >>>> }
> > >>>> ```
> > >>>>
> > >>>> ## Implementation
> > >>>>
> > >>>> ### ClientCnx - Producer:
> > >>>>
> > >>>> **Before**
> > >>>> ```java
> > >>>>    @Override
> > >>>>    protected void handleCloseProducer(CommandCloseProducer closeProducer) {
> > >>>>        log.info("[{}] Broker notification of Closed producer: {}",
> > >>>>                remoteAddress, closeProducer.getProducerId());
> > >>>>        final long producerId = closeProducer.getProducerId();
> > >>>>        ProducerImpl<?> producer = producers.get(producerId);
> > >>>>        if (producer != null) {
> > >>>>            producer.connectionClosed(this);
> > >>>>        } else {
> > >>>>            log.warn("Producer with id {} not found while closing producer ", producerId);
> > >>>>        }
> > >>>>    }
> > >>>> ```
> > >>>> **After**
> > >>>> ```java
> > >>>>    @Override
> > >>>>    protected void handleCloseProducer(CommandCloseProducer closeProducer) {
> > >>>>        log.info("[{}] Broker notification of Closed producer: {}",
> > >>>>                remoteAddress, closeProducer.getProducerId());
> > >>>>        final long producerId = closeProducer.getProducerId();
> > >>>>        ProducerImpl<?> producer = producers.get(producerId);
> > >>>>        if (producer != null) {
> > >>>>            if (closeProducer.isAllowReconnect()) {
> > >>>>                // Reconnect allowed: keep the existing reconnection path.
> > >>>>                producer.connectionClosed(this);
> > >>>>            } else {
> > >>>>                // Reconnect not allowed: close the producer for good.
> > >>>>                producer.closeAsync();
> > >>>>            }
> > >>>>        } else {
> > >>>>            log.warn("Producer with id {} not found while closing producer ", producerId);
> > >>>>        }
> > >>>>    }
> > >>>> ```
> > >>>> ### ClientCnx - Consumer:
> > >>>>
> > >>>> **Before**
> > >>>> ```java
> > >>>>    @Override
> > >>>>    protected void handleCloseConsumer(CommandCloseConsumer closeConsumer) {
> > >>>>        log.info("[{}] Broker notification of Closed consumer: {}",
> > >>>>                remoteAddress, closeConsumer.getConsumerId());
> > >>>>        final long consumerId = closeConsumer.getConsumerId();
> > >>>>        ConsumerImpl<?> consumer = consumers.get(consumerId);
> > >>>>        if (consumer != null) {
> > >>>>            consumer.connectionClosed(this);
> > >>>>        } else {
> > >>>>            log.warn("Consumer with id {} not found while closing consumer ", consumerId);
> > >>>>        }
> > >>>>    }
> > >>>> ```
> > >>>> **After**
> > >>>> ```java
> > >>>>    @Override
> > >>>>    protected void handleCloseConsumer(CommandCloseConsumer closeConsumer) {
> > >>>>        log.info("[{}] Broker notification of Closed consumer: {}",
> > >>>>                remoteAddress, closeConsumer.getConsumerId());
> > >>>>        final long consumerId = closeConsumer.getConsumerId();
> > >>>>        ConsumerImpl<?> consumer = consumers.get(consumerId);
> > >>>>        if (consumer != null) {
> > >>>>            if (closeConsumer.isAllowReconnect()) {
> > >>>>                // Reconnect allowed: keep the existing reconnection path.
> > >>>>                consumer.connectionClosed(this);
> > >>>>            } else {
> > >>>>                // Reconnect not allowed: close the consumer for good.
> > >>>>                consumer.closeAsync();
> > >>>>            }
> > >>>>        } else {
> > >>>>            log.warn("Consumer with id {} not found while closing consumer ", consumerId);
> > >>>>        }
> > >>>>    }
> > >>>> ```
> > >>>>
> > >>>>
> > >>>> ## Rejected Alternatives
> > >>>>
> > >>>> none.
> > >>
> > >>
> > >>
> > >> --
> > >> Zike Yang
> >
> >
