RE: Re: [DISCUSS] PIP-186: Introduce two phase deletion protocol based on system topic

2023-03-19 Thread Horizon
> Penghui suggested instead of communicating via Admin API we use the same
> methodology used in the new load balancer - using a compacted topic as a
> queue:
> The system topic you create will have compaction enabled.
> You write the pending ledger delete, and the key is ledger-ID and
> ledger-type (BK, offloaded).
> All brokers will consume this compacted topic as table-view. Basically upon
> restart, you read all messages into memory.
> When reading, you only keep the messages related to topics the broker
> owns; the others you discard (table view has a filter feature).
> For each message in the table view, you delete the ledger, then write a
> message to the topic with the same key and a null value, meaning the entry
> will be deleted.
> 
> This way you don't need to invent a new private Admin API.
> 
> WDYT?

Got it. It's a good solution. I will update the PIP to use this approach.
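
To make the compacted-topic idea concrete, here is a minimal in-memory sketch of how a broker could maintain its view of pending deletes. It does not use the Pulsar client; the class name, the "<ledgerId>:<ledgerType>" key format, and the ownership predicate are all assumptions made for illustration only.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Predicate;

/** In-memory stand-in for a compacted system topic read as a table view (hypothetical). */
class PendingDeleteTableSketch {
    // Latest value per key. A message with a null value acts as a tombstone.
    private final Map<String, String> view = new LinkedHashMap<>();
    // Decides whether this broker owns the topic named in a message (assumed helper).
    private final Predicate<String> ownsTopic;

    PendingDeleteTableSketch(Predicate<String> ownsTopic) {
        this.ownsTopic = ownsTopic;
    }

    /** Key format assumed here: "<ledgerId>:<ledgerType>", e.g. "7:BK". */
    static String key(long ledgerId, String ledgerType) {
        return ledgerId + ":" + ledgerType;
    }

    /** Applies one message; topicName is the owning topic, or null for a tombstone. */
    void onMessage(String key, String topicName) {
        if (topicName == null) {
            view.remove(key);            // tombstone: the ledger has been deleted
        } else if (ownsTopic.test(topicName)) {
            view.put(key, topicName);    // pending delete for a topic this broker owns
        }
        // Messages for topics owned by other brokers are discarded.
    }

    /** Pending deletes this broker still has to process. */
    Map<String, String> pending() {
        return view;
    }
}
```

After a restart the broker would replay all messages through onMessage and end up with only the deletes it still has to perform.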



Re: [DISCUSS] PIP-186: Introduce two phase deletion protocol based on system topic

2023-03-14 Thread Asaf Mesika
On Sun, Mar 12, 2023 at 7:21 PM Yan Zhao  wrote:

> > > In the LedgerDeletionService start, it will create a producer for
> > > sending pending delete ledger. When deleting a ledger, the producer
> > > sends PendingDeleteLedgerInfo to the system-topic. If the sent
> > > succeeds, delete the ledger id from the whole ledgers and update the
> > > ledgers into the meta store. If the sent fails, do nothing.
> >
> > You don't delete the ledger ID from the whole ledgers, then update the
> > ledgers in the metadata store. We described it above already; we remove
> > the pending delete ledger ID from the list of ledger IDs, publish the
> > message, and only at the end update the metadata store.
> Yes, correct it.
>
> > After that section, you write the methods of DeletionService and the
> > pseudo-code, but it's hard to understand what goes where: What exactly
> > deletion service does, and where the pseudo code will actually go into.
> It's just a summary; the later sections give more detail.
>
> > Also, let's concentrate on the implementation details in the
> > Implementation section High-Level Design. Let the readers understand the
> > concept end to end in the High-Level Design, and then in the
> > Implementation, start diving into the nitty gritty details.
>
>
> > > The consumer uses pulsar admin to send PendingDeleteLedgerInfo by the
> > > broker restful API.
>
> > Please explain here that you will introduce a new Admin API for that.
> > Also, explain that you will issue that API against the broker who owns
> > the topic (The schema registry has a ledger per topic?)
> correct it.
>
> > I still don't understand which methods or behavior you're changing
> > exactly for the managed ledger, cursor, and schema.
> > Why am I asking?
> > - Ledger deletion can happen because:
> > * Background truncation since ledgers are allowed to be deleted based on
> > expiration policy.
> > * Truncate request by an admin API.
> > * More?
> All the cases you mention do the same thing: they trigger
> ManagedLedgerImpl#internalTrimLedgers.
> We change the implementation inside internalTrimLedgers.
>
> > If you find the ledger ID is still in the meta-store, you return a
> > failure, and the consumer acks the message, right?
> > If it's background deletion, all good since it will kick in again in a
> > few seconds and retry.
> > But what happens if a user runs a REST admin API command to truncate the
> > topic?
> Using the REST admin API command to truncate the topic is no different
> from the background task. It happens in the first (metadata deletion)
> phase, where we don't check whether the ledger is in the metadata or not;
> the first phase only picks all consumed ledgers and then tries to delete
> them.
>
> > > check if the ledger is still in use,
>
> > You mean, check if the ledger ID still exists in the ledger ID list of
> > the topic in the metadata store, right?
> >
> >  then check the PendingDeleteLedgerInfo ledgerType.
> >
> > Check for what?
> see 3.1, 3.2.
>
>
> > > 2.2 If the topic not exists, the consumer checks the
> > > PendingDeleteLedgerInfo ledgerType.
> > > 3.1 If the ledgerType is Ledger, delete the data at the bookkeeper
> > > side.
> > > 3.2 If the ledgerType is Offload-Ledger, delete the data at the
> > > tiered-storage side.
>
> > Something doesn't look good in the numbering and how you structure it.
> > 3.1 is probably an indent inside 2.2, no?
> Sorry, it may confuse the reader. I added a flow hint to the PIP.
>
>
> > 1. What happens if the topic doesn't exist when you are in the broker in
> > the delete pending ledger API implementation?
> The broker will respond to the consumer with a 404 "Topic not found"
> error. The consumer then knows the topic no longer exists, and it will
> delete the ledger locally.
>
You need to write that in your PIP. In your pseudocode you make it appear
as if the failure is a boolean, but in fact the response should be an error
code.



> > 2. Say the ledger ID is still in the metadata store. If the background
> > truncation of the managed ledger was the trigger, there is no real point
> > in retrying, right? Once you start over, another message will be sent to
> > the system topic.
> In the storage deletion phase, if the ledger ID is still in the metadata
> store, we assume it is still in use: we don't delete it, and we ack the
> message. Because the ledger is still in the metadata store, it will be
> picked up by the next background truncation task, so it will eventually
> be deleted.
>
>
> > 3. Can you explain exactly the cases in which it is actually worth doing
> > the retry?
> > I mean the obvious: 

Re: [DISCUSS] PIP-186: Introduce two phase deletion protocol based on system topic

2023-03-12 Thread Yan Zhao
> > In the LedgerDeletionService start, it will create a producer for sending
> > pending delete ledger. When deleting a ledger, the producer sends
> > PendingDeleteLedgerInfo to the system-topic. If the sent succeeds, delete
> > the ledger id from the whole ledgers and update the ledgers into the meta
> > store. If the sent fails, do nothing.
>
> You don't delete the ledger ID from the whole ledgers, then update the
> ledgers in the metadata store. We described it above already; we remove the
> pending delete ledger ID from the list of ledger IDs, publish the message,
> and only at the end update the metadata store.
Yes, I have corrected it.

> After that section, you write the methods of DeletionService and the
> pseudo-code, but it's hard to understand what goes where: What exactly
> deletion service does, and where the pseudo code will actually go into.
It's just a summary; the later sections give more detail.

> Also, let's concentrate on the implementation details in the Implementation
> section High-Level Design. Let the readers understand the concept end to
> end in the High-Level Design, and then in the Implementation, start diving
> into the nitty gritty details.


> > The consumer uses pulsar admin to send PendingDeleteLedgerInfo by the
> > broker restful API.

> Please explain here that you will introduce a new Admin API for that. Also,
> explain that you will issue that API against the broker who owns the topic
> (The schema registry has a ledger per topic?)
I have corrected it.

> I still don't understand which methods or behavior you're changing exactly
> for the managed ledger, cursor, and schema.
> Why am I asking?
> - Ledger deletion can happen because:
> * Background truncation since ledgers are allowed to be deleted based on
> expiration policy.
> * Truncate request by an admin API.
> * More?
All the cases you mention do the same thing: they trigger 
ManagedLedgerImpl#internalTrimLedgers. 
We change the implementation inside internalTrimLedgers.

> If you find the ledger ID is still in the meta-store, you return a failure,
> and the consumer acks the message, right?
> If it's background deletion, all good since it will kick in again in a few
> seconds and retry.
> But what happens if a user runs a REST admin API command to truncate the
> topic?
Using the REST admin API command to truncate the topic is no different from 
the background task. It happens in the first (metadata deletion) phase, where 
we don't check whether the ledger is in the metadata or not; the first phase 
only picks all consumed ledgers and then tries to delete them.

> > check if the ledger is still in use,

> You mean, check if the ledger ID still exists in the ledger ID list of the
> topic in the metadata store, right?
> 
>  then check the PendingDeleteLedgerInfo ledgerType.
> 
> Check for what?
see 3.1, 3.2.


> > 2.2 If the topic not exists, the consumer checks the
> > PendingDeleteLedgerInfo ledgerType.
> > 3.1 If the ledgerType is Ledger, delete the data at the bookkeeper side.
> > 3.2 If the ledgerType is Offload-Ledger, delete the data at the
> > tiered-storage side.

> Something doesn't look good in the numbering and how you structure it. 3.1
> is probably an indent inside 2.2, no?
Sorry, it may confuse the reader. I added a flow hint to the PIP.


> 1. What happens if the topic doesn't exist when you are in the broker in
> the delete pending ledger API implementation?
The broker will respond to the consumer with a 404 "Topic not found" error. 
The consumer then knows the topic no longer exists, and it will delete the 
ledger locally.

> 2. Say the ledger ID is still in the metadata store. If the background
> truncation of the managed ledger was the trigger, there is no real point in
> retrying, right? Once you start over, another message will be sent to the
> system topic.
In the storage deletion phase, if the ledger ID is still in the metadata store, 
we assume it is still in use: we don't delete it, and we ack the message. 
Because the ledger is still in the metadata store, it will be picked up by the 
next background truncation task, so it will eventually be deleted.


> 3. Can you explain exactly the cases in which it is actually worth doing
> the retry?
> I mean the obvious: you failed to delete it from BK due to a transient
> error. But the previous one of checking the list? worth the retry?
If the ledger is still in the metadata store during the second (storage 
deletion) phase, we won't retry; we ack the message to avoid a useless retry. 
We retry only when the storage deletion itself fails.
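
As a rough sketch of the ack/retry rules described in the answers above (topic not found, ledger still in the metadata store, storage deletion failure), the consumer-side decision could look like the following. The class, the Action enum, and the LedgerStorage interface are hypothetical names used only for this illustration, not the PIP's API.

```java
/** Hypothetical sketch of the consumer's decision in the storage deletion phase. */
class StorageDeletionDecisionSketch {
    enum Action { ACK, RETRY }

    /** Assumed abstraction over BookKeeper or tiered storage. */
    interface LedgerStorage {
        boolean delete(long ledgerId); // returns false when the deletion fails
    }

    static Action handle(long ledgerId, boolean topicExists,
                         boolean stillInMetadata, LedgerStorage storage) {
        if (topicExists && stillInMetadata) {
            // The ledger is considered in use: do not delete it, just ack.
            // The next background trim will publish a new pending-delete message.
            return Action.ACK;
        }
        // Either the topic is gone (404 from the broker) or the ledger ID was
        // already removed from the metadata store: delete the data itself.
        // Only a failed storage deletion is worth a retry.
        return storage.delete(ledgerId) ? Action.ACK : Action.RETRY;
    }
}
```

The point of the sketch is that RETRY has exactly one cause, a failed storage deletion; every other outcome acks the message.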


> then respond to the error msg "ledger still in use" to the consumer.

> You need to have those 

Re: [DISCUSS] PIP-186: Introduce two phase deletion protocol based on system topic

2023-03-04 Thread Yan Zhao
Hi Asaf, I have updated the PIP (https://github.com/apache/pulsar/issues/16569). 
Please help review it again, thanks!


Re: [DISCUSS] PIP-186: Introduce two phase deletion protocol based on system topic

2023-03-02 Thread Yan Zhao
Hi Asaf, I have updated PIP-186 (https://github.com/apache/pulsar/issues/16569). 
Please help review it again. Thanks!

On 2023/02/16 13:50:54 Yan Zhao wrote:
> > If understood correctly, every broker will have a consumer right? You will
> > use a fail-over subscription? The retry-topic is consumed by the same
> > subscription, same consumer?
> Yes, you are right, the case you mention can happen. The deletion is 
> idempotent, so I'm not sure it's worth adding synchronization for it.
> 
> > In this very long mailing list thread, we have mentioned many fixes to be
> > done. Can you ping in the mailing list once you have managed to fix it all?
> 
> That's fine, I will push the new PIP within two days. After pushing it, 
> I will ping you.
> 


Re: [DISCUSS] PIP-186: Introduce two phase deletion protocol based on system topic

2023-02-16 Thread Yan Zhao
> If understood correctly, every broker will have a consumer right? You will
> use a fail-over subscription? The retry-topic is consumed by the same
> subscription, same consumer?
Yes, you are right, the case you mention can happen. The deletion is idempotent, 
so I'm not sure it's worth adding synchronization for it.

> In this very long mailing list thread, we have mentioned many fixes to be
> done. Can you ping in the mailing list once you have managed to fix it all?

That's fine, I will push the new PIP within two days. After pushing it, I 
will ping you.


Re: [DISCUSS] PIP-186: Introduce two phase deletion protocol based on system topic

2023-02-16 Thread Asaf Mesika
On Tue, Feb 14, 2023 at 1:02 PM Yan Zhao  wrote:

> > Shouldn't you specify the changes you are going to make in
> > `PulsarApi.proto`?
> We don't change the proto file; we use the REST API (Admin API).
> > Also, shouldn't you change the wording in this sentence in the PIP to
> > clarify you will use AdminAPI / Pulsar RPC?
> We use the Admin API; I will update the PIP.
>
>
>
> > > You need to specify in the PIP that you will publish one message for
> > deletion from BK, and one message for deletion of from offloader and
> > explain why. Also provide the context you just gave me about where you
> find
> > if it is actually offloaded or not.
> Ok.
>
> > Let me see if I understand correctly. Now the PIP says:
> >
> > private MLDataFormats.ManagedLedgerInfo.LedgerInfo context;
> >
> > and you say it should be:
> >
> > private MLDataFormats.ManagedLedgerInfo.LedgerInfo.OffloadContext
> > offloadContext;
> >
> > ?
> Yes.
>
>
> >
> > In that case I'm not sure I understand the need for
> >
> >   /**
> >  * Extent properties.
> >  */
> > private Map properties = new HashMap<>();
> >
>
> It can be removed.
>
> > Not just the title, your PIP needs to explain everything we wrote above
> > about trim.
>
> I know, and I will tune the PIP contents.
>
> > Ok, say you only delete from ZK in the first step. Still what I wrote
> above
> > still applies: You might have 2 messages trying to delete same ledger at
> > the same time. Shouldn't this have a lock to protect against it?
>
> By design, the consumer handles only one message at a time, which
> ensures the deletions are serialized.
>
If understood correctly, every broker will have a consumer right? You will
use a fail-over subscription? The retry-topic is consumed by the same
subscription, same consumer?


In this very long mailing list thread, we have mentioned many fixes to be
done. Can you ping in the mailing list once you have managed to fix it all?


Re: [DISCUSS] PIP-186: Introduce two phase deletion protocol based on system topic

2023-02-14 Thread Yan Zhao
> Shouldn't you specify the changes you are going to make in
> `PulsarApi.proto`?
We don't change the proto file; we use the REST API (Admin API).
> Also, shouldn't you change the wording in this sentence in the PIP to
> clarify you will use AdminAPI / Pulsar RPC?
We use the Admin API; I will update the PIP.



> You need to specify in the PIP that you will publish one message for
> deletion from BK, and one message for deletion from the offloader, and
> explain why. Also provide the context you just gave me about where you find
> if it is actually offloaded or not.
Ok. 

> Let me see if I understand correctly. Now the PIP says:
> 
> private MLDataFormats.ManagedLedgerInfo.LedgerInfo context;
> 
> and you say it should be:
> 
> private MLDataFormats.ManagedLedgerInfo.LedgerInfo.OffloadContext
> offloadContext;
> 
> ?
Yes.


> 
> In that case I'm not sure I understand the need for
> 
>   /**
>  * Extent properties.
>  */
> private Map properties = new HashMap<>();
> 

It can be removed.

> Not just the title, your PIP needs to explain everything we wrote above
> about trim.

I know, and I will update the PIP contents.

> Ok, say you only delete from ZK in the first step. Still what I wrote above
> still applies: You might have 2 messages trying to delete same ledger at
> the same time. Shouldn't this have a lock to protect against it?

By design, the consumer handles only one message at a time, which ensures 
the deletions are serialized.


Re: [DISCUSS] PIP-186: Introduce two phase deletion protocol based on system topic

2023-02-13 Thread Asaf Mesika
On Mon, Feb 6, 2023 at 12:02 PM Yan Zhao  wrote:

>
>
> On 2023/02/05 14:40:10 Asaf Mesika wrote:
> > On Fri, Feb 3, 2023 at 1:48 PM Yan Zhao  wrote:
> >
> > > > If you persisted the message successfully by the producer and the
> broker
> > > > was terminated before being able to delete the ledger from the
> metadata
> > > > service?
> > > If the broker is terminated, the consumer won't ack the message, the
> > > message will be re-consume later.
> >
> >
> > Let me quote the text I was replying to:
> >
> > > When pulsar wants to delete a ledger, ManagedLedger uses
> > > ledgerDeletionService to send a message, the message content contains
> the
> > > waiting delete ledger info. After sending success, delete the ledger id
> > > from the metadata store.
> >
> >
> > If I understand correctly, the following steps will happen:
> > 1. Producer writes a message to system topic, containing the ledger
> > information to delete.
> > 2. Upon success of (1), the ledger ID is deleted from the metadata store.
> >
> > You reply was about the consumer, my question was about the producer, so
> > I'm writing my reply here again (full reply):
> >
> > If you persisted the message successfully by the producer and the broker
> > was terminated before being able to delete the ledger (ID) from the
> > metadata service?
>
> After the broker restarts, it still thinks the ledger is available, and
> it can still read data from this ledger.
> When the consumer receives the message, it triggers the corresponding
> broker to delete the ledger.
> Because the ledger still exists in the metadata, the broker ignores the
> deletion, and the consumer acks the message.
>
> At the next trimConsumedLedgersInBackground run, the deletion of the
> ledger will be attempted again.
>
> >
> > >
> > > > I recommend having the logic to delete the ledger be done in the
> message
> > > > consumer side:
> > > > - if the ledger exists in the MD store, delete it.
> > > > - send delete command to BK
> > > > Both as I understand are idempotent. If for some reason one action
> was
> > > done
> > > > but the other wasn't - ZK down, BK down, broker hosting the consumer
> > > > terminated - either the message will not be acknowledged, or you
> > > negatively
> > > > acknowledge.
> > > We send a delete command to the broker, it will connect to the
> > > corresponding broker which loads the topic. The corresponding broker
> > > received the command, then passes the command to ManagedLedger, the
> > > ManagedLedger does the actual delete operation.
> > > If the consumer does the delete operation, it's a little unreasonable.
> The
> > > ledger manager should be `ManagedLedger`, let it do the delete will be
> > > better.
> > >
> >
> > You say the consumer will send a delete command to the broker: By what
> > means? Sending a message on a topic, or running an RPC request by adding
> a
> > new command to the PulsarApi.proto file?
> > If the latter - shouldn't this be documented?
> Yes, by sending an RPC request.
> I just added these sections:
> `Introduce admin api in Topics`.
> `Introduce admin api in Schemas.`
>
> I will describe it in detail.
>

Shouldn't you specify the changes you are going to make in
`PulsarApi.proto`?
Also, shouldn't you change the wording in this sentence in the PIP to
clarify you will use AdminAPI / Pulsar RPC?

> When receiving a pending delete ledger msg, we will check if the topic
> still exists. If the topic exists, send a delete command
> (PendingDelteLedger) to the broker which owns the topic.




>
> >
> > Let me assume the consumer which receives the message, will run an Pulsar
> > API command to instruct the broker assigned to this topic to delete the
> > ledger - Why the implementation of this command in the assigned broker
> > can't have both the deletion of the ledger ID from the ZK, AND deletion
> > from BK? Why do we need to have the ZK deletion done on the producer
> side,
> > after we produced the message?
>
> In Pulsar, once we delete the ledger ID from the metadata store, we
> consider the ledger deleted. Pulsar no longer refers to the ledger,
> although the ledger still exists in BK. The ledger deletion in BK can be
> done at any time, as long as we ensure that it eventually happens.
>
> Suppose instead that we delete both the ledger ID from ZK and the ledger
> from BK when the broker receives the delete RPC command.
> There may be a problem in this case.
> The broker wants to delete ledger 1, so it sends the message to the system
> topic but does not remove ledger ID 1 from ZK. The ManagedLedger therefore
> still thinks ledger 1 is readable and may read it from BK. At the same
> time, the consumer receives the message and sends the delete RPC command
> to the broker; the broker deletes the ledger ID from ZK and deletes the
> ledger from BK. The ManagedLedger, which is still reading from BK, will
> then throw an exception.
>
> So we remove the ledger ID from ZK at the producer stage. The broker then
> considers the ledger already deleted and no longer performs any operation
> on it.
>
> 

Re: [DISCUSS] PIP-186: Introduce two phase deletion protocol based on system topic

2023-02-08 Thread Yan Zhao



On 2023/02/07 18:41:20 Heesung Sohn wrote:
> On Sun, Feb 5, 2023 at 2:26 AM Yan Zhao  wrote:
> 
> > On 2023/02/03 18:14:53 Heesung Sohn wrote:
> > > There are some cases that trigger it.
> > > 1. A cursor is removed.
> > > 2. The current ledger is closed and a new ledger is created.
> > > 3. A consumer acks a message and the slowest cursor moves forward.
> > > 4. A user triggers truncateTopic through the admin API.
> > >
> > > I see that this pip is for the internal ledger deletion cases(1-3 above),
> > > and it appears that such internal deletion operations do not require
> > > pre-validation or status checks(and no additional iops on the metadata
> > > store). I agree that we need a separate pip for async admin operations.
> >
> > This PIP also applies to case 4. All cases invoke
> > `trimConsumedLedgersInBackground`.
> > This PIP acts inside that method.
> >
> 
> For the long-running async admin operations such as
> deleteTopic/Namespace/Tenant/*, as I mentioned, I think we better provide
> the describe* APIs to enable the admins to check the status of the async
> operation.

> I believe we can first use this system topic from this pip to support case
> 4, "admin async operations."
> Still, we probably need a separate pip to discuss
> - the expected behavior/experience of async admin operations.
> - if we want to provide describe* APIs for the async operations.
> - target resources
> - architecture/components how to support describe* APIs

Agreed. If there are long-running async operations whose expected end time 
is unknown, we can add the describe* API to report their progress.




Re: [DISCUSS] PIP-186: Introduce two phase deletion protocol based on system topic

2023-02-07 Thread Heesung Sohn
On Sun, Feb 5, 2023 at 2:26 AM Yan Zhao  wrote:

> On 2023/02/03 18:14:53 Heesung Sohn wrote:
> > There are some cases that trigger it.
> > 1. A cursor is removed.
> > 2. The current ledger is closed and a new ledger is created.
> > 3. A consumer acks a message and the slowest cursor moves forward.
> > 4. A user triggers truncateTopic through the admin API.
> >
> > I see that this pip is for the internal ledger deletion cases(1-3 above),
> > and it appears that such internal deletion operations do not require
> > pre-validation or status checks(and no additional iops on the metadata
> > store). I agree that we need a separate pip for async admin operations.
>
> This PIP also applies to case 4. All cases invoke
> `trimConsumedLedgersInBackground`.
> This PIP acts inside that method.
>

For long-running async admin operations such as
deleteTopic/Namespace/Tenant/*, as I mentioned, I think we had better provide
describe* APIs so that admins can check the status of the async
operation.

I believe we can first use this system topic from this pip to support case
4, "admin async operations."
Still, we probably need a separate pip to discuss
- the expected behavior/experience of async admin operations.
- if we want to provide describe* APIs for the async operations.
- target resources
- architecture/components how to support describe* APIs


Re: [DISCUSS] PIP-186: Introduce two phase deletion protocol based on system topic

2023-02-06 Thread Yan Zhao



On 2023/02/05 14:40:10 Asaf Mesika wrote:
> On Fri, Feb 3, 2023 at 1:48 PM Yan Zhao  wrote:
> 
> > > If you persisted the message successfully by the producer and the broker
> > > was terminated before being able to delete the ledger from the metadata
> > > service?
> > If the broker is terminated, the consumer won't ack the message, the
> > message will be re-consume later.
> 
> 
> Let me quote the text I was replying to:
> 
> > When pulsar wants to delete a ledger, ManagedLedger uses
> > ledgerDeletionService to send a message, the message content contains the
> > waiting delete ledger info. After sending success, delete the ledger id
> > from the metadata store.
> 
> 
> If I understand correctly, the following steps will happen:
> 1. Producer writes a message to system topic, containing the ledger
> information to delete.
> 2. Upon success of (1), the ledger ID is deleted from the metadata store.
> 
> You reply was about the consumer, my question was about the producer, so
> I'm writing my reply here again (full reply):
> 
> If you persisted the message successfully by the producer and the broker
> was terminated before being able to delete the ledger (ID) from the
> metadata service?

After the broker restarts, it still thinks the ledger is available, and 
it can still read data from this ledger. 
When the consumer receives the message, it triggers the corresponding broker 
to delete the ledger.
Because the ledger still exists in the metadata, the broker ignores the 
deletion, and the consumer acks the message.

At the next trimConsumedLedgersInBackground run, the deletion of the ledger 
will be attempted again.

> 
> >
> > > I recommend having the logic to delete the ledger be done in the message
> > > consumer side:
> > > - if the ledger exists in the MD store, delete it.
> > > - send delete command to BK
> > > Both as I understand are idempotent. If for some reason one action was
> > done
> > > but the other wasn't - ZK down, BK down, broker hosting the consumer
> > > terminated - either the message will not be acknowledged, or you
> > negatively
> > > acknowledge.
> > We send a delete command to the broker, it will connect to the
> > corresponding broker which loads the topic. The corresponding broker
> > received the command, then passes the command to ManagedLedger, the
> > ManagedLedger does the actual delete operation.
> > If the consumer does the delete operation, it's a little unreasonable. The
> > ledger manager should be `ManagedLedger`, let it do the delete will be
> > better.
> >
> 
> You say the consumer will send a delete command to the broker: By what
> means? Sending a message on a topic, or running an RPC request by adding a
> new command to the PulsarApi.proto file?
> If the latter - shouldn't this be documented?
Yes, by sending an RPC request. 
I just added these sections:
`Introduce admin api in Topics`.
`Introduce admin api in Schemas.`

I will describe it in detail.

> 
> Let me assume the consumer which receives the message, will run an Pulsar
> API command to instruct the broker assigned to this topic to delete the
> ledger - Why the implementation of this command in the assigned broker
> can't have both the deletion of the ledger ID from the ZK, AND deletion
> from BK? Why do we need to have the ZK deletion done on the producer side,
> after we produced the message?

In Pulsar, once we delete the ledger ID from the metadata store, we consider 
the ledger deleted. Pulsar no longer refers to the ledger, although the ledger 
still exists in BK. The ledger deletion in BK can be done at any time, as long 
as we ensure that it eventually happens.

Suppose instead that we delete both the ledger ID from ZK and the ledger from 
BK when the broker receives the delete RPC command.
There may be a problem in this case. 
The broker wants to delete ledger 1, so it sends the message to the system 
topic but does not remove ledger ID 1 from ZK. The ManagedLedger therefore 
still thinks ledger 1 is readable and may read it from BK. At the same time, 
the consumer receives the message and sends the delete RPC command to the 
broker; the broker deletes the ledger ID from ZK and deletes the ledger from 
BK. The ManagedLedger, which is still reading from BK, will then throw an 
exception.

So we remove the ledger ID from ZK at the producer stage. The broker then 
considers the ledger already deleted and no longer performs any operation on 
it.
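
The ordering argued for above can be illustrated with a small in-memory model. This is only a sketch under stated assumptions: the sets stand in for the ZK ledger list and for BK, the queue stands in for the system topic, and the crash flag simulates the broker being terminated after the publish but before the metadata update. None of these names come from the PIP.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical in-memory model of the two-phase deletion ordering. */
class TwoPhaseDeletionSketch {
    final Set<Long> metadataLedgers = new HashSet<>();    // stand-in for the ledger list in ZK
    final Set<Long> storageLedgers = new HashSet<>();     // stand-in for ledger data in BK
    final Deque<Long> pendingDeletes = new ArrayDeque<>(); // stand-in for the system topic

    void addLedger(long id) {
        metadataLedgers.add(id);
        storageLedgers.add(id);
    }

    /** Phase one: publish the pending delete, then drop the ID from the metadata. */
    void phaseOne(long id, boolean crashBeforeMetadataUpdate) {
        pendingDeletes.add(id);
        if (!crashBeforeMetadataUpdate) {
            metadataLedgers.remove(id);
        }
    }

    /** Phase two: consume one pending delete and remove the data if it is unreferenced. */
    void phaseTwo() {
        Long id = pendingDeletes.poll();
        if (id == null) {
            return; // nothing pending
        }
        if (metadataLedgers.contains(id)) {
            return; // still referenced, e.g. after a crash in phase one: ack only
        }
        storageLedgers.remove(id);
    }
}
```

In this model a reader that only follows metadataLedgers can never observe a ledger whose data has already been removed, which is the property the ordering is meant to give.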

> 
> 
> > > General question: When a ledger is persisted to ZK, where is the ledger
> > > metadata persisted in ZK (more specifically it's metadata, which includes
> > > the associate?
> > > Is it also used when building out the key (path) in ZK?
> >
> >
> > https://github.com/apache/bookkeeper/blob/901f76ce4c4f9f771363424dbb60da4d590ad122/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadataImpl.java#L74
> >
> > It's the content in the zk node. when creating a ledger by bookkeeper, it
> > will create a path like `/ledgers/00//L`, the path value is an
> > instance of 

Re: [DISCUSS] PIP-186: Introduce two phase deletion protocol based on system topic

2023-02-05 Thread Asaf Mesika
On Fri, Feb 3, 2023 at 1:48 PM Yan Zhao  wrote:

> > If you persisted the message successfully by the producer and the broker
> > was terminated before being able to delete the ledger from the metadata
> > service?
> If the broker is terminated, the consumer won't ack the message, and the
> message will be re-consumed later.


Let me quote the text I was replying to:

> When pulsar wants to delete a ledger, ManagedLedger uses
> ledgerDeletionService to send a message, the message content contains the
> waiting delete ledger info. After sending success, delete the ledger id
> from the metadata store.


If I understand correctly, the following steps will happen:
1. Producer writes a message to system topic, containing the ledger
information to delete.
2. Upon success of (1), the ledger ID is deleted from the metadata store.

Your reply was about the consumer, my question was about the producer, so
I'm writing my reply here again (full reply):

What happens if the producer persisted the message successfully and the
broker was terminated before being able to delete the ledger (ID) from the
metadata service?


>
> > I recommend having the logic to delete the ledger be done in the message
> > consumer side:
> > - if the ledger exists in the MD store, delete it.
> > - send delete command to BK
> > Both as I understand are idempotent. If for some reason one action was
> done
> > but the other wasn't - ZK down, BK down, broker hosting the consumer
> > terminated - either the message will not be acknowledged, or you
> negatively
> > acknowledge.
> We send a delete command to the broker; it is routed to the
> corresponding broker which has loaded the topic. That broker
> receives the command and passes it to ManagedLedger, and the
> ManagedLedger does the actual delete operation.
> Having the consumer do the delete operation is a little unreasonable. The
> ledger manager should be `ManagedLedger`, so letting it do the delete is
> better.
>

You say the consumer will send a delete command to the broker. By what
means? Sending a message on a topic, or running an RPC request by adding a
new command to the PulsarApi.proto file?
If the latter, shouldn't this be documented?

Let me assume the consumer that receives the message will run a Pulsar
API command to instruct the broker assigned to this topic to delete the
ledger. Why can't the implementation of this command in the assigned broker
perform both the deletion of the ledger ID from ZK and the deletion
from BK? Why do we need to do the ZK deletion on the producer side,
after we produced the message?


> > General question: When a ledger is persisted to ZK, where is the ledger
> > metadata persisted in ZK (more specifically it's metadata, which includes
> > the component?
> > Is it also used when building out the key (path) in ZK?
>
>
> https://github.com/apache/bookkeeper/blob/901f76ce4c4f9f771363424dbb60da4d590ad122/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadataImpl.java#L74
>
> It's the content in the zk node. when creating a ledger by bookkeeper, it
> will create a path like `/ledgers/00//L`, the path value is an
> instance of LedgerMetadataImpl.
> The bookkeeper LedgerMetadataImpl, the customMetadata stores the user's
> metadata.
>
> If the ledger is for Ledger, the customMetadata store:
> `key:application, value:pulsar`
> `key:component, value:managed-ledger`
> `key:pulsar/managed-ledger, value: ledgerName`
>
> If the ledger is for Cursor, the customMetadata store:
> `key:application, value:pulsar`
> `key:component, value:managed-ledger`
> `key:pulsar/managed-ledger, value: ledgerName`
> `key:pulsar/cursor, value: curSorName`
>
> If the ledger is for schema, the customMetadata store:
> `key:application, value:pulsar`
> `key:component, value:schema`
> `key:pulsar/schemaId, value: schemaId`
>
> So when we get the ledger metadata from bookkeeper, we can get the ledger
> source.
>

Thanks for the info!


>
> > Isn't the type saved together with the ledger in ZK?
> We need to differ it, the same ledger may store both on the bk side and
> the offload side.
> If a ledger want to delete the bk data and the offload data, it should
> publishes two message to the system topic. The broker needs it to determine
> whether to delete offload or bk.

So you're saying that the information about whether this ledger is only in
BK, only in offload storage, or in both is NOT encoded in the custom
metadata properties of the ledger?
Does the offloader save that information for that ledger in another place in
ZK?

If this ledger is a Cursor ledger, how do you know where to delete it
inside ZK if you don't include the cursor name in the
PendingDeleteLedgerInfo message?


>
> > It's for the offloaded ledger, when we want to delete the offload ledger,
> > > we need offloadedContextUuid, here we can simplify it to
> offloadContextUuid.
>
> > Sounds much better. Maybe offloadedLedgerUUID? (why context?)
> Agree.
>
> >
> >  Are you 

Re: [DISCUSS] PIP-186: Introduce two phase deletion protocol based on system topic

2023-02-05 Thread Yan Zhao
On 2023/02/03 20:05:59 Heesung Sohn wrote:
> For these internal requesters,
> 1. A cursor be removed.
> 2. Close the current ledger and create a new ledger.
> 3. Consumer ack the message, the slowest cursor move forward.
> 
> How do we prevent these callers from requesting duplicate requests for the
> same resources(ledgers) (how do we handle racing conditions between the
> requesters and consumers? )
All three cases invoke `trimConsumedLedgersInBackground`.
```
executor.executeOrdered(name, safeRun(() -> internalTrimLedgers(isTruncate, promise)));
```
A managed ledger is loaded by only one broker, and the executor is thread-safe.

> It seems like there could be duplicate requests. Are we relying on these
> methods to dedup messages?
> 1. the idempotency of the deletion operations
> 2. enough delay in the requesters' scan cycles
> 3. enable dedup in the system topic
There is one case that causes duplicate requests. The ManagedLedger wants to
delete ledger 1, and after sending it to the system topic the broker shuts down
before persisting the remaining ledgers to the metadata store. On the next
trigger, ledger 1 will be sent to the system topic again.
We rely on the idempotency of the deletion operations to handle this.
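
To illustrate why a duplicated request is harmless when deletion is idempotent, here is a minimal sketch. It is not code from the PIP: `IdempotentDeleteSketch` and its in-memory set are hypothetical stand-ins for the real storage, assumed only for illustration.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for ledger storage; not BookKeeper's API.
final class IdempotentDeleteSketch {
    private final Set<Long> storedLedgers = new HashSet<>();

    IdempotentDeleteSketch(List<Long> initialLedgers) {
        storedLedgers.addAll(initialLedgers);
    }

    // Deleting a ledger that is already gone is reported as success,
    // so a redelivered or duplicated request changes nothing.
    boolean delete(long ledgerId) {
        storedLedgers.remove(ledgerId);
        return true;
    }

    Set<Long> remaining() {
        return new HashSet<>(storedLedgers);
    }
}
```

Sending ledger 1 twice leaves the same state as sending it once, which is the property the duplicate case relies on.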



Re: [DISCUSS] PIP-186: Introduce two phase deletion protocol based on system topic

2023-02-05 Thread Yan Zhao
On 2023/02/03 18:14:53 Heesung Sohn wrote:
> There are some cases to trigger it.
> 1. A cursor be removed.
> 2. Close the current ledger and create a new ledger.
> 3. Consumer ack the message, the slowest cursor move forward.
> 4. User trigger truncateTopic by admin.
> 
> I see that this pip is for the internal ledger deletion cases(1-3 above),
> and it appears that such internal deletion operations do not require
> pre-validation or status checks(and no additional iops on the metadata
> store). I agree that we need a separate pip for async admin operations.

This PIP also applies to case 4. All cases invoke
`trimConsumedLedgersInBackground`, and this PIP acts inside that method.


Re: [DISCUSS] PIP-186: Introduce two phase deletion protocol based on system topic

2023-02-03 Thread Heesung Sohn
For these internal requesters,
1. A cursor is removed.
2. The current ledger is closed and a new ledger is created.
3. A consumer acks a message and the slowest cursor moves forward.

How do we prevent these callers from issuing duplicate requests for the
same resources (ledgers)? How do we handle race conditions between the
requesters and consumers?

It seems like there could be duplicate requests. Are we relying on these
methods to dedup messages?
1. the idempotency of the deletion operations
2. enough delay in the requesters' scan cycles
3. enable dedup in the system topic


On Fri, Feb 3, 2023 at 10:14 AM Heesung Sohn 
wrote:

>
>
> There are some cases to trigger it.
> 1. A cursor be removed.
> 2. Close the current ledger and create a new ledger.
> 3. Consumer ack the message, the slowest cursor move forward.
> 4. User trigger truncateTopic by admin.
>
> I see that this pip is for the internal ledger deletion cases(1-3 above),
> and it appears that such internal deletion operations do not require
> pre-validation or status checks(and no additional iops on the metadata
> store). I agree that we need a separate pip for async admin operations.
>
>
> On Fri, Feb 3, 2023 at 9:16 AM Yan Zhao  wrote:
>
>> > If we don't want to add more pressure on the metadata store from this
>> > feature as a requirement,
>> > I think we can use a system topic as well. Just to confirm, is this the
>> > direction we are agreeing on?
>> The design is too generic, I'm afraid not it is not suitable for ledger
>> deletion.
>>
>> >
>> > Using a system topic,
>> > 1. A broker receives an async operation request on a target resource
>> from a
>> > client.
>> > 2. If the target resource is available, the broker publishes an
>> operation
>> > request message to the system topic.
>> >2.1 If there is no target resource,
>> > error out(ResourceDoesNotExistException).
>> It's more generic, I'm afraid it not suitable for ledger deletion.
>>
>> In ManagedLedgerImpl, the method trimConsumedLedgersInBackground will
>> trigger the delete operation. It will find the slowest cursor read position
>> in the topic, and find the ledger which is before the slowest cursor read
>> position. Then check the ledgers if `isLedgerRetentionOverSizeQuota` or
>> `hasLedgerRetentionExpired`. If so, we think the ledger should be delete,
>> append it to deleteLedgerList, we also check if the ledger
>> `isOffloadedNeedsDelete`, if so, append it to deleteOffloadLedgerList.
>> Then iterate `deleteLedgerList` and `deleteOffloadLedgerList`, build
>> `PendingDeleteLedgerInfo` , and send it to systemtopic. If send succeed, I
>> will remove the ledger from the ledgerList, then persist the ledgerList. If
>> send failed, didn't do anything.
>> Example: there are ledger [1,2,3,4,5], and we want to delete 1,2,3. And
>> we send 1,3 to the system topic succeed, send 2 failed. We remove 1,3. And
>> persist [2,4,5] to the zk metadata store.
>>
>> There are some cases to trigger it.
>> 1. A cursor be removed.
>> 2. Close the current ledger and create a new ledger.
>> 3. Consumer ack the message, the slowest cursor move forward.
>> 4. User trigger truncateTopic by admin.
>>
>> Case 4 is related to it, if the topic id not exists, throw
>> ResourceDoesNotExistException.
>
> >2.2 If the target resource's state is pending or running, error
>> > out(InvalidResourceStateException).
>> If we use the system topic to store the pending deleted ledger, we can't
>> know it. Unless we read all the messages to check it, it's not realistic
>
> > 3. The broker returns success(messageId) to the client.
>>  ledger deletion may trigger by the broker self.
>> Same above reason.
>>
>> > 4. A system topic consumer(Shared Subscription Mode?) consumes the
>> > operation request message.
>> Yes.
>> > 5. The consumer runs the operation workflow with an x-min timeout.
>> >  5.1 The consumer gets the target resource data from the metadata.
>> > 5.1.1 If there is no targeted resource, complete the operation.
>> Go
>> > to 5.4.
>> >  5.2 The consumer updates the target resource state to running.
>> (state:
>> > running, startedAt: now)
>> >  5.3 Run the workflow step 1 ... n.
>> >  5.4 Acknowledge the message.
>> The consumer received message, the message contains the delete ledger
>> info. Didn't need to get the target resource from metadata. It send delete
>> command to the corresponding broker like produce message. The broker
>> received delete command, it check if the ledger is exists in the
>> ledgerList, if exists, do nothing. If not exists, delete the bk data or
>> offload data.
>> If the topic already be delete, when send delete command, it will throw
>> TopicNotFoundException, then consumer will delete the ledger locally.
>>
>> >
>> >
>> > * I guess we will enable deduplication for the system topic for possibly
>> > duplicated operation requests.
>> That's not essential. The deletion is idempotent.
>>
>> >
>> > * Here, Target Resource means the last metadata to update in the

Re: [DISCUSS] PIP-186: Introduce two phase deletion protocol based on system topic

2023-02-03 Thread Heesung Sohn
There are some cases that trigger it:
1. A cursor is removed.
2. The current ledger is closed and a new ledger is created.
3. A consumer acks a message and the slowest cursor moves forward.
4. A user triggers truncateTopic through admin.

I see that this PIP is for the internal ledger deletion cases (1-3 above),
and it appears that such internal deletion operations do not require
pre-validation or status checks (and no additional IOPS on the metadata
store). I agree that we need a separate PIP for async admin operations.


On Fri, Feb 3, 2023 at 9:16 AM Yan Zhao  wrote:

> > If we don't want to add more pressure on the metadata store from this
> > feature as a requirement,
> > I think we can use a system topic as well. Just to confirm, is this the
> > direction we are agreeing on?
> The design is too generic, I'm afraid not it is not suitable for ledger
> deletion.
>
> >
> > Using a system topic,
> > 1. A broker receives an async operation request on a target resource
> from a
> > client.
> > 2. If the target resource is available, the broker publishes an operation
> > request message to the system topic.
> >2.1 If there is no target resource,
> > error out(ResourceDoesNotExistException).
> It's more generic, I'm afraid it not suitable for ledger deletion.
>
> In ManagedLedgerImpl, the method trimConsumedLedgersInBackground will
> trigger the delete operation. It will find the slowest cursor read position
> in the topic, and find the ledger which is before the slowest cursor read
> position. Then check the ledgers if `isLedgerRetentionOverSizeQuota` or
> `hasLedgerRetentionExpired`. If so, we think the ledger should be delete,
> append it to deleteLedgerList, we also check if the ledger
> `isOffloadedNeedsDelete`, if so, append it to deleteOffloadLedgerList.
> Then iterate `deleteLedgerList` and `deleteOffloadLedgerList`, build
> `PendingDeleteLedgerInfo` , and send it to systemtopic. If send succeed, I
> will remove the ledger from the ledgerList, then persist the ledgerList. If
> send failed, didn't do anything.
> Example: there are ledger [1,2,3,4,5], and we want to delete 1,2,3. And we
> send 1,3 to the system topic succeed, send 2 failed. We remove 1,3. And
> persist [2,4,5] to the zk metadata store.
>
> There are some cases to trigger it.
> 1. A cursor be removed.
> 2. Close the current ledger and create a new ledger.
> 3. Consumer ack the message, the slowest cursor move forward.
> 4. User trigger truncateTopic by admin.
>
> Case 4 is related to it, if the topic id not exists, throw
> ResourceDoesNotExistException.

>2.2 If the target resource's state is pending or running, error
> > out(InvalidResourceStateException).
> If we use the system topic to store the pending deleted ledger, we can't
> know it. Unless we read all the messages to check it, it's not realistic

> 3. The broker returns success(messageId) to the client.
>  ledger deletion may trigger by the broker self.
> Same above reason.
>
> > 4. A system topic consumer(Shared Subscription Mode?) consumes the
> > operation request message.
> Yes.
> > 5. The consumer runs the operation workflow with an x-min timeout.
> >  5.1 The consumer gets the target resource data from the metadata.
> > 5.1.1 If there is no targeted resource, complete the operation.
> Go
> > to 5.4.
> >  5.2 The consumer updates the target resource state to running.
> (state:
> > running, startedAt: now)
> >  5.3 Run the workflow step 1 ... n.
> >  5.4 Acknowledge the message.
> The consumer received message, the message contains the delete ledger
> info. Didn't need to get the target resource from metadata. It send delete
> command to the corresponding broker like produce message. The broker
> received delete command, it check if the ledger is exists in the
> ledgerList, if exists, do nothing. If not exists, delete the bk data or
> offload data.
> If the topic already be delete, when send delete command, it will throw
> TopicNotFoundException, then consumer will delete the ledger locally.
>
> >
> >
> > * I guess we will enable deduplication for the system topic for possibly
> > duplicated operation requests.
> That's not essential. The deletion is idempotent.
>
> >
> > * Here, Target Resource means the last metadata to update in the metadata
> > store.
> > For example, a workflow updates multiple metadata places like the
> > following. In this case, the target resource is the 3rd one.
> > 1. sub-resource-1
> > 2. sub-resource-2
> > 3. target-resource. e.g. ledger, topic metadata.
> >
> > * For Step 5.3, all steps should be idempotent. Also, the consumer
> > periodically updates the operation state in the target resource's
> metadata
> > every x secs. Any retry on a long-running job should resume the operation
> > closer to where it failed instead of running it from the beginning.
>
> > * We could have describe* apis to check any inflight operations state by
> > querying the target resource's operation state, but this will show some
> > eventual consistency because 

Re: [DISCUSS] PIP-186: Introduce two phase deletion protocol based on system topic

2023-02-03 Thread Yan Zhao
> If we don't want to add more pressure on the metadata store from this
> feature as a requirement,
> I think we can use a system topic as well. Just to confirm, is this the
> direction we are agreeing on?
The design is too generic; I'm afraid it is not suitable for ledger
deletion.

> 
> Using a system topic,
> 1. A broker receives an async operation request on a target resource from a
> client.
> 2. If the target resource is available, the broker publishes an operation
> request message to the system topic.
>2.1 If there is no target resource,
> error out(ResourceDoesNotExistException).
It's more generic; I'm afraid it is not suitable for ledger deletion.

In ManagedLedgerImpl, the method trimConsumedLedgersInBackground triggers
the delete operation. It finds the slowest cursor read position in the
topic and the ledgers that are before that position.
It then checks those ledgers for `isLedgerRetentionOverSizeQuota` or
`hasLedgerRetentionExpired`. If either holds, the ledger should be deleted
and is appended to deleteLedgerList. We also check whether the ledger
`isOffloadedNeedsDelete`; if so, it is appended to deleteOffloadLedgerList.
Then we iterate over `deleteLedgerList` and `deleteOffloadLedgerList`, build a
`PendingDeleteLedgerInfo` for each entry, and send it to the system topic. If
the send succeeds, the ledger is removed from the ledgerList, and the
ledgerList is then persisted. If the send fails, nothing is done.
Example: there are ledgers [1,2,3,4,5] and we want to delete 1, 2, and 3.
Sending 1 and 3 to the system topic succeeds; sending 2 fails. We remove 1 and
3 and persist [2,4,5] to the ZK metadata store.
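
The example above can be sketched in a few lines. This is only an illustration under stated assumptions: `TrimSketch`, `trim`, and the `sendToSystemTopic` predicate are hypothetical names, and the predicate stands in for publishing a `PendingDeleteLedgerInfo` and reporting whether the send succeeded.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongPredicate;

final class TrimSketch {
    // Returns the ledger list to persist. A ledger is dropped only if its
    // pending-delete message was published successfully; a failed send
    // leaves the ledger in the list so a later trigger can retry it.
    static List<Long> trim(List<Long> ledgers,
                           List<Long> toDelete,
                           LongPredicate sendToSystemTopic) {
        List<Long> remaining = new ArrayList<>(ledgers);
        for (long ledgerId : toDelete) {
            if (sendToSystemTopic.test(ledgerId)) {
                remaining.remove(Long.valueOf(ledgerId));
            }
        }
        return remaining;
    }
}
```

Calling `TrimSketch.trim(List.of(1L, 2L, 3L, 4L, 5L), List.of(1L, 2L, 3L), id -> id != 2L)` models the failed send for ledger 2 and yields the list [2, 4, 5].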

There are some cases that trigger it:
1. A cursor is removed.
2. The current ledger is closed and a new ledger is created.
3. A consumer acks a message and the slowest cursor moves forward.
4. A user triggers truncateTopic through admin.

Case 4 relates to it: if the topic does not exist,
ResourceDoesNotExistException is thrown.

>2.2 If the target resource's state is pending or running, error
> out(InvalidResourceStateException).
If we use the system topic to store the pending deleted ledgers, we can't know
this unless we read all the messages to check, which is not realistic.


> 3. The broker returns success(messageId) to the client.
Ledger deletion may be triggered by the broker itself.
Same reason as above.

> 4. A system topic consumer(Shared Subscription Mode?) consumes the
> operation request message.
Yes.
> 5. The consumer runs the operation workflow with an x-min timeout.
>  5.1 The consumer gets the target resource data from the metadata.
> 5.1.1 If there is no targeted resource, complete the operation. Go
> to 5.4.
>  5.2 The consumer updates the target resource state to running. (state:
> running, startedAt: now)
>  5.3 Run the workflow step 1 ... n.
>  5.4 Acknowledge the message.
The consumer receives the message, which contains the info of the ledger to
delete, so it doesn't need to get the target resource from metadata. It sends
a delete command to the corresponding broker, much like producing a message.
When the broker receives the delete command, it checks whether the ledger
still exists in the ledgerList: if it does, it does nothing; if it does not,
it deletes the BK data or offload data.
If the topic has already been deleted, sending the delete command throws
TopicNotFoundException, and the consumer then deletes the ledger locally.
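
A rough sketch of the broker-side check described above, under stated assumptions: `DeleteCommandSketch`, `Outcome`, and `LedgerStorage` are hypothetical names used for illustration and are not part of Pulsar or of the PIP.

```java
import java.util.Set;

final class DeleteCommandSketch {
    enum Outcome { SKIPPED_STILL_IN_LEDGER_LIST, DELETED }

    // Hypothetical abstraction over "delete the BK data or offload data".
    interface LedgerStorage {
        void delete(long ledgerId);
    }

    // If the ledger is still in the ledgerList, the first phase did not
    // complete, so nothing is deleted. Otherwise the data is removed.
    static Outcome handle(long ledgerId, Set<Long> ledgerList, LedgerStorage storage) {
        if (ledgerList.contains(ledgerId)) {
            return Outcome.SKIPPED_STILL_IN_LEDGER_LIST;
        }
        storage.delete(ledgerId);
        return Outcome.DELETED;
    }
}
```

The check keeps the second phase from removing data for a ledger whose removal from the ledgerList was never persisted.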

> 
> 
> * I guess we will enable deduplication for the system topic for possibly
> duplicated operation requests.
That's not essential. The deletion is idempotent. 

> 
> * Here, Target Resource means the last metadata to update in the metadata
> store.
> For example, a workflow updates multiple metadata places like the
> following. In this case, the target resource is the 3rd one.
> 1. sub-resource-1
> 2. sub-resource-2
> 3. target-resource. e.g. ledger, topic metadata.
> 
> * For Step 5.3, all steps should be idempotent. Also, the consumer
> periodically updates the operation state in the target resource's metadata
> every x secs. Any retry on a long-running job should resume the operation
> closer to where it failed instead of running it from the beginning.

> * We could have describe* apis to check any inflight operations state by
> querying the target resource's operation state, but this will show some
> eventual consistency because of the delay between step 3 and step 5.
> 
> 
> Thanks,
> Heesung

Thanks for your good idea, but it may not suit ledger deletion. In this PIP,
I don't want to operate on the ledger metadata too much.
Your idea may be a good solution for other cases, though. Maybe we can use it
to draft a new PIP.


Re: [DISCUSS] PIP-186: Introduce two phase deletion protocol based on system topic

2023-02-03 Thread Yan Zhao
Heesung, thanks. That's a good point for multi-step work. But I'm afraid it
will increase the ZK pressure, and it also needs to handle some corner cases. I
prefer to use a system topic to handle it.

On 2023/01/31 19:05:34 Heesung Sohn wrote:
> On Tue, Jan 31, 2023 at 6:43 AM Yan Zhao  wrote:
> 
> > > - Have we considered a metadata store to persist and dedup deletion
> > > requests instead of the system topic? Why is the system topic the better
> > > choice than a metadata store for this problem?
> > If we use the metadata store to store the middle step ledger, we need to
> > operate the metadata store after deletion every time.
> 
> 
> 
> >
> > And we need a trigger to trigger deletion. In the broker, it may have lots
> > of topics, the ledger deletion is also much. Using the metadata store to
> > store it may be a bottleneck.
> > Using pub/sub is easy to implement, and it is a good trigger to trigger
> > deletion.
> >
> 
> 
> We can group the multiple resource deletions to a single record in the
> metadata store. Also, we can use the metadata store watcher to trigger the
> deletion.
> 
> I can see that a similar transactional operation(using metadata store) can
> be done like the following.
> 
> Alternatively,
> 1. A broker receives a resource(ledger) deletion request from a client.
> 2. If the target resource is available, the broker persists a transaction
> lock(/transactions/broker-id/delete_ledger/ledger_id) into a metadata
> store(state:pending, createdAt:now).
>   2.1 If there is no target resource, error
> out(ResourceDoesNotExistException).
>   2.2 If the lock already exists, error out(OperationInProgressExeception).
> 3. The broker returns success to the client.
> 4. The transaction watcher(metadata store listener) on the same broker-id
> is notified.
> 5. The transaction watcher runs the deletion process with an x min timeout.
> 5.1 The transaction watcher updates the lock state (state: running,
> startedAt: now)
> 5.2 Run step 1 ... n (periodically update the lock state and
> updatedAt:now every x secs)
> 5.3 Delete the lock.
> 6. The orphan transaction monitor runs any orphan jobs by retrying step 5.
> (If the watcher fails in the middle at step 5, the lock state will be
> orphan(state:running and startedAt :  > x min))
> 7. The leader monitor(on the leader broker) manages orphan jobs if brokers
> are gone or unavailable.
> 
> We can have multiple types of transaction locks(or generic lock) depending
> on the operations types. This will reduce the number of locks to
> create/update if there are multiple target resources to operate on for a
> single transaction.
> 
> - Single ledger deletion: /transactions/broker-id/delete_ledger/ledger_id
> - Mult-ledger deletion: /transactions/broker-id/delete_ledgers/ledgers :
> {ledger_ids[a,b,c,d], last_deleted_ledger_index:3}
> //last_deleted_ledger_index could be periodically updated every min. This
> can help to resume the deletion when retrying.
> - Topic deletion : /transactions/broker-id/delete_topic/topic_name
> 
> 
> 
> > > - How does Pulsar deduplicate deletion requests(error out to users) while
> > > the deletion request is running?
> > The user only can invoke `truncateTopic`, it's not for a particular
> > ledger. The note: "The truncate operation will move all cursors to the end
> > of the topic and delete all inactive ledgers."
> > It's just a trigger for the user.
> >
> 
> What if the admin concurrently requests `truncateTopic` many times for the
> same topic while one truncation job is running? How does Pulsar currently
> deduplicate these requests? And how does this proposal handle this
> situation?
> 
> 
> >
> > > - How do users track async deletion flow status? (do we expose any
> > > describeDeletion API to show the deletion status?)
> > Why need to track the async deletion flow status? The ledger deletion is
> > transparent for pulsarClient. In the broker, deleting a ledger will print
> > the log `delete ledger xx successfully `.
> > If delete failed, it print the log `delete ledger xxx failed.`
> >
> 
> IMHO, relying on logs to check the system state is not a good practice.
> Generally, every async user/admin API(long-running async workflow API)
> needs the corresponding describe* API to return the current running state.
> 
> 
> Regards,
> Heesung
> 


Re: [DISCUSS] PIP-186: Introduce two phase deletion protocol based on system topic

2023-02-03 Thread Yan Zhao
> If you persisted the message successfully by the producer and the broker
> was terminated before being able to delete the ledger from the metadata
> service?
If the broker is terminated, the consumer won't ack the message, and the
message will be re-consumed later.

> I recommend having the logic to delete the ledger be done in the message
> consumer side:
> - if the ledger exists in the MD store, delete it.
> - send delete command to BK
> Both as I understand are idempotent. If for some reason one action was done
> but the other wasn't - ZK down, BK down, broker hosting the consumer
> terminated - either the message will not be acknowledged, or you negatively
> acknowledge.
We send a delete command to the broker; it connects to the broker that has
the topic loaded. That broker receives the command and passes it to
ManagedLedger, which does the actual delete operation.
Having the consumer do the delete operation is a little unreasonable. The
ledger's manager is `ManagedLedger`, so it is better to let it do the delete.

> General question: When a ledger is persisted to ZK, where is the ledger
> metadata persisted in ZK (more specifically it's metadata, which includes
> the component?
> Is it also used when building out the key (path) in ZK?

https://github.com/apache/bookkeeper/blob/901f76ce4c4f9f771363424dbb60da4d590ad122/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadataImpl.java#L74

It's the content of the ZK node. When BookKeeper creates a ledger, it
creates a path like `/ledgers/00//L`; the value at that path is an instance of
LedgerMetadataImpl.
In BookKeeper's LedgerMetadataImpl, the customMetadata stores the user's
metadata.

If the ledger is for a Ledger, the customMetadata stores:
`key:application, value:pulsar`
`key:component, value:managed-ledger`
`key:pulsar/managed-ledger, value: ledgerName`

If the ledger is for a Cursor, the customMetadata stores:
`key:application, value:pulsar`
`key:component, value:managed-ledger`
`key:pulsar/managed-ledger, value: ledgerName`
`key:pulsar/cursor, value: cursorName`

If the ledger is for a schema, the customMetadata stores:
`key:application, value:pulsar`
`key:component, value:schema`
`key:pulsar/schemaId, value: schemaId`

So when we get the ledger metadata from bookkeeper, we can get the ledger 
source.
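
As a sketch of how the ledger source could be derived from those keys: the class and enum names below are hypothetical, and the `Map<String, byte[]>` shape is an assumption about how the custom metadata is exposed. Only the key names and values come from the lists above.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

final class LedgerSourceSketch {
    enum Source { MANAGED_LEDGER, CURSOR, SCHEMA, UNKNOWN }

    // Classifies a ledger using the `component` value and the presence
    // of the `pulsar/cursor` key.
    static Source classify(Map<String, byte[]> customMetadata) {
        String component = text(customMetadata.get("component"));
        if ("schema".equals(component)) {
            return Source.SCHEMA;
        }
        if ("managed-ledger".equals(component)) {
            return customMetadata.containsKey("pulsar/cursor")
                    ? Source.CURSOR
                    : Source.MANAGED_LEDGER;
        }
        return Source.UNKNOWN;
    }

    private static String text(byte[] value) {
        return value == null ? null : new String(value, StandardCharsets.UTF_8);
    }
}
```

A cursor ledger and a managed-ledger data ledger share the same `component`, so the `pulsar/cursor` key is what tells them apart in this sketch.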

> Isn't the type saved together with the ledger in ZK?
We need to distinguish it, because the same ledger may be stored on both the
BK side and the offload side.
If both the BK data and the offloaded data of a ledger need to be deleted, two
messages should be published to the system topic. The broker needs the type to
determine whether to delete the offloaded data or the BK data.
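
A minimal sketch of "one message per storage type", assuming hypothetical names throughout: `PendingDeleteSketch`, `LedgerType`, `PendingDelete`, and the `ledgerId-type` key format are illustrations, not the PIP's actual `PendingDeleteLedgerInfo` schema.

```java
import java.util.ArrayList;
import java.util.List;

final class PendingDeleteSketch {
    enum LedgerType { BK, OFFLOADED }

    // Hypothetical record of one pending deletion.
    static final class PendingDelete {
        final long ledgerId;
        final LedgerType type;

        PendingDelete(long ledgerId, LedgerType type) {
            this.ledgerId = ledgerId;
            this.type = type;
        }

        // Assumed key format: ledger ID plus type, so the BK and
        // offloaded deletions of one ledger stay distinct.
        String key() {
            return ledgerId + "-" + type;
        }
    }

    // Builds one message per place the ledger's data lives.
    static List<PendingDelete> messagesFor(long ledgerId, boolean inBookKeeper, boolean offloaded) {
        List<PendingDelete> messages = new ArrayList<>();
        if (inBookKeeper) {
            messages.add(new PendingDelete(ledgerId, LedgerType.BK));
        }
        if (offloaded) {
            messages.add(new PendingDelete(ledgerId, LedgerType.OFFLOADED));
        }
        return messages;
    }
}
```

A ledger stored in both places produces two messages with different types, which is the information the broker needs to pick the right deletion path.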

> It's for the offloaded ledger, when we want to delete the offload ledger,
> > we need offloadedContextUuid, here we can simplify it to offloadContextUuid.

> Sounds much better. Maybe offloadedLedgerUUID? (why context?)
Agree.

> 
>  Are you encoding all that  extra info besides the ledger ID and its source
> to avoid reading it again from ZK when deleting it?
No, only encoding the data which is useful for deletion.

> 
> It's for extended.
> >
> Can't really understand from that short sentence what you mean. Can you
> please elaborate?
Maybe we can delete it. I just wanted to avoid changing the class when we add
a new property: new properties can be put in as key-value pairs to extend it.

> 
> > In https://github.com/apache/pulsar/issues/16569. The first step section
> > and second step section process flow picture are detailed.
> >
>  I'm sorry but you didn't answer all the questions I wrote. I'll paste them
> here:
> Can you explain the starting point? How does deletion work in general?
> > When? What happens? ... I understand there are time based triggers, and
> > sometimes used based triggers. They are somehow marked in metadata.
> 
In ManagedLedgerImpl, the method trimConsumedLedgersInBackground triggers
the delete operation. It finds the slowest cursor read position in the
topic and the ledgers that are before that position.
It then checks those ledgers for `isLedgerRetentionOverSizeQuota` or
`hasLedgerRetentionExpired`. If either holds, the ledger should be deleted
and is appended to deleteLedgerList. We also check whether the ledger
`isOffloadedNeedsDelete`; if so, it is appended to deleteOffloadLedgerList.
Then we iterate over `deleteLedgerList` and `deleteOffloadLedgerList`, build a
`PendingDeleteLedgerInfo` for each entry, and send it to the system topic. If
the send succeeds, the ledger is removed from the ledgerList, and the
ledgerList is then persisted. If the send fails, nothing is done.
Example: there are ledgers [1,2,3,4,5] and we want to delete 1, 2, and 3.
Sending 1 and 3 to the system topic succeeds; sending 2 fails. We remove 1 and
3 and persist [2,4,5] to the ZK metadata store.

There are some cases that trigger it:
1. A cursor is removed.
2. The current ledger is closed and a new ledger is created.
3. A consumer acks a message and the slowest cursor moves forward.
4. User trigger 

Re: [DISCUSS] PIP-186: Introduce two phase deletion protocol based on system topic

2023-02-02 Thread Heesung Sohn
On Thu, Feb 2, 2023 at 6:42 AM Asaf Mesika  wrote:

> Heesung, there are users out there which delete 2M topics in a day. The
> stress we apply on ZK today due to the amount of topics is more than
> enough. I don't think we should rely on ZK to also manage the deletion of
> those ledgers if that means adding more nodes / more reads and writes to
> it.

> I totally agree that we need to know if there is an in flight request. I
> think this can be achieved using a special topic. I mean, why can't we just
> mark in ZK itself the ledger is marked for delete, and the workflow will go
> through the different stages until all needed data is deleted and only then
> delete the ledger ID from ZK?
>

If we don't want to add more pressure on the metadata store from this
feature as a requirement,
I think we can use a system topic as well. Just to confirm, is this the
direction we are agreeing on?

Using a system topic,
1. A broker receives an async operation request on a target resource from a
   client.
2. If the target resource is available, the broker publishes an operation
   request message to the system topic.
   2.1 If there is no target resource,
       error out(ResourceDoesNotExistException).
   2.2 If the target resource's state is pending or running, error
       out(InvalidResourceStateException).
3. The broker returns success(messageId) to the client.
4. A system topic consumer(Shared Subscription Mode?) consumes the
   operation request message.
5. The consumer runs the operation workflow with an x-min timeout.
   5.1 The consumer gets the target resource data from the metadata.
       5.1.1 If there is no targeted resource, complete the operation. Go
             to 5.4.
   5.2 The consumer updates the target resource state to running. (state:
       running, startedAt: now)
   5.3 Run the workflow step 1 ... n.
   5.4 Acknowledge the message.


* I guess we will enable deduplication for the system topic for possibly
duplicated operation requests.

* Here, Target Resource means the last metadata to update in the metadata
store.
For example, a workflow updates multiple metadata places like the
following. In this case, the target resource is the 3rd one.
1. sub-resource-1
2. sub-resource-2
3. target-resource. e.g. ledger, topic metadata.

* For Step 5.3, all steps should be idempotent. Also, the consumer
periodically updates the operation state in the target resource's metadata
every x secs. Any retry on a long-running job should resume the operation
closer to where it failed instead of running it from the beginning.
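
To make the resume behavior concrete, here is a small Java sketch under stated
assumptions: the `checkpoint` field is a hypothetical stand-in for the operation
state persisted in the target resource's metadata, and progress is recorded
after every step rather than every x secs to keep the example short.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: `checkpoint` stands in for the operation state that the
// consumer would persist in the target resource's metadata.
class ResumableWorkflowSketch {
    int checkpoint = 0;                               // index of the next step to run
    final List<String> executed = new ArrayList<>();  // steps that actually ran

    // Runs the remaining steps. failAt simulates a crash just before that step
    // index (-1 means no crash). Returns true when the workflow completed.
    boolean run(List<String> steps, int failAt) {
        for (int i = checkpoint; i < steps.size(); i++) {
            if (i == failAt) {
                return false;            // crash: the checkpoint survives for the retry
            }
            executed.add(steps.get(i));  // each step is assumed to be idempotent
            checkpoint = i + 1;          // record progress
        }
        return true;
    }
}
```

With a periodic checkpoint instead of a per-step one, a retry may repeat the
steps that ran after the last recorded checkpoint, which is why the steps have
to be idempotent.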

* We could have describe* APIs to check the state of any in-flight operation by
querying the target resource's operation state, but this will show some
eventual consistency because of the delay between step 3 and step 5.


Thanks,
Heesung





Re: [DISCUSS] PIP-186: Introduce two phase deletion protocol based on system topic

2023-02-02 Thread Asaf Mesika
Heesung, there are users out there which delete 2M topics in a day. The
stress we apply on ZK today due to the amount of topics is more than
enough. I don't think we should rely on ZK to also manage the deletion of
those ledgers if that means adding more nodes / more reads and writes to it.
I totally agree that we need to know if there is an in flight request. I
think this can be achieved using a special topic. I mean, why can't we just
mark in ZK itself the ledger is marked for delete, and the workflow will go
through the different stages until all needed data is deleted and only then
delete the ledger ID from ZK?



On Tue, Jan 31, 2023 at 9:06 PM Heesung Sohn
 wrote:

> On Tue, Jan 31, 2023 at 6:43 AM Yan Zhao  wrote:
>
> > > - Have we considered a metadata store to persist and dedup deletion
> > > requests instead of the system topic? Why is the system topic the
> better
> > > choice than a metadata store for this problem?
> > If we use the metadata store to store the middle step ledger, we need to
> > operate the metadata store after deletion every time.
>
>
>
> >
> > And we need a trigger to trigger deletion. In the broker, it may have
> lots
> > of topics, the ledger deletion is also much. Using the metadata store to
> > store it may be a bottleneck.
> > Using pub/sub is easy to implement, and it is a good trigger to trigger
> > deletion.
> >
>
>
> We can group the multiple resource deletions to a single record in the
> metadata store. Also, we can use the metadata store watcher to trigger the
> deletion.
>
> I can see that a similar transactional operation(using metadata store) can
> be done like the following.
>
> Alternatively,
> 1. A broker receives a resource(ledger) deletion request from a client.
> 2. If the target resource is available, the broker persists a transaction
> lock(/transactions/broker-id/delete_ledger/ledger_id) into a metadata
> store(state:pending, createdAt:now).
>   2.1 If there is no target resource, error
> out(ResourceDoesNotExistException).
>   2.2 If the lock already exists, error out(OperationInProgressException).
> 3. The broker returns success to the client.
> 4. The transaction watcher(metadata store listener) on the same broker-id
> is notified.
> 5. The transaction watcher runs the deletion process with an x min timeout.
> 5.1 The transaction watcher updates the lock state (state: running,
> startedAt: now)
> 5.2 Run step 1 ... n (periodically update the lock state and
> updatedAt:now every x secs)
> 5.3 Delete the lock.
> 6. The orphan transaction monitor runs any orphan jobs by retrying step 5.
> (If the watcher fails in the middle at step 5, the lock state will be
> orphan(state:running and startedAt :  > x min))
> 7. The leader monitor(on the leader broker) manages orphan jobs if brokers
> are gone or unavailable.
>
> We can have multiple types of transaction locks(or generic lock) depending
> on the operations types. This will reduce the number of locks to
> create/update if there are multiple target resources to operate on for a
> single transaction.
>
> - Single ledger deletion: /transactions/broker-id/delete_ledger/ledger_id
> - Multi-ledger deletion: /transactions/broker-id/delete_ledgers/ledgers :
> {ledger_ids[a,b,c,d], last_deleted_ledger_index:3}
> //last_deleted_ledger_index could be periodically updated every min. This
> can help to resume the deletion when retrying.
> - Topic deletion : /transactions/broker-id/delete_topic/topic_name
>
>
>
> > > - How does Pulsar deduplicate deletion requests(error out to users)
> while
> > > the deletion request is running?
> > The user only can invoke `truncateTopic`, it's not for a particular
> > ledger. The note: "The truncate operation will move all cursors to the
> end
> > of the topic and delete all inactive ledgers."
> > It's just a trigger for the user.
> >
>
> What if the admin concurrently requests `truncateTopic` many times for the
> same topic while one truncation job is running? How does Pulsar currently
> deduplicate these requests? And how does this proposal handle this
> situation?
>
>
> >
> > > - How do users track async deletion flow status? (do we expose any
> > > describeDeletion API to show the deletion status?)
> > Why need to track the async deletion flow status? The ledger deletion is
> > transparent for pulsarClient. In the broker, deleting a ledger will print
> > the log `delete ledger xx successfully `.
> > If delete failed, it print the log `delete ledger xxx failed.`
> >
>
> IMHO, relying on logs to check the system state is not a good practice.
> Generally, every async user/admin API(long-running async workflow API)
> needs the corresponding describe* API to return the current running state.
>
>
> Regards,
> Heesung
>


Re: [DISCUSS] PIP-186: Introduce two phase deletion protocol based on system topic

2023-02-02 Thread Asaf Mesika
Heesung points out really good points, which actually made me add one more:

Can you please describe all the triggers for ledger deletion? Some are user
facing, some not.


On Mon, Jan 30, 2023 at 8:19 PM Heesung Sohn
 wrote:

> Hi,
>
>
> I assume the deletion APIs are async(when a user requests deletion, pulsar
> first returns success to the user if the request is persisted. Then, Pulsar
> asynchronously runs the deletion flow)
>
> - Have we considered a metadata store to persist and dedup deletion
> requests instead of the system topic? Why is the system topic the better
> choice than a metadata store for this problem?
>
> - How does Pulsar deduplicate deletion requests(error out to users) while
> the deletion request is running?
>
> - How do users track async deletion flow status? (do we expose any
> describeDeletion API to show the deletion status?)
>
>
> Thanks,
> Heesung
>

Re: [DISCUSS] PIP-186: Introduce two phase deletion protocol based on system topic

2023-02-02 Thread Asaf Mesika
>
> When pulsar wants to delete a ledger, ManagedLedger uses
> ledgerDeletionService to send a message, the message content contains the
> waiting delete ledger info. After sending success, delete the ledger id
> from the metadata store.
> The consumer receives the message, it will use PulsarClient to send a
> delete command to the corresponding broker, the broker receives delete
> command, and do the actual delete operation.
>

What if the producer persisted the message successfully, but the broker
was terminated before it could delete the ledger from the metadata
store?
I recommend doing the ledger deletion logic on the message consumer side:
- If the ledger exists in the MD store, delete it.
- Send the delete command to BK.
As I understand it, both are idempotent. If for some reason one action was done
but the other wasn't (ZK down, BK down, or the broker hosting the consumer
terminated), either the message will not be acknowledged, or you negatively
acknowledge it.
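
A minimal Java sketch of that consumer-side handling, under stated assumptions:
two in-memory sets stand in for the MD store and BK, and the class, field, and
method names are hypothetical. It only illustrates why idempotent steps plus
redelivery are enough.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch: in-memory sets stand in for the MD store and BookKeeper.
class ConsumerSideDeleteSketch {
    final Set<Long> metadataStore = new HashSet<>();  // ledger IDs known to the MD store
    final Set<Long> bookKeeper = new HashSet<>();     // ledgers that still have data in BK
    boolean bkAvailable = true;                       // toggled to simulate BK being down

    // Handles one pending-delete message.
    // Returns true to acknowledge, false to negatively acknowledge (redelivery).
    boolean handle(long ledgerId) {
        metadataStore.remove(ledgerId);  // no-op if the ID was already removed
        if (!bkAvailable) {
            return false;                // second action not done: ask for redelivery
        }
        bookKeeper.remove(ledgerId);     // no-op if the ledger was already deleted
        return true;
    }
}
```

Handling the same message again after a partial failure repeats the first
action harmlessly and then completes the second one.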

> It's the ledger source, (MANAGED_LEDGER,MANAGED_CURSOR,SCHEMA_STORAGE)
> When the broker wants to delete a ledger, we will check if the bookkeeper
> metadata matches or not. In the pulsar, it will mark the ledger source when
> creating a new ledger. See LedgerMetadataUtils.


General question: When a ledger is persisted to ZK, where is the ledger
metadata persisted in ZK (more specifically, its metadata, which includes
the component)?
Is it also used when building out the key (path) in ZK?

> It marks the ledger as a normal ledger or an offload ledger, broker need it
> to determine whether to delete bookkeeper data or offload data.
>
Isn't the type saved together with the ledger in ZK?

> It's for the offloaded ledger, when we want to delete the offload ledger,
> we need offloadedContextUuid, here we can simplify it to offloadContextUuid.

Sounds much better. Maybe offloadedLedgerUUID? (why context?)

Are you encoding all that extra info besides the ledger ID and its source
to avoid reading it again from ZK when deleting it?

> It's for extended.
>
Can't really understand from that short sentence what you mean. Can you
please elaborate?


> In https://github.com/apache/pulsar/issues/16569. The first step section
> and second step section process flow picture are detailed.
>
 I'm sorry but you didn't answer all the questions I wrote. I'll paste them
here:
Can you explain the starting point? How does deletion work in general?
> When? What happens? ... I understand there are time based triggers, and
> sometimes used based triggers. They are somehow marked in metadata.

> If delete fails, that means the storage system occur some problems. I guess
> the storage system will recovery in 10 mins.
>
> In https://github.com/apache/pulsar/issues/16569, we define
> reconsumeLaterOfTopicTwoPhaseDeletionInSeconds in the ServiceConfiguration,
> it's configurable.
> private int reconsumeLaterOfTopicTwoPhaseDeletionInSeconds = 600;


We need some experienced people here to contribute their opinion. Default
10min might be too much. I recommend you ask Penghui.

>
> > You mentioned you are working with a client which has retries configured.
> > Retry is client side based, ack one message while producing another,
> > transaction free. Are you prepared to handle a case where you acked but
> > failed to produce the message, hence you lost it completely?
> >
> The pulsarClient only sends a new message that succeeds, then ack the
> origin message, so didn't care in this case.
>
Ok, then you will have concurrent consumption of a message which tries to
delete the ledger from ZK and then tries to delete it from BK? Isn't that a
concurrency issue?


> > > If we want to delete a topic, we should send the delete ledger msg to
> > > system topic and remove ledger id from metadata one by one, after all
> the
> > > ledger has been deleted, then delete the topic. If any ledger operate
> > > failed, we think this delete topic operation failed and return the left
> > > ledger id in the response.
> >
> > I couldn't understand. Can you please clarify this section. How exactly
> > topic deletion is modified to use the features described in this pip?
> >
> We need to ensure that all ledgers are deleted before the topic is
> deleted, otherwise, there will be orphan ledgers.
>
Your PIP is about introducing a workflow for deleting a ledger, right?
When you delete a topic you iterate its ledger list and delete each ledger.
Your PIP changes the way each ledger is deleted and makes it async. So I
guess what I want to understand is: What are the changes you are making to
topic deletion due to your PIP? You said "we need to make sure" - can you
please clarify how you will make sure?

> 10.
> > Backward compatibility - I would make sure to document in the docs
> exactly
> > what to do in case we need to rollback (cook book).
> Well.


You added

> If user upgrade and enable two phase deletion, the ledger deletion msg
> will store in system topic. If the user rolls back to the old version 

Re: [DISCUSS] PIP-186: Introduce two phase deletion protocol based on system topic

2023-01-31 Thread Heesung Sohn
On Tue, Jan 31, 2023 at 6:43 AM Yan Zhao  wrote:

> > - Have we considered a metadata store to persist and dedup deletion
> > requests instead of the system topic? Why is the system topic the better
> > choice than a metadata store for this problem?
> If we use the metadata store to store the middle step ledger, we need to
> operate the metadata store after deletion every time.



>
> And we need a trigger to trigger deletion. In the broker, it may have lots
> of topics, the ledger deletion is also much. Using the metadata store to
> store it may be a bottleneck.
> Using pub/sub is easy to implement, and it is a good trigger to trigger
> deletion.
>


We can group the multiple resource deletions to a single record in the
metadata store. Also, we can use the metadata store watcher to trigger the
deletion.

I can see that a similar transactional operation(using metadata store) can
be done like the following.

Alternatively,
1. A broker receives a resource(ledger) deletion request from a client.
2. If the target resource is available, the broker persists a transaction
lock(/transactions/broker-id/delete_ledger/ledger_id) into a metadata
store(state:pending, createdAt:now).
  2.1 If there is no target resource, error
out(ResourceDoesNotExistException).
  2.2 If the lock already exists, error out(OperationInProgressException).
3. The broker returns success to the client.
4. The transaction watcher(metadata store listener) on the same broker-id
is notified.
5. The transaction watcher runs the deletion process with an x min timeout.
5.1 The transaction watcher updates the lock state (state: running,
startedAt: now)
5.2 Run step 1 ... n (periodically update the lock state and
updatedAt:now every x secs)
5.3 Delete the lock.
6. The orphan transaction monitor runs any orphan jobs by retrying step 5.
(If the watcher fails in the middle at step 5, the lock state will be
orphan(state:running and startedAt :  > x min))
7. The leader monitor(on the leader broker) manages orphan jobs if brokers
are gone or unavailable.
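
The lock lifecycle in steps 2, 5, and 6 can be sketched in Java as follows.
This is only an illustration under stated assumptions: a ConcurrentHashMap
stands in for the metadata store, time is passed in as plain milliseconds, and
the class and method names are hypothetical. The lock path layout follows the
one given in step 2.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical sketch: a concurrent map stands in for the metadata store.
class TransactionLockSketch {
    static final class Lock {
        final String state;          // "pending" or "running"
        final long startedAtMillis;  // createdAt for pending, startedAt for running
        Lock(String state, long startedAtMillis) {
            this.state = state;
            this.startedAtMillis = startedAtMillis;
        }
    }

    final ConcurrentMap<String, Lock> store = new ConcurrentHashMap<>();

    static String path(String brokerId, long ledgerId) {
        return "/transactions/" + brokerId + "/delete_ledger/" + ledgerId;
    }

    // Step 2: persist the lock. Returns false if it already exists (step 2.2).
    boolean tryAcquire(String path, long nowMillis) {
        return store.putIfAbsent(path, new Lock("pending", nowMillis)) == null;
    }

    // Step 5.1: the watcher marks the lock as running.
    void markRunning(String path, long nowMillis) {
        store.put(path, new Lock("running", nowMillis));
    }

    // Step 5.3: delete the lock once the deletion process has finished.
    void release(String path) {
        store.remove(path);
    }

    // Step 6: a lock is orphaned when it is running and older than the timeout.
    boolean isOrphan(String path, long nowMillis, long timeoutMillis) {
        Lock lock = store.get(path);
        return lock != null
                && lock.state.equals("running")
                && nowMillis - lock.startedAtMillis > timeoutMillis;
    }
}
```

The atomic putIfAbsent is what rejects a concurrent request for the same
ledger; a real metadata store would need an equivalent create-if-absent
operation.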

We can have multiple types of transaction locks(or generic lock) depending
on the operations types. This will reduce the number of locks to
create/update if there are multiple target resources to operate on for a
single transaction.

- Single ledger deletion: /transactions/broker-id/delete_ledger/ledger_id
- Multi-ledger deletion: /transactions/broker-id/delete_ledgers/ledgers :
{ledger_ids[a,b,c,d], last_deleted_ledger_index:3}
//last_deleted_ledger_index could be periodically updated every min. This
can help to resume the deletion when retrying.
- Topic deletion : /transactions/broker-id/delete_topic/topic_name



> > - How does Pulsar deduplicate deletion requests(error out to users) while
> > the deletion request is running?
> The user only can invoke `truncateTopic`, it's not for a particular
> ledger. The note: "The truncate operation will move all cursors to the end
> of the topic and delete all inactive ledgers."
> It's just a trigger for the user.
>

What if the admin concurrently requests `truncateTopic` many times for the
same topic while one truncation job is running? How does Pulsar currently
deduplicate these requests? And how does this proposal handle this
situation?


>
> > - How do users track async deletion flow status? (do we expose any
> > describeDeletion API to show the deletion status?)
> Why need to track the async deletion flow status? The ledger deletion is
> transparent for pulsarClient. In the broker, deleting a ledger will print
> the log `delete ledger xx successfully `.
> If delete failed, it print the log `delete ledger xxx failed.`
>

IMHO, relying on logs to check the system state is not a good practice.
Generally, every async user/admin API(long-running async workflow API)
needs the corresponding describe* API to return the current running state.


Regards,
Heesung


Re: [DISCUSS] PIP-186: Introduce two phase deletion protocol based on system topic

2023-01-31 Thread Yan Zhao
> - Have we considered a metadata store to persist and dedup deletion
> requests instead of the system topic? Why is the system topic the better
> choice than a metadata store for this problem?
If we use the metadata store to track ledgers in the intermediate (pending
delete) state, we need to update the metadata store after every deletion.
We also need something to trigger the deletion. A broker may own many
topics, so there are also many ledger deletions, and storing them in the
metadata store may become a bottleneck.
Pub/sub is easy to implement, and it is a good way to trigger deletion.

> - How does Pulsar deduplicate deletion requests(error out to users) while
> the deletion request is running?
Users can only invoke `truncateTopic`, which does not target a particular
ledger. Its note says: "The truncate operation will move all cursors to the end
of the topic and delete all inactive ledgers."
For the user, it is just a trigger.

> - How do users track async deletion flow status? (do we expose any
> describeDeletion API to show the deletion status?)
Why do we need to track the async deletion flow status? Ledger deletion is
transparent to the pulsarClient. In the broker, deleting a ledger prints the
log `delete ledger xx successfully `.
If the delete fails, it prints the log `delete ledger xxx failed.`


Re: [DISCUSS] PIP-186: Introduce two phase deletion protocol based on system topic

2023-01-30 Thread Heesung Sohn
Hi,


I assume the deletion APIs are async(when a user requests deletion, pulsar
first returns success to the user if the request is persisted. Then, Pulsar
asynchronously runs the deletion flow)

- Have we considered a metadata store to persist and dedup deletion
requests instead of the system topic? Why is the system topic the better
choice than a metadata store for this problem?

- How does Pulsar deduplicate deletion requests(error out to users) while
the deletion request is running?

- How do users track async deletion flow status? (do we expose any
describeDeletion API to show the deletion status?)


Thanks,
Heesung





Re: [DISCUSS] PIP-186: Introduce two phase deletion protocol based on system topic

2023-01-30 Thread Yan Zhao
> Couples of notes:
> 
> 1.
> 
> > In the LedgerDeletionService  start, it will create  a producer to send
> > pending delete ledger.
> > When delete a ledger,  a PendingDeleteLedgerInfo msg with 1 min delay (the
> > delay is for consumer side, if send it immediately, maybe the metadata
> > din't change when consumer receive it). After the send operation succeeds,
> >  then to operate metadata. If send msg failed, we think this deletion
> > operation failed, and didn't operate metadata.
> 
> 
> This section is completely unclear to me. Can you please provide step by
> step exactly what will happen in the workflow?
> Who does what and where (node)?

PulsarService defines a ledgerDeletionService. The ledgerDeletionService
creates a producer and a consumer, similar to topicPoliciesService.
PulsarService passes it to ManagedLedger.
When Pulsar wants to delete a ledger, ManagedLedger uses ledgerDeletionService
to send a message whose content contains the info of the ledger pending
deletion. After the send succeeds, it deletes the ledger ID from the metadata
store.
When the consumer receives the message, it uses PulsarClient to send a delete
command to the corresponding broker. The broker receives the delete command and
performs the actual delete operation.
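
The two phases described above can be sketched roughly as below. This is a
simplified in-memory model, not the PIP's implementation: the queue stands in
for the system topic, the sets stand in for the metadata store and the ledger
storage, and all names are hypothetical. The "still in metadata means still in
use" check follows the PIP text quoted under point 6.

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;

// Hypothetical sketch: in-memory stand-ins for the system topic, the metadata
// store, and the ledger storage.
class TwoPhaseDeletionSketch {
    final Set<Long> ledgersInMetadata = new HashSet<>();  // ledger IDs the topic still references
    final Set<Long> storedLedgers = new HashSet<>();      // ledgers whose data still exists
    final Queue<Long> systemTopic = new ArrayDeque<>();   // pending-delete messages
    boolean sendFails = false;                            // toggled to simulate a failed send

    // Phase 1: publish the pending-delete message, then update the metadata.
    // If the send fails, the metadata is left untouched and the deletion fails.
    boolean markForDeletion(long ledgerId) {
        if (sendFails) {
            return false;
        }
        systemTopic.add(ledgerId);
        ledgersInMetadata.remove(ledgerId);
        return true;
    }

    // Phase 2: consume pending-delete messages and delete the ledger data.
    // Returns the number of ledgers actually deleted from storage.
    int drain() {
        int deleted = 0;
        Long ledgerId;
        while ((ledgerId = systemTopic.poll()) != null) {
            if (ledgersInMetadata.contains(ledgerId)) {
                continue;  // still referenced, so treated as in use: give up
            }
            if (storedLedgers.remove(ledgerId)) {
                deleted++;
            }
        }
        return deleted;
    }
}
```

Because the message is published before the metadata changes, a failure
between the two actions leaves a message for a ledger that is still referenced,
and phase 2 skips it instead of deleting live data.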

https://github.com/apache/pulsar/issues/16569 has some pictures of the
workflow.


> 
> 2.
> 
> > /**
> >  * The ledger component . managed-ledger, managed-cursor and
> > schema-storage.
> >  */
> > private LedgerComponent ledgerComponent;
> 
> 
> Why is this needed?
> What do you mean by a component of a ledger? Is the ledger divided into
> components?

It's the ledger source (MANAGED_LEDGER, MANAGED_CURSOR, SCHEMA_STORAGE).
When the broker wants to delete a ledger, it checks whether the BookKeeper
metadata matches. Pulsar marks the ledger source when creating a new
ledger. See LedgerMetadataUtils.

> 
> 3.
> 
> >   /**
> >  * The ledger type. ledger or offload-ledger.
> >  */
> > private LedgerType ledgerType;
> 
> 
> I don't understand why you need this type.
It marks the ledger as a normal ledger or an offloaded ledger. The broker needs
it to determine whether to delete BookKeeper data or offloaded data.

> 
> 4.
> 
> > private MLDataFormats.ManagedLedgerInfo.LedgerInfo context;
> 
> 
> Context is a very generic word, but your type is so specific. Can you
> please explain why you need this for?
> 
> Are you sure you want to tie one data structure into the other - just
> validating.
It's for the offloaded ledger. To delete an offloaded ledger we need the
offloadedContextUuid, so here we can simplify the field to offloadContextUuid.

> 
> 5.
> 
> >   /**
> >  * Extent properties.
> >  */
> > private Map properties = new HashMap<>();
> 
> 
> Why is this needed?
It's for future extension.

> 
> 
> 6.
> 
> > When receiving a pending delete ledger msg, we will check if the topic
> > still exists. If the topic exists, send a delete command
> > (PendingDelteLedger) to the broker which owns the topic. In the broker, it
> > will check if the ledger is still in the metadata store, if the ledger in
> > the metadata store means the ledger is still in use, give up to delete this
> > ledge
> 
> 
> I don't understand this workflow. You say you check if it's in the metadata
> store, and if it is , then it is used - what will make it unused?
> Can you explain the starting point? How does deletion work in general?
> When? What happens? ... I understand there are time based triggers, and
> sometimes used based triggers. They are somehow marked in metadata.
See https://github.com/apache/pulsar/issues/16569. The process flow pictures in
the first step and second step sections show the details.

> 
> 7.
> 
> If we delete successfully, the consumer will ack this msg. If delete fails,
> > reconsume this msg 10 min later.
> 
> 
> Where did you define 10min?
> Why 10 min?
If the delete fails, it means the storage system has run into a problem. I
assume the storage system will recover within 10 minutes.

In https://github.com/apache/pulsar/issues/16569, we define
reconsumeLaterOfTopicTwoPhaseDeletionInSeconds in the ServiceConfiguration,
so it is configurable:
private int reconsumeLaterOfTopicTwoPhaseDeletionInSeconds = 600;

> 
> You mentioned you are working with a client which has retries configured.
> Retry is client side based, ack one message while producing another,
> transaction free. Are you prepared to handle a case where you acked but
> failed to produce the message, hence you lost it completely?
> 
The Pulsar client acks the original message only after the new message has
been sent successfully, so we don't need to worry about this case.
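
To illustrate the ordering described above, here is a minimal plain-Java
sketch. It does not use the Pulsar client: RetryQueue, republishThenAck and
the failSend flag are hypothetical names chosen for illustration, not the
actual reconsumeLater implementation. The point is only that the original
is acked after the retry copy is stored, so a failed send leaves the
original in place (it may be redelivered, but it is not lost).

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical in-memory stand-in for a topic; not a Pulsar API.
class RetryQueue {
    private final Deque<String> pending = new ArrayDeque<>();
    boolean failSend = false; // set to true to simulate a failed send

    void publish(String msg) {
        if (failSend) {
            throw new IllegalStateException("send failed");
        }
        pending.addLast(msg);
    }

    void ack(String msg) {
        // Removes the oldest matching entry, i.e. the original message.
        pending.removeFirstOccurrence(msg);
    }

    int size() {
        return pending.size();
    }

    // Re-publish a copy first; ack the original only if the send succeeded.
    boolean republishThenAck(String original) {
        try {
            publish(original); // step 1: produce the retry copy
        } catch (IllegalStateException e) {
            return false;      // original stays unacked and can be redelivered
        }
        ack(original);         // step 2: ack only after step 1 succeeded
        return true;
    }
}
```

In both outcomes exactly one copy of the message remains pending, which is
why the "acked but failed to produce" case does not arise with this ordering.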

> 8.
> 
> > If we want to delete a topic, we should send the delete ledger msg to
> > system topic and remove ledger id from metadata one by one, after all the
> > ledger has been deleted, then delete the topic. If any ledger operate
> > failed, we think this delete topic operation failed and return the left
> > 

Re: [DISCUSS] PIP-186: Introduce two phase deletion protocol based on system topic

2023-01-29 Thread Asaf Mesika
A couple of notes:

1.

> In the LedgerDeletionService  start, it will create  a producer to send
> pending delete ledger.
> When delete a ledger,  a PendingDeleteLedgerInfo msg with 1 min delay (the
> delay is for consumer side, if send it immediately, maybe the metadata
> din't change when consumer receive it). After the send operation succeeds,
>  then to operate metadata. If send msg failed, we think this deletion
> operation failed, and didn't operate metadata.


This section is completely unclear to me. Can you please provide step by
step exactly what will happen in the workflow?
Who does what and where (node)?

2.

> /**
>  * The ledger component . managed-ledger, managed-cursor and
> schema-storage.
>  */
> private LedgerComponent ledgerComponent;


Why is this needed?
What do you mean by a component of a ledger? Is the ledger divided into
components?

3.

>   /**
>  * The ledger type. ledger or offload-ledger.
>  */
> private LedgerType ledgerType;


I don't understand why you need this type.

4.

> private MLDataFormats.ManagedLedgerInfo.LedgerInfo context;


Context is a very generic word, but your type is so specific. Can you
please explain what you need this for?

Are you sure you want to tie one data structure into the other - just
validating.

5.

>   /**
>  * Extent properties.
>  */
> private Map properties = new HashMap<>();


Why is this needed?


6.

> When receiving a pending delete ledger msg, we will check if the topic
> still exists. If the topic exists, send a delete command
> (PendingDelteLedger) to the broker which owns the topic. In the broker, it
> will check if the ledger is still in the metadata store, if the ledger in
> the metadata store means the ledger is still in use, give up to delete this
> ledge


I don't understand this workflow. You say you check if it's in the metadata
store, and if it is, then it is in use. What will make it unused?
Can you explain the starting point? How does deletion work in general?
When? What happens? ... I understand there are time-based triggers, and
sometimes usage-based triggers. They are somehow marked in metadata.

7.

> If we delete successfully, the consumer will ack this msg. If delete fails,
> reconsume this msg 10 min later.


Where did you define 10min?
Why 10 min?

You mentioned you are working with a client which has retries configured.
Retry is client side based, ack one message while producing another,
transaction free. Are you prepared to handle a case where you acked but
failed to produce the message, hence you lost it completely?

8.

> If we want to delete a topic, we should send the delete ledger msg to
> system topic and remove ledger id from metadata one by one, after all the
> ledger has been deleted, then delete the topic. If any ledger operate
> failed, we think this delete topic operation failed and return the left
> ledger id in the response.

I couldn't understand this. Can you please clarify this section? How exactly
is topic deletion modified to use the features described in this PIP?

9.

Regarding configuration. I suggest we prefix all config keys with the
feature name so we can easily separate them.

10.
Backward compatibility - I would make sure to document in the docs exactly
what to do in case we need to roll back (a cookbook).

11.
General comment - You're basically implementing a bespoke workflow using a
topic to save the state of where you are in the topic.
Is this (delete ledger) the only place in Pulsar where an action is composed
of several steps?
If the answer is no to this, wouldn't it be better to have a small utility
which is in charge of moving through the workflow steps? It can even be a
simple state enum, where you move your state from a to b to c to d and it
is persisted.

12. Monitoring
Some actions here can take a long time. Are we basically relying on logs to
monitor where we are in the flow?
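
The state-enum idea from point 11 could be sketched roughly as below. This
is only an illustration under assumptions: the state names, DeletionWorkflow
and its in-memory map are hypothetical, and a real utility would persist the
state durably instead of in a HashMap.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical states for one ledger deletion; the names are illustrative only.
enum DeletionState {
    PENDING, METADATA_UPDATED, STORAGE_DELETED, DONE;

    // Move to the next step; DONE is terminal.
    DeletionState next() {
        return this == DONE ? DONE : values()[ordinal() + 1];
    }
}

// Tracks the current state per ledger ID. The map stands in for durable storage.
class DeletionWorkflow {
    private final Map<Long, DeletionState> store = new HashMap<>();

    void start(long ledgerId) {
        store.putIfAbsent(ledgerId, DeletionState.PENDING);
    }

    // Advance one step and record the new state.
    DeletionState advance(long ledgerId) {
        DeletionState current = store.get(ledgerId);
        if (current == null) {
            throw new IllegalArgumentException("unknown ledger " + ledgerId);
        }
        DeletionState next = current.next();
        store.put(ledgerId, next);
        return next;
    }

    DeletionState stateOf(long ledgerId) {
        return store.get(ledgerId);
    }
}
```

Because each step is recorded before moving on, a restarted worker could
look up stateOf(ledgerId) and resume from there, which would also give the
monitoring in point 12 something to report.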


On Sun, Jan 29, 2023 at 4:44 AM Yan Zhao  wrote:

> We need more eyes and votes. Thanks.
>


Re: [DISCUSS] PIP-186: Introduce two phase deletion protocol based on system topic

2023-01-28 Thread Yan Zhao
We need more eyes and votes. Thanks.


Re: [DISCUSS] PIP-186: Introduce two phase deletion protocol based on system topic

2022-08-14 Thread Yan Zhao
Good idea.

> Good idea. I suggest the naming endings with `Seconds` like
> sendDelayOfTwoPhaseDeletionInSeconds,`
> reconsumeLaterOfTwoPhaseDeletionInSeconds`.
> >private int sendDelaySecondsOfTwoPhaseDeletion = 60;
> >private int reconsumeLaterSecondsOfTwoPhaseDeletion = 600;


Re: [DISCUSS] PIP-186: Introduce two phase deletion protocol based on system topic

2022-08-13 Thread Qiang Huang
Good idea. I suggest names ending with `Seconds`, like
`sendDelayOfTwoPhaseDeletionInSeconds` and
`reconsumeLaterOfTwoPhaseDeletionInSeconds`.
>private int sendDelaySecondsOfTwoPhaseDeletion = 60;
>private int reconsumeLaterSecondsOfTwoPhaseDeletion = 600;

On Fri, Aug 12, 2022 at 10:45, Yan Zhao wrote:

> > I suggest to include 'topic' in the flag,  we have too many entities in
> > Pulsar
>
> Thanks, change it to `topicTwoPhaseDeletionEnabled`.
>


-- 
BR,
Qiang Huang


Re: [DISCUSS] PIP-186: Introduce two phase deletion protocol based on system topic

2022-08-11 Thread Yan Zhao
> I suggest to include 'topic' in the flag,  we have too many entities in
> Pulsar

Thanks, change it to `topicTwoPhaseDeletionEnabled`.


Re: [DISCUSS] PIP-186: Introduce two phase deletion protocol based on system topic

2022-08-11 Thread Enrico Olivelli
On Sat, Jul 23, 2022, 03:26 Yan Zhao wrote:

> Yes, wo define `twoPhaseDeletionEnabled` in the Service Configuration.
>

Thanks.

I suggest including 'topic' in the flag name; we have too many entities in
Pulsar.


Enrico

>


Re: [DISCUSS] PIP-186: Introduce two phase deletion protocol based on system topic

2022-07-22 Thread Yan Zhao
Yes, we define `twoPhaseDeletionEnabled` in the ServiceConfiguration.


Re: [DISCUSS] PIP-186: Introduce two phase deletion protocol based on system topic

2022-07-20 Thread Enrico Olivelli
On Wed, Jul 20, 2022, 18:05 Yan Zhao wrote:

> Hi, Enrico. If we bind the the topic with per-tenant, when tenant be
> deleted or the tenant not be load anymore, we data in the tennat system
> topic can't be consumed before the tenant next load.
>

This is a good point.

So let's go with the system topic.
Thanks

Are we adding a configuration flag?

Enrico



> On 2022/07/14 15:35:16 Enrico Olivelli wrote:
> > This is very interesting.
> >
> > I have only one concern.
> > I think that we should at least use a per-tenant system topic, or,
> > better, per-namespace.
> > There is no need to create the deletion topic if there is nothing to
> delete.
> >
> > I am used to dealing with Pulsar clusters in which Tenants are
> > strictly isolated.
> > Introducing another component that is not tenant aware it kind of a
> > problem (we already have such problem with the Transaction
> > Coordinator)
> >
> > Enrico
>


Re: [DISCUSS] PIP-186: Introduce two phase deletion protocol based on system topic

2022-07-20 Thread Yan Zhao
Hi, Enrico. If we bind the topic per tenant, then when a tenant is deleted or
is no longer loaded, the data in that tenant's system topic can't be
consumed until the tenant is next loaded.

On 2022/07/14 15:35:16 Enrico Olivelli wrote:
> This is very interesting.
> 
> I have only one concern.
> I think that we should at least use a per-tenant system topic, or,
> better, per-namespace.
> There is no need to create the deletion topic if there is nothing to delete.
> 
> I am used to dealing with Pulsar clusters in which Tenants are
> strictly isolated.
> Introducing another component that is not tenant aware it kind of a
> problem (we already have such problem with the Transaction
> Coordinator)
> 
> Enrico


Re: [DISCUSS] PIP-186: Introduce two phase deletion protocol based on system topic

2022-07-15 Thread Yan Zhao
If we use a per-tenant or per-namespace system topic and there is no active
topic in the tenant anymore, how would we consume the messages in that tenant?
This is my concern, so I prefer to use the system topic.


Re: [DISCUSS] PIP-186: Introduce two phase deletion protocol based on system topic

2022-07-14 Thread Enrico Olivelli
This is very interesting.

I have only one concern.
I think that we should at least use a per-tenant system topic, or,
better, per-namespace.
There is no need to create the deletion topic if there is nothing to delete.

I am used to dealing with Pulsar clusters in which Tenants are
strictly isolated.
Introducing another component that is not tenant aware is kind of a
problem (we already have such a problem with the Transaction
Coordinator)

Enrico

On Wed, Jul 13, 2022 at 10:54, horizonzy wrote:
>
> Hi Pulsar community: I open a pip to discuss "Introduce two phase deletion
> protocol based on system topic" Proposal Link:
> https://github.com/apache/pulsar/issues/16569
>
> ---
>
> ## Motivation
> Original issue: #13238
> In current ledger deletion, we divided it into two separate steps. It
> happens in ManagedLedger and ManagedCursor.
> Remove all the waiting to delete ledgers from the ledger list and update
> the newest ledger list into a meta store.
> In the meta store update callback operation, delete the waiting to delete
> ledgers from storage systems, such as BookKeeper or Tiered storage.
>
> Due to the separate step, we can’t ensure the ledger deletion transaction.
> If the first step succeeds and the second step fails, it will lead to
> ledgers that can't be deleted from the storage system forever. The second
> step may fail by broker restart or storage system deletion failed.
>
> In our customer’s environment, we have found many orphan ledgers cause by
> the above reason.
> ## Design
> Based on the above, we Introduce LedgerDeletionService to support two phase
> deletion. We hope it provides a general solution for two phase deletion. It
> will cover the problem we already found in managed-ledger, managed-cursor
> and schema-storage.
>
> In this design, we use the system topic to help us to store the pending
> delete ledger.
> * pulsar/system/persistent/__ledger_deletion : store the pending delete
> ledger
> * pulsar/system/persistent/__ledger_deletion_archive : as the DLQ for above
>
>
> ### The first phase:
> ```
> client.newProducer(Schema.AVRO(PendingDeleteLedgerInfo.class))
>  .topic("pulsar/system/persistent/__ledger_deletion")
>  .enableBatching(false)
>  .createAsync();
> ```
> In the LedgerDeletionService  start, it will create  a producer to send
> pending delete ledger.
> When delete a ledger,  a PendingDeleteLedgerInfo msg with 1 min delay (the
> delay is for consumer side, if send it immediately, maybe the metadata
> din't change when consumer receive it). After the send operation succeeds,
>  then to operate metadata. If send msg failed, we think this deletion
> operation failed, and didn't operate metadata.
>
> **PendingDeleteLedgerInfo**
> ```
> public class PendingDeleteLedgerInfo {
> /**
>  * Partitioned topic name without domain. Likes
> public/default/test-topic-partition-1 or
>  * public/default/test-topic
>  */
> private String topicName;
>
> /**
>  * The ledger component . managed-ledger, managed-cursor and
> schema-storage.
>  */
> private LedgerComponent ledgerComponent;
>
> /**
>  * The ledger type. ledger or offload-ledger.
>  */
> private LedgerType ledgerType;
>
> /**
>  * LedgerId.
>  */
> private Long ledgerId;
>
> /**
>  * Context, holds offload info. If bk ledger, the context is null.
>  */
> private MLDataFormats.ManagedLedgerInfo.LedgerInfo context;
>
> /**
>  * When a consumer receives a pending delete ledger, maybe the ledger
> is still in use, we need to check the ledger is in use.
>  * In some cases, we needn't check the ledger still in use.
>  */
> private boolean checkLedgerStillInUse;
>
> /**
>  * Extent properties.
>  */
> private Map properties = new HashMap<>();
> }
> ```
> ### The second phase
> ```
> client.newConsumer(Schema.AVRO(PendingDeleteLedgerInfo.class))
> .topic("pulsar/system/persistent/__ledger_deletion")
> .subscriptionName("ledger-deletion-worker")
> .subscriptionType(SubscriptionType.Shared)
> .deadLetterPolicy(DeadLetterPolicy.builder()
>
>  
> .deadLetterTopic(SystemTopicNames.LEDGER_DELETION_ARCHIVE_TOPIC.getPartitionedTopicName())
>  .maxRedeliverCount(10).build())
>  .subscribeAsync()
> ```
> In the LedgerDeletionService start, it will start a consumer to consume
> pending delete ledger.
>
> ### Check if the ledger is still in use
> When received a pending delete ledger, we should check if the pending
> delete ledger still exists in the metadata. If exists, we should give up to
> delete this ledger in this time, the consumer won't ack this message,  do
> reconsumeLater 10 min. If it does not exist, it will try to delete legder
> or offload-ledger according to the ledger-type. If delete succeeds, ack
> this message, if delete failed, reconsumerLater 10 min. If a
> PendingDeleteLedger msg reconsume reach 10, the msg will transfer to DLQ
> pulsar/system/persistent/__ledger_deletion_archive
>
> 

[DISCUSS] PIP-186: Introduce two phase deletion protocol based on system topic

2022-07-13 Thread horizonzy
Hi Pulsar community, I have opened a PIP to discuss "Introduce two phase
deletion protocol based on system topic". Proposal link:
https://github.com/apache/pulsar/issues/16569

---

## Motivation
Original issue: #13238
Ledger deletion is currently divided into two separate steps. This happens
in ManagedLedger and ManagedCursor.
1. Remove all the ledgers waiting to be deleted from the ledger list and
write the updated ledger list to the metadata store.
2. In the metadata store update callback, delete those ledgers from the
storage system, such as BookKeeper or tiered storage.

Because the steps are separate, we can't guarantee that ledger deletion is
atomic. If the first step succeeds and the second step fails, the ledgers
can never be deleted from the storage system. The second step may fail
because of a broker restart or a failed deletion in the storage system.

In our customer’s environment, we have found many orphan ledgers caused by
this.
## Design
Based on the above, we introduce LedgerDeletionService to support two-phase
deletion. We hope it provides a general solution for two-phase deletion. It
will cover the problems we have already found in managed-ledger,
managed-cursor and schema-storage.

In this design, we use system topics to store the pending delete ledgers.
* pulsar/system/persistent/__ledger_deletion : stores the pending delete
ledgers
* pulsar/system/persistent/__ledger_deletion_archive : the DLQ for the topic
above


### The first phase:
```
client.newProducer(Schema.AVRO(PendingDeleteLedgerInfo.class))
 .topic("pulsar/system/persistent/__ledger_deletion")
 .enableBatching(false)
 .createAsync();
```
When the LedgerDeletionService starts, it creates a producer for sending
pending delete ledgers.
When deleting a ledger, the producer sends a PendingDeleteLedgerInfo msg
with a 1 min delay (the delay is for the consumer side: if the msg were
delivered immediately, the metadata might not have changed yet when the
consumer receives it). Only after the send succeeds do we update the
metadata. If the send fails, we consider the deletion failed and do not
touch the metadata.
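
The ordering in this phase can be sketched in plain Java as follows. This is
a minimal illustration under assumptions, not the proposed implementation:
FirstPhase, its in-memory collections and the sendFails flag are
hypothetical stand-ins for the metadata store, the system topic and a
producer failure.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical model of the first phase; no Pulsar classes are used.
class FirstPhase {
    // Stand-in for the ledger list kept in the metadata store.
    final SortedSet<Long> metadataLedgers = new TreeSet<>();
    // Stand-in for the __ledger_deletion system topic.
    final List<Long> systemTopic = new ArrayList<>();
    // Set to true to simulate a failed send.
    boolean sendFails = false;

    // Publish the pending delete first; touch metadata only if that succeeded.
    boolean deleteLedger(long ledgerId) {
        if (!send(ledgerId)) {
            return false; // deletion is considered failed, metadata untouched
        }
        metadataLedgers.remove(ledgerId);
        return true;
    }

    private boolean send(long ledgerId) {
        if (sendFails) {
            return false;
        }
        systemTopic.add(ledgerId);
        return true;
    }
}
```

With this ordering, a ledger can never disappear from the metadata without
a pending delete message having been recorded for it.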

**PendingDeleteLedgerInfo**
```
public class PendingDeleteLedgerInfo {
    /**
     * Partitioned topic name without domain, like
     * public/default/test-topic-partition-1 or
     * public/default/test-topic.
     */
    private String topicName;

    /**
     * The ledger component: managed-ledger, managed-cursor or
     * schema-storage.
     */
    private LedgerComponent ledgerComponent;

    /**
     * The ledger type: ledger or offload-ledger.
     */
    private LedgerType ledgerType;

    /**
     * The ledger ID.
     */
    private Long ledgerId;

    /**
     * Context that holds offload info. For a BK ledger, the context is null.
     */
    private MLDataFormats.ManagedLedgerInfo.LedgerInfo context;

    /**
     * When a consumer receives a pending delete ledger, the ledger may
     * still be in use, so we need to check whether it is in use.
     * In some cases, this check is not needed.
     */
    private boolean checkLedgerStillInUse;

    /**
     * Extended properties.
     */
    private Map properties = new HashMap<>();
}
```
### The second phase
```
client.newConsumer(Schema.AVRO(PendingDeleteLedgerInfo.class))
    .topic("pulsar/system/persistent/__ledger_deletion")
    .subscriptionName("ledger-deletion-worker")
    .subscriptionType(SubscriptionType.Shared)
    .deadLetterPolicy(DeadLetterPolicy.builder()
        .deadLetterTopic(SystemTopicNames.LEDGER_DELETION_ARCHIVE_TOPIC.getPartitionedTopicName())
        .maxRedeliverCount(10).build())
    .subscribeAsync();
```
When the LedgerDeletionService starts, it also starts a consumer that
consumes the pending delete ledgers.

### Check if the ledger is still in use
When a pending delete ledger is received, we check whether the ledger still
exists in the metadata. If it exists, we give up deleting it this time: the
consumer won't ack the message and calls reconsumeLater with a 10 min
delay. If it does not exist, the consumer tries to delete the ledger or
offload-ledger according to the ledger type. If the delete succeeds, it
acks the message; if the delete fails, it calls reconsumeLater with a
10 min delay. Once a PendingDeleteLedger msg has been reconsumed 10 times,
it is transferred to the DLQ
pulsar/system/persistent/__ledger_deletion_archive.
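
The decision described in this section can be sketched as a small pure
function. This is a hedged illustration only: Action, SecondPhase and the
Storage hook are hypothetical names, and the DLQ hand-off after 10
redeliveries is left to the consumer's dead letter policy rather than
modeled here.

```java
import java.util.Set;

// Hypothetical outcome of handling one pending delete message.
enum Action { ACK, RECONSUME_LATER }

class SecondPhase {
    // Hypothetical hook for the storage delete (BookKeeper or offload);
    // returns true when the delete succeeded.
    interface Storage {
        boolean delete(long ledgerId);
    }

    static Action handle(long ledgerId, Set<Long> ledgersInMetadata, Storage storage) {
        if (ledgersInMetadata.contains(ledgerId)) {
            // Still referenced by the metadata: do not delete, retry later.
            return Action.RECONSUME_LATER;
        }
        // Not referenced anymore: try to delete and ack only on success.
        return storage.delete(ledgerId) ? Action.ACK : Action.RECONSUME_LATER;
    }
}
```

For example, SecondPhase.handle(2L, Set.of(1L), id -> true) returns ACK,
while the same call with ledger 1 returns RECONSUME_LATER because the
ledger is still listed in the metadata.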

_Tips: We set the DLQ maxRedeliverCount to 10 and the reconsumeLater delay
to 10 min. If the storage system shuts down, deletion of the pending delete
ledger is retried 10 times over 100 min, so we don't need to worry that a
storage system shutdown leaves the ledger undeleted._
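
The 100 min figure follows directly from the two settings. As a quick
check, here is a tiny helper; RetryBudget and totalRetryMinutes are
hypothetical names used only for this calculation.

```java
class RetryBudget {
    // Total minutes covered by all redeliveries before the msg goes to the DLQ.
    static long totalRetryMinutes(int maxRedeliverCount, int reconsumeLaterSeconds) {
        return (long) maxRedeliverCount * reconsumeLaterSeconds / 60;
    }
}
```

With maxRedeliverCount = 10 and a 600 second delay,
RetryBudget.totalRetryMinutes(10, 600) gives 100.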

### How to get the existing ledgers when the consumer receives a pending delete ledger
The admin api topics getInternalStats {topic-name} already exists, and its
response contains the three pieces of info we need.
* managed-ledger ledgerIds
* managed-cursor ledgerIds
* schema-storage ledgerIds

So when receiving a pending delete ledger, we fetch the topic's internal
stats. To avoid fetching them too frequently, we define a ledgersCache for
them.
```
private final LoadingCache> ledgersCache =