RE: Re: [DISCUSS] PIP-186: Introduce two phase deletion protocol based on system topic
> Penghui suggested instead of communicating via Admin API we use the same
> methodology used in the new load balancer - using a compacted topic as a
> queue:
>
> The system topic you create will have compaction enabled.
> You write the pending ledger delete, and the key is ledger-ID and
> ledger-type (BK, offloaded).
> All brokers will consume this compacted topic as a table-view. Basically,
> upon restart, you read all messages into memory.
> When reading, you only keep the messages related to topics the broker
> owns; others you discard (table-view has a filter feature).
> For each message in the table-view, you delete the ledgers, then write a
> message to the topic with the same key and a null payload, meaning it will
> be deleted.
>
> This way you don't need to invent a new private Admin API.
>
> WDYT?

Got it. It's a good solution. I will tune the PIP this way.
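The compacted-topic approach described above can be modeled in memory: each entry is keyed by ledger ID and ledger type, and a message with a null payload acts as a tombstone that removes the entry (this is how Pulsar topic compaction and `TableView` treat null values). A minimal sketch of the semantics the brokers would rely on, with hypothetical class and key names:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;

// In-memory model of the compacted system topic as seen through a table-view.
// Key = "<ledgerId>:<ledgerType>", value = serialized pending-delete info.
// A null value is a tombstone: compaction drops the key entirely.
class PendingDeleteTableView {
    private final Map<String, String> view = new HashMap<>();
    private final Predicate<String> ownedByThisBroker; // table-view filter

    PendingDeleteTableView(Predicate<String> ownedByThisBroker) {
        this.ownedByThisBroker = ownedByThisBroker;
    }

    // Called for every message replayed from the compacted topic.
    void onMessage(String key, String value) {
        if (!ownedByThisBroker.test(key)) {
            return; // discard entries for topics this broker does not own
        }
        if (value == null) {
            view.remove(key);     // tombstone: deletion already completed
        } else {
            view.put(key, value); // pending deletion
        }
    }

    Map<String, String> pending() {
        return Map.copyOf(view);
    }
}
```

After a broker finishes deleting the ledgers for an entry, it would publish a null-payload message with the same key, so compaction eventually removes the entry for every reader.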
Re: [DISCUSS] PIP-186: Introduce two phase deletion protocol based on system topic
On Sun, Mar 12, 2023 at 7:21 PM Yan Zhao wrote:

> > > In the LedgerDeletionService start, it will create a producer for sending
> > > the pending delete ledger. When deleting a ledger, the producer sends
> > > PendingDeleteLedgerInfo to the system-topic. If the send succeeds, delete
> > > the ledger id from the whole ledgers and update the ledgers into the meta
> > > store. If the send fails, do nothing.
> >
> > You don't delete the ledger ID from the whole ledgers, then update the
> > ledgers in the metadata store. We described it above already; we remove the
> > pending delete ledger ID from the list of ledger IDs, publish the message,
> > and only at the end update the metadata store.
>
> Yes, I will correct it.
>
> > > In the LedgerDeletionService start, it will create a producer for sending
> > > the pending delete ledger. When deleting a ledger, the producer sends
> > > PendingDeleteLedgerInfo to the system-topic. If the send succeeds, delete
> > > the ledger id from the whole ledgers and update the ledgers into the meta
> > > store. If the send fails, do nothing.
> >
> > After that section, you write the methods of DeletionService and the
> > pseudo-code, but it's hard to understand what goes where: what exactly the
> > deletion service does, and where the pseudo-code will actually go.
>
> It's just a summary; the more detailed information is in the later sections.
>
> > Also, let's concentrate the implementation details in the Implementation
> > section, not the High-Level Design. Let the readers understand the concept
> > end to end in the High-Level Design, and then in the Implementation, start
> > diving into the nitty-gritty details.
>
> > > The consumer uses pulsar admin to send PendingDeleteLedgerInfo via the
> > > broker RESTful API.
> >
> > Please explain here that you will introduce a new Admin API for that. Also,
> > explain that you will issue that API against the broker who owns the topic.
> > (The schema registry has a ledger per topic?)
>
> I will correct it.
> > I still don't understand which methods or behavior you're changing exactly
> > for the managed ledger, cursor, and schema.
> > Why am I asking?
> > - Ledger deletion can happen because:
> >   * Background truncation, since ledgers are allowed to be deleted based on
> >     an expiration policy.
> >   * A truncate request by an admin API.
> >   * More?
>
> All the ways you mention do the same work: they trigger
> ManagedLedgerImpl#internalTrimLedgers.
> We change the implementation approach inside internalTrimLedgers.
>
> > If you find the ledger ID is still in the meta-store, you return a failure,
> > and the consumer acks the message, right?
> > If it's background deletion, all good since it will kick in again in a few
> > seconds and retry.
> > But what happens if a user runs a REST admin API command to truncate the
> > topic?
>
> Using the REST admin API command to truncate the topic is no different from
> the background task. It happens in the first metadata-deletion phase; we
> don't check whether the ledger is in the metadata or not. The first phase
> only picks all consumed ledgers, then tries to delete them.
>
> > > check if the ledger is still in use,
> >
> > You mean, check if the ledger ID still exists in the ledger ID list of the
> > topic in the metadata store, right?
> >
> > > then check the PendingDeleteLedgerInfo ledgerType.
> >
> > Check for what?
>
> See 3.1 and 3.2.
>
> > > 2.2 If the topic does not exist, the consumer checks the
> > > PendingDeleteLedgerInfo ledgerType.
> > > 3.1 If the ledgerType is Ledger, delete the data on the bookkeeper side.
> > > 3.2 If the ledgerType is Offload-Ledger, delete the data on the
> > > tiered-storage side.
> >
> > Something doesn't look good in the numbering and how you structure it. 3.1
> > is probably an indent inside 2.2, no?
>
> Sorry, it may confuse the reader. I added a flow hint in the PIP.
>
> > 1. What happens if the topic doesn't exist when you are in the broker in
> > the delete pending ledger API implementation?
> The broker will respond with a 404 Topic Not Found error to the consumer;
> the consumer then knows the topic no longer exists, and it will delete the
> ledger locally.

You need to write that in your PIP. In your pseudo-code you make it appear as
if the failure is a boolean, but in fact the response should be an error code.

> > 2. Say the ledger ID is still in the metadata store. If the background
> > truncation of the managed ledger was the trigger, there is no real point
> > in retrying, right? Once you start over, another message will be sent to
> > the system topic.
>
> In the storage-deletion phase, if the ledger ID is still in the metadata
> store, we assume it is still in use, so we don't delete it, and we ack the
> message. Because the ledger is still in the metadata store, it will be
> deleted by the next background truncation task, so the ledger will be
> deleted eventually.

> > 3. Can you explain exactly the cases in which it is actually worth doing
> > the retry?
> > I mean the obvious:
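The behavior discussed above (404 when the topic is gone, ack without deleting when the ledger ID is still referenced) can be condensed into one decision function. This is a sketch with hypothetical names, not code from the PIP:

```java
// Possible outcomes when the consumer processes a pending-delete message.
enum LedgerType { LEDGER, OFFLOAD_LEDGER }
enum Action { ACK_ONLY, DELETE_FROM_BOOKKEEPER, DELETE_FROM_TIERED_STORAGE }

final class PendingDeleteDecision {
    private PendingDeleteDecision() {}

    /**
     * topicExists           - false means the broker answered 404 Topic Not Found
     * ledgerStillInMetadata - only meaningful when the topic exists
     */
    static Action decide(boolean topicExists, boolean ledgerStillInMetadata, LedgerType type) {
        if (topicExists && ledgerStillInMetadata) {
            // Ledger is still referenced: do not delete, just ack; the next
            // background truncation will publish a fresh message if needed.
            return Action.ACK_ONLY;
        }
        // Topic gone (404) or ledger no longer referenced: safe to delete the data.
        return type == LedgerType.LEDGER
                ? Action.DELETE_FROM_BOOKKEEPER
                : Action.DELETE_FROM_TIERED_STORAGE;
    }
}
```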
Re: [DISCUSS] PIP-186: Introduce two phase deletion protocol based on system topic
> > In the LedgerDeletionService start, it will create a producer for sending
> > the pending delete ledger. When deleting a ledger, the producer sends
> > PendingDeleteLedgerInfo to the system-topic. If the send succeeds, delete
> > the ledger id from the whole ledgers and update the ledgers into the meta
> > store. If the send fails, do nothing.
>
> You don't delete the ledger ID from the whole ledgers, then update the
> ledgers in the metadata store. We described it above already; we remove the
> pending delete ledger ID from the list of ledger IDs, publish the message,
> and only at the end update the metadata store.

Yes, I will correct it.

> > In the LedgerDeletionService start, it will create a producer for sending
> > the pending delete ledger. When deleting a ledger, the producer sends
> > PendingDeleteLedgerInfo to the system-topic. If the send succeeds, delete
> > the ledger id from the whole ledgers and update the ledgers into the meta
> > store. If the send fails, do nothing.
>
> After that section, you write the methods of DeletionService and the
> pseudo-code, but it's hard to understand what goes where: what exactly the
> deletion service does, and where the pseudo-code will actually go.

It's just a summary; the more detailed information is in the later sections.

> Also, let's concentrate the implementation details in the Implementation
> section, not the High-Level Design. Let the readers understand the concept
> end to end in the High-Level Design, and then in the Implementation, start
> diving into the nitty-gritty details.

> > The consumer uses pulsar admin to send PendingDeleteLedgerInfo via the
> > broker RESTful API.
>
> Please explain here that you will introduce a new Admin API for that. Also,
> explain that you will issue that API against the broker who owns the topic.
> (The schema registry has a ledger per topic?)

I will correct it.

> I still don't understand which methods or behavior you're changing exactly
> for the managed ledger, cursor, and schema.
> Why am I asking?
> - Ledger deletion can happen because:
>   * Background truncation, since ledgers are allowed to be deleted based on
>     an expiration policy.
>   * A truncate request by an admin API.
>   * More?

All the ways you mention do the same work: they trigger
ManagedLedgerImpl#internalTrimLedgers.
We change the implementation approach inside internalTrimLedgers.

> If you find the ledger ID is still in the meta-store, you return a failure,
> and the consumer acks the message, right?
> If it's background deletion, all good since it will kick in again in a few
> seconds and retry.
> But what happens if a user runs a REST admin API command to truncate the
> topic?

Using the REST admin API command to truncate the topic is no different from
the background task. It happens in the first metadata-deletion phase; we
don't check whether the ledger is in the metadata or not. The first phase
only picks all consumed ledgers, then tries to delete them.

> > check if the ledger is still in use,
>
> You mean, check if the ledger ID still exists in the ledger ID list of the
> topic in the metadata store, right?
>
> > then check the PendingDeleteLedgerInfo ledgerType.
>
> Check for what?

See 3.1 and 3.2.

> > 2.2 If the topic does not exist, the consumer checks the
> > PendingDeleteLedgerInfo ledgerType.
> > 3.1 If the ledgerType is Ledger, delete the data on the bookkeeper side.
> > 3.2 If the ledgerType is Offload-Ledger, delete the data on the
> > tiered-storage side.
>
> Something doesn't look good in the numbering and how you structure it. 3.1
> is probably an indent inside 2.2, no?

Sorry, it may confuse the reader. I added a flow hint in the PIP.

> 1. What happens if the topic doesn't exist when you are in the broker in
> the delete pending ledger API implementation?

The broker will respond with a 404 Topic Not Found error to the consumer;
the consumer then knows the topic no longer exists, and it will delete the
ledger locally.

> 2. Say the ledger ID is still in the metadata store.
> If the background truncation of the managed ledger was the trigger, there
> is no real point in retrying, right? Once you start over, another message
> will be sent to the system topic.

In the storage-deletion phase, if the ledger ID is still in the metadata
store, we assume it is still in use, so we don't delete it, and we ack the
message. Because the ledger is still in the metadata store, it will be
deleted by the next background truncation task, so the ledger will be
deleted eventually.

> 3. Can you explain exactly the cases in which it is actually worth doing
> the retry?
> I mean the obvious: you failed to delete it from BK due to a transient
> error. But the previous one of checking the list? Worth the retry?

If the ledger is still in the meta store at the second (storage-deletion)
phase, we won't retry; we will ack the message to avoid a useless retry. We
retry the deletion only when the storage delete fails.

> then respond with the error msg "ledger still in use" to the consumer.

> You need to have those
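The ack/retry policy described above (ack when the ledger is still referenced, retry only on storage failures) could be sketched as a single predicate; the names are hypothetical, not from the PIP:

```java
// Result of one attempt to process a pending-delete message.
enum DeleteOutcome {
    DELETED,                  // data removed from BK / tiered storage
    LEDGER_STILL_IN_METADATA, // still referenced; a later trim will republish
    STORAGE_DELETE_FAILED     // transient BK / tiered-storage error
}

final class RetryPolicy {
    private RetryPolicy() {}

    // Ack everything except transient storage failures; only those are
    // worth a retry of the same message.
    static boolean shouldAck(DeleteOutcome outcome) {
        return outcome != DeleteOutcome.STORAGE_DELETE_FAILED;
    }
}
```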
Re: [DISCUSS] PIP-186: Introduce two phase deletion protocol based on system topic
Hi Asaf, I have tuned the PIP (https://github.com/apache/pulsar/issues/16569). Please help review it again. Thanks!
Re: [DISCUSS] PIP-186: Introduce two phase deletion protocol based on system topic
Hi Asaf, I have tuned PIP-186 (https://github.com/apache/pulsar/issues/16569). Please help review it again. Thanks!

On 2023/02/16 13:50:54 Yan Zhao wrote:
> > If I understood correctly, every broker will have a consumer, right? You
> > will use a fail-over subscription? The retry-topic is consumed by the same
> > subscription, same consumer?
>
> Yes, you are right; there is the case you mention. The deletion is
> idempotent, so I'm not sure it's worth making it sync for this.
>
> > In this very long mailing list thread, we have mentioned many fixes to be
> > done. Can you ping the mailing list once you have managed to fix them all?
>
> That's fine. I will push the new PIP in two days. After pushing the new
> PIP, I will ping you.
Re: [DISCUSS] PIP-186: Introduce two phase deletion protocol based on system topic
> If I understood correctly, every broker will have a consumer, right? You
> will use a fail-over subscription? The retry-topic is consumed by the same
> subscription, same consumer?

Yes, you are right; there is the case you mention. The deletion is
idempotent, so I'm not sure it's worth making it sync for this.

> In this very long mailing list thread, we have mentioned many fixes to be
> done. Can you ping the mailing list once you have managed to fix them all?

That's fine. I will push the new PIP in two days. After pushing the new PIP,
I will ping you.
Re: [DISCUSS] PIP-186: Introduce two phase deletion protocol based on system topic
On Tue, Feb 14, 2023 at 1:02 PM Yan Zhao wrote:

> > Shouldn't you specify the changes you are going to make in
> > `PulsarApi.proto`?
>
> We didn't change the proto file; we use the RESTful API (Admin API).
>
> > Also, shouldn't you change the wording in this sentence in the PIP to
> > clarify that you will use the Admin API / Pulsar RPC?
>
> We use the Admin API. I will tune the PIP.
>
> > You need to specify in the PIP that you will publish one message for
> > deletion from BK, and one message for deletion from the offloader, and
> > explain why. Also provide the context you just gave me about where you
> > find out whether it is actually offloaded or not.
>
> Ok.
>
> > Let me see if I understand correctly. Now the PIP says:
> >
> > private MLDataFormats.ManagedLedgerInfo.LedgerInfo context;
> >
> > and you say it should be:
> >
> > private MLDataFormats.ManagedLedgerInfo.LedgerInfo.OffloadContext
> > offloadContext;
> >
> > ?
>
> Yes.
>
> > In that case I'm not sure I understand the need for
> >
> > /**
> >  * Extent properties.
> >  */
> > private Map properties = new HashMap<>();
>
> It can be removed.
>
> > Not just the title, your PIP needs to explain everything we wrote above
> > about trim.
>
> I know, and I will tune the PIP contents.
>
> > Ok, say you only delete from ZK in the first step. Still, what I wrote
> > above still applies: you might have 2 messages trying to delete the same
> > ledger at the same time. Shouldn't this have a lock to protect against it?
>
> We designed it so that the consumer only handles one message at a time,
> which ensures synchronization.

If I understood correctly, every broker will have a consumer, right? You
will use a fail-over subscription? The retry-topic is consumed by the same
subscription, same consumer?

In this very long mailing list thread, we have mentioned many fixes to be
done. Can you ping the mailing list once you have managed to fix them all?
Re: [DISCUSS] PIP-186: Introduce two phase deletion protocol based on system topic
> Shouldn't you specify the changes you are going to make in
> `PulsarApi.proto`?

We didn't change the proto file; we use the RESTful API (Admin API).

> Also, shouldn't you change the wording in this sentence in the PIP to
> clarify that you will use the Admin API / Pulsar RPC?

We use the Admin API. I will tune the PIP.

> You need to specify in the PIP that you will publish one message for
> deletion from BK, and one message for deletion from the offloader, and
> explain why. Also provide the context you just gave me about where you find
> out whether it is actually offloaded or not.

Ok.

> Let me see if I understand correctly. Now the PIP says:
>
> private MLDataFormats.ManagedLedgerInfo.LedgerInfo context;
>
> and you say it should be:
>
> private MLDataFormats.ManagedLedgerInfo.LedgerInfo.OffloadContext
> offloadContext;
>
> ?

Yes.

> In that case I'm not sure I understand the need for
>
> /**
>  * Extent properties.
>  */
> private Map properties = new HashMap<>();

It can be removed.

> Not just the title, your PIP needs to explain everything we wrote above
> about trim.

I know, and I will tune the PIP contents.

> Ok, say you only delete from ZK in the first step. Still, what I wrote
> above still applies: you might have 2 messages trying to delete the same
> ledger at the same time. Shouldn't this have a lock to protect against it?

We designed it so that the consumer only handles one message at a time,
which ensures synchronization.
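Putting the field changes discussed above together, the message payload might end up looking like the POJO below. This is only a sketch assuming the names from the discussion (`offloadContext` instead of the generic `context`, no `properties` map); the `OffloadContext` record is a placeholder standing in for the protobuf type `MLDataFormats.ManagedLedgerInfo.LedgerInfo.OffloadContext`:

```java
// Hypothetical shape of the pending-delete payload discussed above.
final class PendingDeleteLedgerInfo {
    enum LedgerType { LEDGER, OFFLOAD_LEDGER }

    // Placeholder for MLDataFormats.ManagedLedgerInfo.LedgerInfo.OffloadContext.
    record OffloadContext(String offloadedLedgerUuid) {}

    private final String topicName;
    private final long ledgerId;
    private final LedgerType ledgerType;
    private final OffloadContext offloadContext; // null when ledgerType == LEDGER

    PendingDeleteLedgerInfo(String topicName, long ledgerId,
                            LedgerType ledgerType, OffloadContext offloadContext) {
        this.topicName = topicName;
        this.ledgerId = ledgerId;
        this.ledgerType = ledgerType;
        this.offloadContext = offloadContext;
    }

    String topicName() { return topicName; }
    long ledgerId() { return ledgerId; }
    LedgerType ledgerType() { return ledgerType; }
    OffloadContext offloadContext() { return offloadContext; }
}
```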
Re: [DISCUSS] PIP-186: Introduce two phase deletion protocol based on system topic
On Mon, Feb 6, 2023 at 12:02 PM Yan Zhao wrote:

> On 2023/02/05 14:40:10 Asaf Mesika wrote:
> > On Fri, Feb 3, 2023 at 1:48 PM Yan Zhao wrote:
> > > > If you persisted the message successfully by the producer and the
> > > > broker was terminated before being able to delete the ledger from the
> > > > metadata service?
> > >
> > > If the broker is terminated, the consumer won't ack the message, and the
> > > message will be re-consumed later.
> >
> > Let me quote the text I was replying to:
> >
> > > When pulsar wants to delete a ledger, ManagedLedger uses
> > > ledgerDeletionService to send a message; the message content contains
> > > the waiting delete ledger info. After sending succeeds, delete the
> > > ledger id from the metadata store.
> >
> > If I understand correctly, the following steps will happen:
> > 1. The producer writes a message to the system topic, containing the
> > ledger information to delete.
> > 2. Upon success of (1), the ledger ID is deleted from the metadata store.
> >
> > Your reply was about the consumer; my question was about the producer, so
> > I'm writing my reply here again (full reply):
> >
> > What if you persisted the message successfully by the producer and the
> > broker was terminated before being able to delete the ledger (ID) from
> > the metadata service?
>
> After the broker restarts, the broker still thinks the ledger is available,
> and the broker can still read data from this ledger.
> When the consumer receives the message, it triggers the corresponding
> broker to delete the ledger.
> The ledger still exists in the metadata, so the deletion is ignored, and
> the consumer acks the message.
>
> At the next trimConsumedLedgersInBackground, the ledger will be deleted
> again.
>
> > > > I recommend having the logic to delete the ledger be done on the
> > > > message consumer side:
> > > > - if the ledger exists in the MD store, delete it.
> > > > - send a delete command to BK
> > > > Both, as I understand, are idempotent.
> > > > If for some reason one action was done but the other wasn't - ZK down,
> > > > BK down, broker hosting the consumer terminated - either the message
> > > > will not be acknowledged, or you negatively acknowledge.
> > >
> > > We send a delete command to the broker; it will connect to the
> > > corresponding broker which loads the topic. The corresponding broker
> > > receives the command, then passes it to ManagedLedger, and the
> > > ManagedLedger does the actual delete operation.
> > > If the consumer does the delete operation, it's a little unreasonable.
> > > The ledger manager should be `ManagedLedger`; letting it do the delete
> > > is better.
> >
> > You say the consumer will send a delete command to the broker: by what
> > means? Sending a message on a topic, or running an RPC request by adding
> > a new command to the PulsarApi.proto file?
> > If the latter - shouldn't this be documented?
>
> Yes, by sending an RPC request.
> I just added sections:
> `Introduce admin api in Topics`.
> `Introduce admin api in Schemas`.
>
> I will describe it in detail.

Shouldn't you specify the changes you are going to make in `PulsarApi.proto`?

Also, shouldn't you change the wording in this sentence in the PIP to
clarify that you will use the Admin API / Pulsar RPC?

> When receiving a pending delete ledger msg, we will check if the topic
> still exists. If the topic exists, send a delete command
> (PendingDeleteLedger) to the broker which owns the topic.

> > Let me assume the consumer which receives the message will run a Pulsar
> > API command to instruct the broker assigned to this topic to delete the
> > ledger - why can't the implementation of this command in the assigned
> > broker do both the deletion of the ledger ID from ZK AND the deletion
> > from BK? Why do we need to have the ZK deletion done on the producer
> > side, after we produced the message?
> In pulsar, as long as we delete the ledger id from the metadata store, we
> consider the ledger already deleted. Pulsar doesn't refer to the ledger
> anymore, although the ledger still exists in BK. The ledger deletion in BK
> can be done at any time, as long as we ensure it will eventually be done.
>
> If we delete both the ledger ID from zk and the ledger from bk when the
> broker receives the delete RPC command, there may be a problem in this
> case:
> The broker wants to delete ledger 1; it sends the message to the system
> topic but hasn't yet removed ledger id 1 from zk. So the ManagedLedger
> still thinks ledger 1 is readable, and it may read ledger 1 from BK. But at
> the same time, the consumer receives the msg and sends the delete RPC
> command to the broker; the broker deletes the ledger id from zk and deletes
> the ledger from BK. The ManagedLedger that is reading from BK will throw an
> exception.
>
> So we remove the ledger id from zk at the producer stage; the broker then
> considers the ledger already deleted and doesn't do any operation on the
> ledger.
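The ordering argument above (publish the pending-delete message first, then drop the ID from the metadata list, so readers never touch a ledger whose data may be deleted concurrently) can be illustrated with a small in-memory model; the class and method names are hypothetical:

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;

// Model of the producer-stage ordering: the pending-delete message is
// published BEFORE the ledger id is removed from the metadata list, and
// readers only touch ledgers still present in that list.
class ManagedLedgerModel {
    private final Set<Long> metadataLedgers = new HashSet<>(); // ids in the meta store
    final Queue<Long> systemTopic = new ArrayDeque<>();        // published pending deletes

    void addLedger(long id) { metadataLedgers.add(id); }

    // internalTrimLedgers, producer stage.
    void trim(long id) {
        systemTopic.add(id);        // 1) publish the pending-delete message
        metadataLedgers.remove(id); // 2) only then drop the id from metadata
    }

    // A reader consults the metadata list before touching BK, so once trim()
    // completes, no reader will attempt to read the ledger being deleted.
    boolean readable(long id) { return metadataLedgers.contains(id); }
}
```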
Re: [DISCUSS] PIP-186: Introduce two phase deletion protocol based on system topic
On 2023/02/07 18:41:20 Heesung Sohn wrote:
> On Sun, Feb 5, 2023 at 2:26 AM Yan Zhao wrote:
> > On 2023/02/03 18:14:53 Heesung Sohn wrote:
> > > There are some cases that trigger it:
> > > 1. A cursor is removed.
> > > 2. The current ledger is closed and a new ledger is created.
> > > 3. A consumer acks a message, and the slowest cursor moves forward.
> > > 4. A user triggers truncateTopic by admin.
> >
> > > I see that this pip is for the internal ledger deletion cases (1-3
> > > above), and it appears that such internal deletion operations do not
> > > require pre-validation or status checks (and no additional iops on the
> > > metadata store). I agree that we need a separate pip for async admin
> > > operations.
> >
> > This pip also applies to case 4. All cases invoke
> > `trimConsumedLedgersInBackground`.
> > This pip acts inside that method.
>
> For the long-running async admin operations such as
> deleteTopic/Namespace/Tenant/*, as I mentioned, I think we'd better provide
> describe* APIs to enable the admins to check the status of the async
> operation.
> I believe we can first use this system topic from this pip to support case
> 4, "admin async operations."
> Still, we probably need a separate pip to discuss
> - the expected behavior/experience of async admin operations
> - whether we want to provide describe* APIs for the async operations
> - target resources
> - architecture/components for how to support describe* APIs

Agree. If there are some long-running async operations whose expected end
time is unknown, we can add describe* APIs to get the progress of the
operations.
Re: [DISCUSS] PIP-186: Introduce two phase deletion protocol based on system topic
On Sun, Feb 5, 2023 at 2:26 AM Yan Zhao wrote:

> On 2023/02/03 18:14:53 Heesung Sohn wrote:
> > There are some cases that trigger it:
> > 1. A cursor is removed.
> > 2. The current ledger is closed and a new ledger is created.
> > 3. A consumer acks a message, and the slowest cursor moves forward.
> > 4. A user triggers truncateTopic by admin.
>
> > I see that this pip is for the internal ledger deletion cases (1-3 above),
> > and it appears that such internal deletion operations do not require
> > pre-validation or status checks (and no additional iops on the metadata
> > store). I agree that we need a separate pip for async admin operations.
>
> This pip also applies to case 4. All cases invoke
> `trimConsumedLedgersInBackground`.
> This pip acts inside that method.

For the long-running async admin operations such as
deleteTopic/Namespace/Tenant/*, as I mentioned, I think we'd better provide
describe* APIs to enable the admins to check the status of the async
operation.
I believe we can first use this system topic from this pip to support case
4, "admin async operations."
Still, we probably need a separate pip to discuss
- the expected behavior/experience of async admin operations
- whether we want to provide describe* APIs for the async operations
- target resources
- architecture/components for how to support describe* APIs
Re: [DISCUSS] PIP-186: Introduce two phase deletion protocol based on system topic
On 2023/02/05 14:40:10 Asaf Mesika wrote:
> On Fri, Feb 3, 2023 at 1:48 PM Yan Zhao wrote:
> > > If you persisted the message successfully by the producer and the
> > > broker was terminated before being able to delete the ledger from the
> > > metadata service?
> >
> > If the broker is terminated, the consumer won't ack the message, and the
> > message will be re-consumed later.
>
> Let me quote the text I was replying to:
>
> > When pulsar wants to delete a ledger, ManagedLedger uses
> > ledgerDeletionService to send a message; the message content contains the
> > waiting delete ledger info. After sending succeeds, delete the ledger id
> > from the metadata store.
>
> If I understand correctly, the following steps will happen:
> 1. The producer writes a message to the system topic, containing the ledger
> information to delete.
> 2. Upon success of (1), the ledger ID is deleted from the metadata store.
>
> Your reply was about the consumer; my question was about the producer, so
> I'm writing my reply here again (full reply):
>
> What if you persisted the message successfully by the producer and the
> broker was terminated before being able to delete the ledger (ID) from the
> metadata service?

After the broker restarts, the broker still thinks the ledger is available,
and the broker can still read data from this ledger.
When the consumer receives the message, it triggers the corresponding broker
to delete the ledger.
The ledger still exists in the metadata, so the deletion is ignored, and the
consumer acks the message.

At the next trimConsumedLedgersInBackground, the ledger will be deleted
again.

> > > I recommend having the logic to delete the ledger be done on the
> > > message consumer side:
> > > - if the ledger exists in the MD store, delete it.
> > > - send a delete command to BK
> > > Both, as I understand, are idempotent.
> > > If for some reason one action was done but the other wasn't - ZK down,
> > > BK down, broker hosting the consumer terminated - either the message
> > > will not be acknowledged, or you negatively acknowledge.
> >
> > We send a delete command to the broker; it will connect to the
> > corresponding broker which loads the topic. The corresponding broker
> > receives the command, then passes it to ManagedLedger, and the
> > ManagedLedger does the actual delete operation.
> > If the consumer does the delete operation, it's a little unreasonable.
> > The ledger manager should be `ManagedLedger`; letting it do the delete is
> > better.
>
> You say the consumer will send a delete command to the broker: by what
> means? Sending a message on a topic, or running an RPC request by adding a
> new command to the PulsarApi.proto file?
> If the latter - shouldn't this be documented?

Yes, by sending an RPC request.
I just added sections:
`Introduce admin api in Topics`.
`Introduce admin api in Schemas`.

I will describe it in detail.

> Let me assume the consumer which receives the message will run a Pulsar
> API command to instruct the broker assigned to this topic to delete the
> ledger - why can't the implementation of this command in the assigned
> broker do both the deletion of the ledger ID from ZK AND the deletion from
> BK? Why do we need to have the ZK deletion done on the producer side,
> after we produced the message?

In pulsar, as long as we delete the ledger id from the metadata store, we
consider the ledger already deleted. Pulsar doesn't refer to the ledger
anymore, although the ledger still exists in BK. The ledger deletion in BK
can be done at any time, as long as we ensure it will eventually be done.

If we delete both the ledger ID from zk and the ledger from bk when the
broker receives the delete RPC command, there may be a problem in this case:
The broker wants to delete ledger 1; it sends the message to the system
topic but hasn't yet removed ledger id 1 from zk. So the ManagedLedger still
thinks ledger 1 is readable, and it may read ledger 1 from BK. But at the
same time, the consumer receives the msg and sends the delete RPC command to
the broker; the broker deletes the ledger id from zk and deletes the ledger
from BK. The ManagedLedger that is reading from BK will throw an exception.

So we remove the ledger id from zk at the producer stage; the broker then
considers the ledger already deleted and doesn't do any operation on the
ledger.

> > General question: When a ledger is persisted to ZK, where is the ledger
> > metadata persisted in ZK (more specifically its metadata, which includes
> > the component)?
> > Is it also used when building out the key (path) in ZK?
> >
> > https://github.com/apache/bookkeeper/blob/901f76ce4c4f9f771363424dbb60da4d590ad122/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadataImpl.java#L74
>
> It's the content in the zk node. When creating a ledger via bookkeeper, it
> will create a path like `/ledgers/00//L`, and the path value is an
> instance of
Re: [DISCUSS] PIP-186: Introduce two phase deletion protocol based on system topic
On Fri, Feb 3, 2023 at 1:48 PM Yan Zhao wrote:

> > If you persisted the message successfully by the producer and the broker
> > was terminated before being able to delete the ledger from the metadata
> > service?
>
> If the broker is terminated, the consumer won't ack the message, and the
> message will be re-consumed later.

Let me quote the text I was replying to:

> When pulsar wants to delete a ledger, ManagedLedger uses
> ledgerDeletionService to send a message; the message content contains the
> waiting delete ledger info. After sending succeeds, delete the ledger id
> from the metadata store.

If I understand correctly, the following steps will happen:
1. The producer writes a message to the system topic, containing the ledger
information to delete.
2. Upon success of (1), the ledger ID is deleted from the metadata store.

Your reply was about the consumer; my question was about the producer, so
I'm writing my reply here again (full reply):

What if you persisted the message successfully by the producer and the
broker was terminated before being able to delete the ledger (ID) from the
metadata service?

> > I recommend having the logic to delete the ledger be done on the message
> > consumer side:
> > - if the ledger exists in the MD store, delete it.
> > - send a delete command to BK
> > Both, as I understand, are idempotent. If for some reason one action was
> > done but the other wasn't - ZK down, BK down, broker hosting the consumer
> > terminated - either the message will not be acknowledged, or you
> > negatively acknowledge.
>
> We send a delete command to the broker; it will connect to the
> corresponding broker which loads the topic. The corresponding broker
> receives the command, then passes it to ManagedLedger, and the
> ManagedLedger does the actual delete operation.
> If the consumer does the delete operation, it's a little unreasonable. The
> ledger manager should be `ManagedLedger`; letting it do the delete is
> better.
You say the consumer will send a delete command to the broker: by what
means? Sending a message on a topic, or running an RPC request by adding a
new command to the PulsarApi.proto file?
If the latter - shouldn't this be documented?

Let me assume the consumer which receives the message will run a Pulsar API
command to instruct the broker assigned to this topic to delete the ledger -
why can't the implementation of this command in the assigned broker do both
the deletion of the ledger ID from ZK AND the deletion from BK? Why do we
need to have the ZK deletion done on the producer side, after we produced
the message?

> > General question: When a ledger is persisted to ZK, where is the ledger
> > metadata persisted in ZK (more specifically its metadata, which includes
> > the component)?
> > Is it also used when building out the key (path) in ZK?
> >
> > https://github.com/apache/bookkeeper/blob/901f76ce4c4f9f771363424dbb60da4d590ad122/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadataImpl.java#L74
>
> It's the content in the zk node. When creating a ledger via bookkeeper, it
> will create a path like `/ledgers/00//L`, and the path value is an
> instance of LedgerMetadataImpl.
> In the bookkeeper LedgerMetadataImpl, the customMetadata stores the user's
> metadata.
>
> If the ledger is for Ledger, the customMetadata stores:
> `key:application, value:pulsar`
> `key:component, value:managed-ledger`
> `key:pulsar/managed-ledger, value: ledgerName`
>
> If the ledger is for Cursor, the customMetadata stores:
> `key:application, value:pulsar`
> `key:component, value:managed-ledger`
> `key:pulsar/managed-ledger, value: ledgerName`
> `key:pulsar/cursor, value: cursorName`
>
> If the ledger is for schema, the customMetadata stores:
> `key:application, value:pulsar`
> `key:component, value:schema`
> `key:pulsar/schemaId, value: schemaId`
>
> So when we get the ledger metadata from bookkeeper, we can get the ledger
> source.

Thanks for the info!
> > > Isn't the type saved together with the ledger in ZK? > We need to differentiate them: the same ledger may be stored both on the BK > side and on the offload side. > If we want to delete both the BK data and the offloaded data of a ledger, we > publish two messages to the system topic. The broker needs this field to > determine whether to delete the offloaded data or the BK data. So you're saying that the information about whether this ledger is only in BK, only in offload storage, or in both, is NOT encoded in the custom metadata properties of the ledger? The offloader saves that information for that ledger in another place in ZK? If this ledger is a cursor ledger, how do you know where to delete it inside ZK if you don't include the cursor name in the PendingDeleteLedgerInfo message? > > > It's for the offloaded ledger, when we want to delete the offload ledger, > > > we need offloadedContextUuid, here we can simplify it to > offloadContextUuid. > > > Sounds much better. Maybe offloadedLedgerUUID? (why context?) > Agree. > > > > > Are you
Re: [DISCUSS] PIP-186: Introduce two phase deletion protocol based on system topic
On 2023/02/03 20:05:59 Heesung Sohn wrote: > For these internal requesters, > 1. A cursor is removed. > 2. The current ledger is closed and a new ledger is created. > 3. A consumer acks a message, and the slowest cursor moves forward. > > How do we prevent these callers from issuing duplicate requests for the > same resources (ledgers)? (How do we handle race conditions between the > requesters and consumers?) These 3 cases all invoke `trimConsumedLedgersInBackground`:
```
executor.executeOrdered(name, safeRun(() -> internalTrimLedgers(isTruncate, promise)));
```
A managed ledger is only loaded by one broker, and the executor is thread-safe, so trims for the same managed ledger are serialized. > It seems like there could be duplicate requests. Are we relying on these > methods to dedup messages? > 1. the idempotency of the deletion operations > 2. enough delay in the requesters' scan cycles > 3. enable dedup in the system topic There is one case that will cause duplicate requests: the ManagedLedger wants to delete ledger 1, and after sending it to the system topic, the broker shuts down without persisting the remaining ledgers to the metadata store. On the next trigger, ledger 1 will be sent to the system topic again. Here we rely on `the idempotency of the deletion operations` to handle it.
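The duplicate-request case above is safe precisely because deletion is idempotent: deleting a ledger that is already gone is treated as success. A minimal illustrative sketch of that property (hypothetical names, not Pulsar code):

```java
import java.util.HashSet;
import java.util.Set;

public class IdempotentDeleteSketch {
    final Set<Long> ledgers = new HashSet<>(Set.of(1L, 2L));

    // Returns true whether the ledger was deleted now or already absent,
    // mirroring a consumer that acks the message in both cases.
    boolean delete(long ledgerId) {
        ledgers.remove(ledgerId); // "no such ledger" is treated as success
        return true;
    }

    public static void main(String[] args) {
        IdempotentDeleteSketch s = new IdempotentDeleteSketch();
        s.delete(1L);
        s.delete(1L); // duplicate request after a broker restart: still fine
        System.out.println(s.ledgers); // [2]
    }
}
```

Because the second delete of ledger 1 is a no-op, re-sending the same PendingDeleteLedgerInfo after a crash causes no harm.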
Re: [DISCUSS] PIP-186: Introduce two phase deletion protocol based on system topic
On 2023/02/03 18:14:53 Heesung Sohn wrote: > There are some cases to trigger it. > 1. A cursor is removed. > 2. The current ledger is closed and a new ledger is created. > 3. A consumer acks a message, and the slowest cursor moves forward. > 4. A user triggers truncateTopic via the admin API. > > I see that this pip is for the internal ledger deletion cases(1-3 above), > and it appears that such internal deletion operations do not require > pre-validation or status checks(and no additional iops on the metadata > store). I agree that we need a separate pip for async admin operations. This PIP also applies to case 4. All cases invoke `trimConsumedLedgersInBackground`, and this PIP acts inside that method.
Re: [DISCUSS] PIP-186: Introduce two phase deletion protocol based on system topic
For these internal requesters, 1. A cursor is removed. 2. The current ledger is closed and a new ledger is created. 3. A consumer acks a message, and the slowest cursor moves forward. How do we prevent these callers from issuing duplicate requests for the same resources (ledgers)? (How do we handle race conditions between the requesters and consumers?) It seems like there could be duplicate requests. Are we relying on these methods to dedup messages? 1. the idempotency of the deletion operations 2. enough delay in the requesters' scan cycles 3. enable dedup in the system topic On Fri, Feb 3, 2023 at 10:14 AM Heesung Sohn wrote: > > > There are some cases to trigger it. > 1. A cursor is removed. > 2. The current ledger is closed and a new ledger is created. > 3. A consumer acks a message, and the slowest cursor moves forward. > 4. A user triggers truncateTopic via the admin API. > > I see that this pip is for the internal ledger deletion cases(1-3 above), > and it appears that such internal deletion operations do not require > pre-validation or status checks(and no additional iops on the metadata > store). I agree that we need a separate pip for async admin operations. > > > On Fri, Feb 3, 2023 at 9:16 AM Yan Zhao wrote: > >> > If we don't want to add more pressure on the metadata store from this >> > feature as a requirement, >> > I think we can use a system topic as well. Just to confirm, is this the >> > direction we are agreeing on? >> The design is too generic; I'm afraid it is not suitable for ledger >> deletion. >> >> > >> > Using a system topic, >> > 1. A broker receives an async operation request on a target resource >> from a >> > client. >> > 2. If the target resource is available, the broker publishes an >> operation >> > request message to the system topic. >> > 2.1 If there is no target resource, >> > error out(ResourceDoesNotExistException). >> It's too generic; I'm afraid it is not suitable for ledger deletion. 
>> >> In ManagedLedgerImpl, the method trimConsumedLedgersInBackground triggers >> the delete operation. It finds the slowest cursor read position in the >> topic, and finds the ledgers that are before the slowest cursor read >> position. It then checks each ledger with `isLedgerRetentionOverSizeQuota` >> and `hasLedgerRetentionExpired`. If either holds, the ledger should be >> deleted, so we append it to deleteLedgerList; we also check >> `isOffloadedNeedsDelete`, and if so, append the ledger to >> deleteOffloadLedgerList. Then we iterate `deleteLedgerList` and >> `deleteOffloadLedgerList`, build a `PendingDeleteLedgerInfo` for each, and >> send it to the system topic. If the send succeeds, we remove the ledger >> from the ledger list and then persist the ledger list. If the send fails, >> we do nothing. >> Example: there are ledgers [1,2,3,4,5], and we want to delete 1,2,3. >> Sending 1 and 3 to the system topic succeeds, and sending 2 fails. We >> remove 1 and 3, and persist [2,4,5] to the ZK metadata store. >> >> There are some cases to trigger it. >> 1. A cursor is removed. >> 2. The current ledger is closed and a new ledger is created. >> 3. A consumer acks a message, and the slowest cursor moves forward. >> 4. A user triggers truncateTopic via the admin API. >> >> Case 4 is related to it: if the topic does not exist, throw >> ResourceDoesNotExistException. > > > 2.2 If the target resource's state is pending or running, error >> > out(InvalidResourceStateException). >> If we use the system topic to store the pending deleted ledgers, we can't >> know that. We would have to read all the messages to check it, which is not >> realistic. > > 3. The broker returns success(messageId) to the client. >> Ledger deletion may also be triggered by the broker itself. >> Same reason as above. >> >> > 4. A system topic consumer(Shared Subscription Mode?) consumes the >> > operation request message. >> Yes. >> > 5. The consumer runs the operation workflow with an x-min timeout. >> > 5.1 The consumer gets the target resource data from the metadata. 
>> > 5.1.1 If there is no targeted resource, complete the operation. >> Go >> > to 5.4. >> > 5.2 The consumer updates the target resource state to running. >> (state: >> > running, startedAt: now) >> > 5.3 Run the workflow step 1 ... n. >> > 5.4 Acknowledge the message. >> The consumer receives the message, which contains the delete ledger info, >> so it doesn't need to get the target resource from the metadata. It sends a >> delete command to the corresponding broker, just like producing a message. >> The broker receives the delete command and checks whether the ledger still >> exists in the ledgerList: if it exists, do nothing; if it does not exist, >> delete the BK data or the offloaded data. >> If the topic has already been deleted, sending the delete command throws >> TopicNotFoundException, and the consumer then deletes the ledger locally. >> >> > >> > >> > * I guess we will enable deduplication for the system topic for possibly >> > duplicated operation requests. >> That's not essential. The deletion is idempotent. >> >> > >> > * Here, Target Resource means the last metadata to update in the
Re: [DISCUSS] PIP-186: Introduce two phase deletion protocol based on system topic
There are some cases to trigger it. 1. A cursor is removed. 2. The current ledger is closed and a new ledger is created. 3. A consumer acks a message, and the slowest cursor moves forward. 4. A user triggers truncateTopic via the admin API. I see that this pip is for the internal ledger deletion cases(1-3 above), and it appears that such internal deletion operations do not require pre-validation or status checks(and no additional iops on the metadata store). I agree that we need a separate pip for async admin operations. On Fri, Feb 3, 2023 at 9:16 AM Yan Zhao wrote: > > If we don't want to add more pressure on the metadata store from this > > feature as a requirement, > > I think we can use a system topic as well. Just to confirm, is this the > > direction we are agreeing on? > The design is too generic; I'm afraid it is not suitable for ledger > deletion. > > > > > Using a system topic, > > 1. A broker receives an async operation request on a target resource > from a > > client. > > 2. If the target resource is available, the broker publishes an operation > > request message to the system topic. > > 2.1 If there is no target resource, > > error out(ResourceDoesNotExistException). > It's too generic; I'm afraid it is not suitable for ledger deletion. > > In ManagedLedgerImpl, the method trimConsumedLedgersInBackground triggers > the delete operation. It finds the slowest cursor read position in the > topic, and finds the ledgers that are before the slowest cursor read > position. It then checks each ledger with `isLedgerRetentionOverSizeQuota` > and `hasLedgerRetentionExpired`. If either holds, the ledger should be > deleted, so we append it to deleteLedgerList; we also check > `isOffloadedNeedsDelete`, and if so, append the ledger to > deleteOffloadLedgerList. Then we iterate `deleteLedgerList` and > `deleteOffloadLedgerList`, build a `PendingDeleteLedgerInfo` for each, and > send it to the system topic. If the send succeeds, we remove the ledger > from the ledger list and then persist the ledger list. 
If the > send fails, we do nothing. > Example: there are ledgers [1,2,3,4,5], and we want to delete 1,2,3. > Sending 1 and 3 to the system topic succeeds, and sending 2 fails. We > remove 1 and 3, and persist [2,4,5] to the ZK metadata store. > > There are some cases to trigger it. > 1. A cursor is removed. > 2. The current ledger is closed and a new ledger is created. > 3. A consumer acks a message, and the slowest cursor moves forward. > 4. A user triggers truncateTopic via the admin API. > > Case 4 is related to it: if the topic does not exist, throw > ResourceDoesNotExistException. > > 2.2 If the target resource's state is pending or running, error > > out(InvalidResourceStateException). > If we use the system topic to store the pending deleted ledgers, we can't > know that. We would have to read all the messages to check it, which is not > realistic. > 3. The broker returns success(messageId) to the client. > Ledger deletion may also be triggered by the broker itself. > Same reason as above. > > > 4. A system topic consumer(Shared Subscription Mode?) consumes the > > operation request message. > Yes. > > 5. The consumer runs the operation workflow with an x-min timeout. > > 5.1 The consumer gets the target resource data from the metadata. > > 5.1.1 If there is no targeted resource, complete the operation. > Go > > to 5.4. > > 5.2 The consumer updates the target resource state to running. > (state: > > running, startedAt: now) > > 5.3 Run the workflow step 1 ... n. > > 5.4 Acknowledge the message. > The consumer receives the message, which contains the delete ledger info, > so it doesn't need to get the target resource from the metadata. It sends a > delete command to the corresponding broker, just like producing a message. > The broker receives the delete command and checks whether the ledger still > exists in the ledgerList: if it exists, do nothing; if it does not exist, > delete the BK data or the offloaded data. > If the topic has already been deleted, sending the delete command throws > TopicNotFoundException, and the consumer then deletes the ledger locally. 
> > > > > > > * I guess we will enable deduplication for the system topic for possibly > > duplicated operation requests. > That's not essential. The deletion is idempotent. > > > > > * Here, Target Resource means the last metadata to update in the metadata > > store. > > For example, a workflow updates multiple metadata places like the > > following. In this case, the target resource is the 3rd one. > > 1. sub-resource-1 > > 2. sub-resource-2 > > 3. target-resource. e.g. ledger, topic metadata. > > > > * For Step 5.3, all steps should be idempotent. Also, the consumer > > periodically updates the operation state in the target resource's > metadata > > every x secs. Any retry on a long-running job should resume the operation > > closer to where it failed instead of running it from the beginning. > > > * We could have describe* apis to check any inflight operations state by > > querying the target resource's operation state, but this will show some > > eventual consistency because
Re: [DISCUSS] PIP-186: Introduce two phase deletion protocol based on system topic
> If we don't want to add more pressure on the metadata store from this > feature as a requirement, > I think we can use a system topic as well. Just to confirm, is this the > direction we are agreeing on? The design is too generic; I'm afraid it is not suitable for ledger deletion. > > Using a system topic, > 1. A broker receives an async operation request on a target resource from a > client. > 2. If the target resource is available, the broker publishes an operation > request message to the system topic. > 2.1 If there is no target resource, > error out(ResourceDoesNotExistException). It's too generic; I'm afraid it is not suitable for ledger deletion. In ManagedLedgerImpl, the method trimConsumedLedgersInBackground triggers the delete operation. It finds the slowest cursor read position in the topic, and finds the ledgers that are before the slowest cursor read position. It then checks each ledger with `isLedgerRetentionOverSizeQuota` and `hasLedgerRetentionExpired`. If either holds, the ledger should be deleted, so we append it to deleteLedgerList; we also check `isOffloadedNeedsDelete`, and if so, append the ledger to deleteOffloadLedgerList. Then we iterate `deleteLedgerList` and `deleteOffloadLedgerList`, build a `PendingDeleteLedgerInfo` for each, and send it to the system topic. If the send succeeds, we remove the ledger from the ledger list and then persist the ledger list. If the send fails, we do nothing. Example: there are ledgers [1,2,3,4,5], and we want to delete 1,2,3. Sending 1 and 3 to the system topic succeeds, and sending 2 fails. We remove 1 and 3, and persist [2,4,5] to the ZK metadata store. There are some cases to trigger it. 1. A cursor is removed. 2. The current ledger is closed and a new ledger is created. 3. A consumer acks a message, and the slowest cursor moves forward. 4. A user triggers truncateTopic via the admin API. Case 4 is related to it: if the topic does not exist, throw ResourceDoesNotExistException. 
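The send-then-persist rule in the example above can be sketched as follows. This is an illustrative, self-contained model (the real logic lives in ManagedLedgerImpl; `trimAndPersist` and the publish predicate are hypothetical names): only ledgers whose PendingDeleteLedgerInfo publish succeeded are removed from the ledger list, which the caller then persists to the metadata store.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

public class TrimFlowSketch {
    // publish simulates the system-topic producer: true means the send succeeded.
    static List<Long> trimAndPersist(List<Long> ledgers,
                                     List<Long> toDelete,
                                     Predicate<Long> publish) {
        List<Long> remaining = new ArrayList<>(ledgers);
        for (Long ledgerId : toDelete) {
            if (publish.test(ledgerId)) {   // send PendingDeleteLedgerInfo
                remaining.remove(ledgerId); // remove only on publish success
            }
            // on failure: do nothing; the ledger stays and is retried on the next trim
        }
        return remaining; // the caller persists this list to the metadata store
    }

    public static void main(String[] args) {
        // The example from the mail: ledgers [1,2,3,4,5], delete 1,2,3;
        // publishing 1 and 3 succeeds, publishing 2 fails.
        List<Long> persisted = trimAndPersist(
                List.of(1L, 2L, 3L, 4L, 5L),
                List.of(1L, 2L, 3L),
                id -> id != 2L);
        System.out.println(persisted); // [2, 4, 5]
    }
}
```

A ledger is removed from the metadata only after its delete intent is durably recorded in the system topic, which is the "first phase" of the protocol.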
> 2.2 If the target resource's state is pending or running, error > out(InvalidResourceStateException). If we use the system topic to store the pending deleted ledgers, we can't know that. We would have to read all the messages to check it, which is not realistic. > 3. The broker returns success(messageId) to the client. Ledger deletion may also be triggered by the broker itself. Same reason as above. > 4. A system topic consumer(Shared Subscription Mode?) consumes the > operation request message. Yes. > 5. The consumer runs the operation workflow with an x-min timeout. > 5.1 The consumer gets the target resource data from the metadata. > 5.1.1 If there is no targeted resource, complete the operation. Go > to 5.4. > 5.2 The consumer updates the target resource state to running. (state: > running, startedAt: now) > 5.3 Run the workflow step 1 ... n. > 5.4 Acknowledge the message. The consumer receives the message, which contains the delete ledger info, so it doesn't need to get the target resource from the metadata. It sends a delete command to the corresponding broker, just like producing a message. The broker receives the delete command and checks whether the ledger still exists in the ledgerList: if it exists, do nothing; if it does not exist, delete the BK data or the offloaded data. If the topic has already been deleted, sending the delete command throws TopicNotFoundException, and the consumer then deletes the ledger locally. > > > * I guess we will enable deduplication for the system topic for possibly > duplicated operation requests. That's not essential. The deletion is idempotent. > > * Here, Target Resource means the last metadata to update in the metadata > store. > For example, a workflow updates multiple metadata places like the > following. In this case, the target resource is the 3rd one. > 1. sub-resource-1 > 2. sub-resource-2 > 3. target-resource. e.g. ledger, topic metadata. > > * For Step 5.3, all steps should be idempotent. 
Also, the consumer > periodically updates the operation state in the target resource's metadata > every x secs. Any retry on a long-running job should resume the operation > closer to where it failed instead of running it from the beginning. > * We could have describe* APIs to check any inflight operations state by > querying the target resource's operation state, but this will show some > eventual consistency because of the delay between step 3 and step 5. > > > Thanks, > Heesung Thanks for your good idea, but it may not be suitable for ledger deletion. Also, in this PIP I don't want to operate on the ledger metadata too much. Your idea may be a good solution for other cases, though; maybe we can use it to draft a new PIP.
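The broker-side safety check described earlier in this exchange ("if the ledger still exists in the ledgerList, do nothing; otherwise delete the BK or offloaded data") is the core of the two-phase protocol. A minimal illustrative sketch (names are hypothetical, not from the PIP):

```java
import java.util.Set;

public class DeleteCommandSketch {
    enum Action { SKIP, DELETE_DATA }

    // The owning broker's handling of a delete command: a ledger that is
    // still referenced in the managed ledger's ledger list must NOT be
    // deleted (the metadata update, phase one, has not completed yet);
    // only unreferenced ledgers are actually removed from BK / offload storage.
    static Action handleDeleteCommand(Set<Long> ledgerList, long ledgerId) {
        return ledgerList.contains(ledgerId) ? Action.SKIP : Action.DELETE_DATA;
    }

    public static void main(String[] args) {
        Set<Long> ledgerList = Set.of(2L, 4L, 5L);
        System.out.println(handleDeleteCommand(ledgerList, 2L)); // SKIP
        System.out.println(handleDeleteCommand(ledgerList, 1L)); // DELETE_DATA
    }
}
```

This is what makes a duplicate or premature pending-delete message harmless: the data is only removed once the ledger is no longer referenced by the metadata.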
Re: [DISCUSS] PIP-186: Introduce two phase deletion protocol based on system topic
Heesung, thanks. That's a good point for multi-step workflows. But I'm afraid it will increase the ZK pressure, and it also needs to handle some corner cases. I prefer to use a system topic to handle it. On 2023/01/31 19:05:34 Heesung Sohn wrote: > On Tue, Jan 31, 2023 at 6:43 AM Yan Zhao wrote: > > > > - Have we considered a metadata store to persist and dedup deletion > > > requests instead of the system topic? Why is the system topic the better > > > choice than a metadata store for this problem? > > If we use the metadata store to store the middle-step ledgers, we need to > > operate on the metadata store after every deletion. > > > > > > > And we need a trigger to trigger deletion. A broker may have lots > > of topics, so there is also a lot of ledger deletion. Using the metadata store to > > store it may be a bottleneck. > > Using pub/sub is easy to implement, and it is a good trigger for > > deletion. > > > > > We can group the multiple resource deletions into a single record in the > metadata store. Also, we can use the metadata store watcher to trigger the > deletion. > > I can see that a similar transactional operation(using metadata store) can > be done like the following. > > Alternatively, > 1. A broker receives a resource(ledger) deletion request from a client. > 2. If the target resource is available, the broker persists a transaction > lock(/transactions/broker-id/delete_ledger/ledger_id) into a metadata > store(state:pending, createdAt:now). > 2.1 If there is no target resource, error > out(ResourceDoesNotExistException). > 2.2 If the lock already exists, error out(OperationInProgressException). > 3. The broker returns success to the client. > 4. The transaction watcher(metadata store listener) on the same broker-id > is notified. > 5. The transaction watcher runs the deletion process with an x-min timeout. > 5.1 The transaction watcher updates the lock state (state: running, > startedAt: now) > 5.2 Run step 1 ... 
n (periodically update the lock state and > updatedAt:now every x secs) > 5.3 Delete the lock. > 6. The orphan transaction monitor runs any orphan jobs by retrying step 5. > (If the watcher fails in the middle at step 5, the lock state will be > orphan(state:running and startedAt : > x min)) > 7. The leader monitor(on the leader broker) manages orphan jobs if brokers > are gone or unavailable. > > We can have multiple types of transaction locks(or a generic lock) depending > on the operation types. This will reduce the number of locks to > create/update if there are multiple target resources to operate on for a > single transaction. > > - Single-ledger deletion: /transactions/broker-id/delete_ledger/ledger_id > - Multi-ledger deletion: /transactions/broker-id/delete_ledgers/ledgers : > {ledger_ids[a,b,c,d], last_deleted_ledger_index:3} > // last_deleted_ledger_index could be periodically updated every min. This > can help to resume the deletion when retrying. > - Topic deletion : /transactions/broker-id/delete_topic/topic_name > > > > > > - How does Pulsar deduplicate deletion requests(error out to users) while > > > the deletion request is running? > > The user can only invoke `truncateTopic`; it's not for a particular > > ledger. The note: "The truncate operation will move all cursors to the end > > of the topic and delete all inactive ledgers." > > It's just a trigger for the user. > > > > What if the admin concurrently requests `truncateTopic` many times for the > same topic while one truncation job is running? How does Pulsar currently > deduplicate these requests? And how does this proposal handle this > situation? > > > > > > > - How do users track async deletion flow status? (do we expose any > > > describeDeletion API to show the deletion status?) > > Why do we need to track the async deletion flow status? Ledger deletion is > > transparent to the Pulsar client. In the broker, deleting a ledger prints > > the log `delete ledger xx successfully `. 
> > If the delete failed, it prints the log `delete ledger xxx failed.` > > IMHO, relying on logs to check the system state is not a good practice. Generally, every async user/admin API(long-running async workflow API) needs a corresponding describe* API to return the current running state. > > Regards, > Heesung >
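The orphan-detection rule in the transaction-lock alternative above (step 6: a lock is orphaned when its state is still "running" but it was started more than the x-min timeout ago) can be sketched as follows. All names here are illustrative, not from the proposal:

```java
import java.time.Duration;
import java.time.Instant;

public class OrphanLockSketch {
    // A simplified transaction lock as described in the proposal sketch.
    record TxLock(String path, String state, Instant startedAt) {}

    // Orphan = still "running" but started longer ago than the timeout,
    // meaning the watcher likely died mid-workflow and the job must be retried.
    static boolean isOrphan(TxLock lock, Instant now, Duration timeout) {
        return "running".equals(lock.state())
                && Duration.between(lock.startedAt(), now).compareTo(timeout) > 0;
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2023-02-01T00:30:00Z");
        TxLock stale = new TxLock("/transactions/broker-1/delete_ledger/42",
                "running", Instant.parse("2023-02-01T00:00:00Z"));
        System.out.println(isOrphan(stale, now, Duration.ofMinutes(10))); // true
    }
}
```

The periodic `updatedAt` refresh in step 5.2 exists precisely so that a healthy long-running job is not misclassified by this rule.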
Re: [DISCUSS] PIP-186: Introduce two phase deletion protocol based on system topic
> If you persisted the message successfully by the producer and the broker > was terminated before being able to delete the ledger from the metadata > service? If the broker is terminated, the consumer won't ack the message, and the message will be re-consumed later. > I recommend having the logic to delete the ledger be done on the message > consumer side: > - if the ledger exists in the MD store, delete it. > - send delete command to BK > Both, as I understand, are idempotent. If for some reason one action was done > but the other wasn't - ZK down, BK down, broker hosting the consumer > terminated - either the message will not be acknowledged, or you negatively > acknowledge. We send a delete command, which connects to the corresponding broker that has loaded the topic. That broker receives the command and passes it to the ManagedLedger, and the ManagedLedger performs the actual delete operation. It would be a little unreasonable for the consumer to do the delete operation itself: the ledger's manager is the `ManagedLedger`, so it is better to let it do the delete. > General question: When a ledger is persisted to ZK, where is the ledger > metadata persisted in ZK (more specifically its metadata, which includes > the component)? > Is it also used when building out the key (path) in ZK? https://github.com/apache/bookkeeper/blob/901f76ce4c4f9f771363424dbb60da4d590ad122/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadataImpl.java#L74 It's the content of the ZK node. When BookKeeper creates a ledger, it creates a path like `/ledgers/00//L`, and the value of that path is an instance of LedgerMetadataImpl. In BookKeeper's LedgerMetadataImpl, the customMetadata field stores the user's metadata. 
If the ledger is a data ledger, the customMetadata stores: `key:application, value:pulsar` `key:component, value:managed-ledger` `key:pulsar/managed-ledger, value: ledgerName` If the ledger is a cursor ledger, the customMetadata stores: `key:application, value:pulsar` `key:component, value:managed-ledger` `key:pulsar/managed-ledger, value: ledgerName` `key:pulsar/cursor, value: cursorName` If the ledger is a schema ledger, the customMetadata stores: `key:application, value:pulsar` `key:component, value:schema` `key:pulsar/schemaId, value: schemaId` So when we get the ledger metadata from BookKeeper, we can determine the ledger's source. > Isn't the type saved together with the ledger in ZK? We need to differentiate them: the same ledger may be stored both on the BK side and on the offload side. If we want to delete both the BK data and the offloaded data of a ledger, we publish two messages to the system topic. The broker needs this field to determine whether to delete the offloaded data or the BK data. > It's for the offloaded ledger, when we want to delete the offload ledger, > > we need offloadedContextUuid, here we can simplify it to offloadContextUuid. > Sounds much better. Maybe offloadedLedgerUUID? (why context?) Agree. > > Are you encoding all that extra info besides the ledger ID and its source > to avoid reading it again from ZK when deleting it? No, we only encode the data that is useful for deletion. > > It's for extended. > > > Can't really understand from that short sentence what you mean. Can you > please elaborate? Maybe we can delete it; I just wanted to avoid changing the class when we add a new property. A new property can be put in as a key-value pair to extend it. > > > In https://github.com/apache/pulsar/issues/16569, the first-step section > and the second-step section have detailed process flow pictures. > > I'm sorry but you didn't answer all the questions I wrote. I'll paste them > here: > Can you explain the starting point? How does deletion work in general? > > When? What happens? ... 
I understand there are time-based triggers, and > > sometimes usage-based triggers. They are somehow marked in metadata. > In ManagedLedgerImpl, the method trimConsumedLedgersInBackground triggers the delete operation. It finds the slowest cursor read position in the topic, and finds the ledgers that are before the slowest cursor read position. It then checks each ledger with `isLedgerRetentionOverSizeQuota` and `hasLedgerRetentionExpired`. If either holds, the ledger should be deleted, so we append it to deleteLedgerList; we also check `isOffloadedNeedsDelete`, and if so, append the ledger to deleteOffloadLedgerList. Then we iterate `deleteLedgerList` and `deleteOffloadLedgerList`, build a `PendingDeleteLedgerInfo` for each, and send it to the system topic. If the send succeeds, we remove the ledger from the ledger list and then persist the ledger list. If the send fails, we do nothing. Example: there are ledgers [1,2,3,4,5], and we want to delete 1,2,3. Sending 1 and 3 to the system topic succeeds, and sending 2 fails. We remove 1 and 3, and persist [2,4,5] to the ZK metadata store. There are some cases to trigger it. 1. A cursor is removed. 2. The current ledger is closed and a new ledger is created. 3. A consumer acks a message, and the slowest cursor moves forward. 4. User trigger
Re: [DISCUSS] PIP-186: Introduce two phase deletion protocol based on system topic
On Thu, Feb 2, 2023 at 6:42 AM Asaf Mesika wrote: > Heesung, there are users out there who delete 2M topics in a day. The > stress we apply on ZK today due to the number of topics is more than > enough. I don't think we should rely on ZK to also manage the deletion of > those ledgers if that means adding more nodes / more reads and writes to > it. I totally agree that we need to know if there is an in-flight request. I > think this can be achieved using a special topic. I mean, why can't we just > mark the ledger for delete in ZK itself, and have the workflow go > through the different stages until all the needed data is deleted, and only then > delete the ledger ID from ZK? > If we don't want to add more pressure on the metadata store from this feature as a requirement, I think we can use a system topic as well. Just to confirm, is this the direction we are agreeing on? Using a system topic, 1. A broker receives an async operation request on a target resource from a client. 2. If the target resource is available, the broker publishes an operation request message to the system topic. 2.1 If there is no target resource, error out(ResourceDoesNotExistException). 2.2 If the target resource's state is pending or running, error out(InvalidResourceStateException). 3. The broker returns success(messageId) to the client. 4. A system topic consumer(Shared Subscription Mode?) consumes the operation request message. 5. The consumer runs the operation workflow with an x-min timeout. 5.1 The consumer gets the target resource data from the metadata. 5.1.1 If there is no targeted resource, complete the operation. Go to 5.4. 5.2 The consumer updates the target resource state to running. (state: running, startedAt: now) 5.3 Run the workflow step 1 ... n. 5.4 Acknowledge the message. * I guess we will enable deduplication for the system topic for possibly duplicated operation requests. * Here, Target Resource means the last metadata to update in the metadata store. 
For example, a workflow updates multiple metadata places like the following. In this case, the target resource is the 3rd one. 1. sub-resource-1 2. sub-resource-2 3. target-resource. e.g. ledger, topic metadata. * For Step 5.3, all steps should be idempotent. Also, the consumer periodically updates the operation state in the target resource's metadata every x secs. Any retry on a long-running job should resume the operation closer to where it failed instead of running it from the beginning. * We could have describe* apis to check any inflight operations state by querying the target resource's operation state, but this will show some eventual consistency because of the delay between step 3 and step 5. Thanks, Heesung > > On Tue, Jan 31, 2023 at 9:06 PM Heesung Sohn > wrote: > > > On Tue, Jan 31, 2023 at 6:43 AM Yan Zhao wrote: > > > > > > - Have we considered a metadata store to persist and dedup deletion > > > > requests instead of the system topic? Why is the system topic the > > better > > > > choice than a metadata store for this problem? > > > If we use the metadata store to store the middle step ledger, we need > to > > > operate the metadata store after deletion every time. > > > > > > > > > > > > And we need a trigger to trigger deletion. In the broker, it may have > > lots > > > of topics, the ledger deletion is also much. Using the metadata store > to > > > store it may be a bottleneck. > > > Using pub/sub is easy to implement, and it is a good trigger to trigger > > > deletion. > > > > > > > > > We can group the multiple resource deletions to a single record in the > > metadata store. Also, we can use the metadata store watcher to trigger > the > > deletion. > > > > I can see that a similar transactional operation(using metadata store) > can > > be done like the following. > > > > Alternatively, > > 1. A broker receives a resource(ledger) deletion request from a client. > > 2. 
If the target resource is available, the broker persists a transaction > > lock(/transactions/broker-id/delete_ledger/ledger_id) into a metadata > > store(state:pending, createdAt:now). > > 2.1 If there is no target resource, error > > out(ResourceDoesNotExistException). > > 2.2 If the lock already exists, error > out(OperationInProgressException). > > 3. The broker returns success to the client. > > 4. The transaction watcher(metadata store listener) on the same broker-id > > is notified. > > 5. The transaction watcher runs the deletion process with an x min > timeout. > > 5.1 The transaction watcher updates the lock state (state: running, > > startedAt: now) > > 5.2 Run step 1 ... n (periodically update the lock state and > > updatedAt:now every x secs) > > 5.3 Delete the lock. > > 6. The orphan transaction monitor runs any orphan jobs by retrying step > 5. > > (If the watcher fails in the middle at step 5, the lock state will be > > orphan(state:running and startedAt : > x min)) > > 7. The leader
Re: [DISCUSS] PIP-186: Introduce two phase deletion protocol based on system topic
Heesung, there are users out there which delete 2M topics in a day. The stress we apply on ZK today due to the amount of topics is more than enough. I don't think we should rely on ZK to also manage the deletion of those ledgers if that means adding more nodes / more reads and writes to it. I totally agree that we need to know if there is an in flight request. I think this can be achieved using a special topic. I mean, why can't we just mark in ZK itself the ledger is marked for delete, and the workflow will go through the different stages until all needed data is deleted and only then delete the ledger ID from ZK? On Tue, Jan 31, 2023 at 9:06 PM Heesung Sohn wrote: > On Tue, Jan 31, 2023 at 6:43 AM Yan Zhao wrote: > > > > - Have we considered a metadata store to persist and dedup deletion > > > requests instead of the system topic? Why is the system topic the > better > > > choice than a metadata store for this problem? > > If we use the metadata store to store the middle step ledger, we need to > > operate the metadata store after deletion every time. > > > > > > > And we need a trigger to trigger deletion. In the broker, it may have > lots > > of topics, the ledger deletion is also much. Using the metadata store to > > store it may be a bottleneck. > > Using pub/sub is easy to implement, and it is a good trigger to trigger > > deletion. > > > > > We can group the multiple resource deletions to a single record in the > metadata store. Also, we can use the metadata store watcher to trigger the > deletion. > > I can see that a similar transactional operation(using metadata store) can > be done like the following. > > Alternatively, > 1. A broker receives a resource(ledger) deletion request from a client. > 2. If the target resource is available, the broker persists a transaction > lock(/transactions/broker-id/delete_ledger/ledger_id) into a metadata > store(state:pending, createdAt:now). 
> 2.1 If there is no target resource, error > out(ResourceDoesNotExistException). > 2.2 If the lock already exists, error out(OperationInProgressException). > 3. The broker returns success to the client. > 4. The transaction watcher(metadata store listener) on the same broker-id > is notified. > 5. The transaction watcher runs the deletion process with an x min timeout. > 5.1 The transaction watcher updates the lock state (state: running, > startedAt: now) > 5.2 Run step 1 ... n (periodically update the lock state and > updatedAt:now every x secs) > 5.3 Delete the lock. > 6. The orphan transaction monitor runs any orphan jobs by retrying step 5. > (If the watcher fails in the middle at step 5, the lock state will be > orphan(state:running and startedAt : > x min)) > 7. The leader monitor(on the leader broker) manages orphan jobs if brokers > are gone or unavailable. > > We can have multiple types of transaction locks(or generic lock) depending > on the operations types. This will reduce the number of locks to > create/update if there are multiple target resources to operate on for a > single transaction. > > - Single ledger deletion: /transactions/broker-id/delete_ledger/ledger_id > - Multi-ledger deletion: /transactions/broker-id/delete_ledgers/ledgers : > {ledger_ids[a,b,c,d], last_deleted_ledger_index:3} > //last_deleted_ledger_index could be periodically updated every min. This > can help to resume the deletion when retrying. > - Topic deletion : /transactions/broker-id/delete_topic/topic_name > > > > > > - How does Pulsar deduplicate deletion requests(error out to users) > while > > > the deletion request is running? > > The user only can invoke `truncateTopic`, it's not for a particular > > ledger. The note: "The truncate operation will move all cursors to the > end > > of the topic and delete all inactive ledgers." > > It's just a trigger for the user. 
> > > > What if the admin concurrently requests `truncateTopic` many times for the > same topic while one truncation job is running? How does Pulsar currently > deduplicate these requests? And how does this proposal handle this > situation? > > > > > > > - How do users track async deletion flow status? (do we expose any > > > describeDeletion API to show the deletion status?) > > Why need to track the async deletion flow status? The ledger deletion is > > transparent for pulsarClient. In the broker, deleting a ledger will print > > the log `delete ledger xx successfully `. > > If delete failed, it print the log `delete ledger xxx failed.` > > > > IMHO, relying on logs to check the system state is not a good practice. > Generally, every async user/admin API(long-running async workflow API) > needs the corresponding describe* API to return the current running state. > > > Regards, > Heesung >
Re: [DISCUSS] PIP-186: Introduce two phase deletion protocol based on system topic
Heesung points out really good points, which actually made me add one more: Can you please describe all the triggers for ledger deletion? Some are user facing, some not. On Mon, Jan 30, 2023 at 8:19 PM Heesung Sohn wrote: > Hi, > > > I assume the deletion APIs are async(when a user requests deletion, pulsar > first returns success to the user if the request is persisted. Then, Pulsar > asynchronously runs the deletion flow) > > - Have we considered a metadata store to persist and dedup deletion > requests instead of the system topic? Why is the system topic the better > choice than a metadata store for this problem? > > - How does Pulsar deduplicate deletion requests(error out to users) while > the deletion request is running? > > - How do users track async deletion flow status? (do we expose any > describeDeletion API to show the deletion status?) > > > Thanks, > Heesung > > > > > On Mon, Jan 30, 2023 at 6:10 AM Yan Zhao wrote: > > > > Couples of notes: > > > > > > 1. > > > > > > > In the LedgerDeletionService start, it will create a producer to > send > > > > pending delete ledger. > > > > When delete a ledger, a PendingDeleteLedgerInfo msg with 1 min delay > > (the > > > > delay is for consumer side, if send it immediately, maybe the > metadata > > > > din't change when consumer receive it). After the send operation > > succeeds, > > > > then to operate metadata. If send msg failed, we think this deletion > > > > operation failed, and didn't operate metadata. > > > > > > > > > This section is completely unclear to me. Can you please provide step > by > > > step exactly what will happen in the workflow? > > > Who does what and where (node)? > > > > In PulsarService, it defines ledgerDeletionService. The > > ledgerDeletionService will create a producer and a consumer, which looks > > like topicPoliciesService. The PulsarService passes it to ManagedLedger. 
> > When pulsar wants to delete a ledger, ManagedLedger uses > > ledgerDeletionService to send a message, the message content contains the > > waiting delete ledger info. After sending success, delete the ledger id > > from the metadata store. > > The consumer receives the message, it will use PulsarClient to send a > > delete command to the corresponding broker, the broker receives delete > > command, and do the actual delete operation. > > > > In https://github.com/apache/pulsar/issues/16569, these are some > pictures > > for the workflow. > > > > > > > > > > 2. > > > > > > > /** > > > > * The ledger component . managed-ledger, managed-cursor and > > > > schema-storage. > > > > */ > > > > private LedgerComponent ledgerComponent; > > > > > > > > > Why is this needed? > > > What do you mean by a component of a ledger? Is the ledger divided into > > > components? > > > > It's the ledger source, (MANAGED_LEDGER,MANAGED_CURSOR,SCHEMA_STORAGE) > > When the broker wants to delete a ledger, we will check if the bookkeeper > > metadata matches or not. In the pulsar, it will mark the ledger source > when > > creating a new ledger. See LedgerMetadataUtils. > > > > > > > > 3. > > > > > > > /** > > > > * The ledger type. ledger or offload-ledger. > > > > */ > > > > private LedgerType ledgerType; > > > > > > > > > I don't understand why you need this type. > > It marks the ledger as a normal ledger or an offload ledger, broker need > > it to determine whether to delete bookkeeper data or offload data. > > > > > > > > 4. > > > > > > > private MLDataFormats.ManagedLedgerInfo.LedgerInfo context; > > > > > > > > > Context is a very generic word, but your type is so specific. Can you > > > please explain why you need this for? > > > > > > Are you sure you want to tie one data structure into the other - just > > > validating. 
> > It's for the offloaded ledger, when we want to delete the offload ledger, > > we need offloadedContextUuid, here we can simplify it to > offloadContextUuid. > > > > > > > > 5. > > > > > > > /** > > > > * Extended properties. > > > > */ > > > > private Map properties = new HashMap<>(); > > > > > > > > > Why is this needed? > > It's for extension. > > > > > > > > > > > 6. > > > > > > > When receiving a pending delete ledger msg, we will check if the > topic > > > > still exists. If the topic exists, send a delete command > > > > (PendingDeleteLedger) to the broker which owns the topic. In the > > broker, it > > > > will check if the ledger is still in the metadata store, if the > ledger > > in > > > > the metadata store means the ledger is still in use, give up to > delete > > this > > > > ledger > > > > > > > > > I don't understand this workflow. You say you check if it's in the > > metadata > > > store, and if it is, then it is used - what will make it unused? > > > Can you explain the starting point? How does deletion work in general? > > > When? What happens? ... I understand there are time-based triggers, and > > > sometimes usage-based triggers. They are somehow marked in metadata. > > In https://github.com/apache/pulsar/issues/16569. The first step section > > and second step section process flow pictures are detailed.
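Pulling the fields debated in points 2-5 of this exchange together, the pending-delete payload could look roughly like this (the enum values follow the quoted proposal; the class itself is a sketch, not the actual `PendingDeleteLedgerInfo` implementation):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the pending-delete message discussed above; not actual Pulsar code.
public class PendingDeleteLedgerInfoSketch {
    // Where the ledger came from (cf. LedgerMetadataUtils in the broker).
    enum LedgerComponent { MANAGED_LEDGER, MANAGED_CURSOR, SCHEMA_STORAGE }
    // Whether to delete BookKeeper data or offloaded (tiered-storage) data.
    enum LedgerType { LEDGER, OFFLOAD_LEDGER }

    String topicName;
    long ledgerId;
    LedgerComponent component;
    LedgerType type;
    // Needed only for offloaded ledgers (the offload context UUID).
    String offloadContextUuid;
    // Extension point for future fields.
    Map<String, String> properties = new HashMap<>();

    boolean requiresOffloadContext() {
        return type == LedgerType.OFFLOAD_LEDGER;
    }

    public static void main(String[] args) {
        PendingDeleteLedgerInfoSketch msg = new PendingDeleteLedgerInfoSketch();
        msg.ledgerId = 42L;
        msg.component = LedgerComponent.MANAGED_LEDGER;
        msg.type = LedgerType.LEDGER;
        // A normal BK ledger carries no offload context.
        System.out.println(msg.requiresOffloadContext());
    }
}
```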
Re: [DISCUSS] PIP-186: Introduce two phase deletion protocol based on system topic
> > When pulsar wants to delete a ledger, ManagedLedger uses > ledgerDeletionService to send a message, the message content contains the > waiting delete ledger info. After sending success, delete the ledger id > from the metadata store. > The consumer receives the message, it will use PulsarClient to send a > delete command to the corresponding broker, the broker receives delete > command, and do the actual delete operation. > What if you persisted the message successfully by the producer and the broker was terminated before being able to delete the ledger from the metadata service? I recommend having the logic to delete the ledger be done in the message consumer side: - if the ledger exists in the MD store, delete it. - send delete command to BK Both as I understand are idempotent. If for some reason one action was done but the other wasn't - ZK down, BK down, broker hosting the consumer terminated - either the message will not be acknowledged, or you negatively acknowledge. It's the ledger source, (MANAGED_LEDGER,MANAGED_CURSOR,SCHEMA_STORAGE) > When the broker wants to delete a ledger, we will check if the bookkeeper > metadata matches or not. In the pulsar, it will mark the ledger source when > creating a new ledger. See LedgerMetadataUtils. General question: When a ledger is persisted to ZK, where is the ledger metadata persisted in ZK (more specifically its metadata, which includes the component)? Is it also used when building out the key (path) in ZK? It marks the ledger as a normal ledger or an offload ledger, broker need it > to determine whether to delete bookkeeper data or offload data. > Isn't the type saved together with the ledger in ZK? It's for the offloaded ledger, when we want to delete the offload ledger, > we need offloadedContextUuid, here we can simplify it to offloadContextUuid. Sounds much better. Maybe offloadedLedgerUUID? (why context?) 
Are you encoding all that extra info besides the ledger ID and its source to avoid reading it again from ZK when deleting it? It's for extended. > Can't really understand from that short sentence what you mean. Can you please elaborate? > In https://github.com/apache/pulsar/issues/16569. The first step section > and second step section process flow picture are detailed. > I'm sorry but you didn't answer all the questions I wrote. I'll paste them here: Can you explain the starting point? How does deletion work in general? > When? What happens? ... I understand there are time based triggers, and > sometimes used based triggers. They are somehow marked in metadata. If delete fails, that means the storage system occur some problems. I guess > the storage system will recovery in 10 mins. > > In https://github.com/apache/pulsar/issues/16569, we define > reconsumeLaterOfTopicTwoPhaseDeletionInSeconds in the ServiceConfiguration, > it's configurable. > private int reconsumeLaterOfTopicTwoPhaseDeletionInSeconds = 600; We need some experienced people here to contribute their opinion. Default 10min might be too much. I recommend you ask Penghui. > > > You mentioned you are working with a client which has retries configured. > > Retry is client side based, ack one message while producing another, > > transaction free. Are you prepared to handle a case where you acked but > > failed to produce the message, hence you lost it completely? > > > The pulsarClient only sends a new message that succeeds, then ack the > origin message, so didn't care in this case. > Ok, then you will have concurrent consumption of a message which tries to delete the ledger from ZK and then tries to delete it from BK? Isn't that a concurrency issue? > > > If we want to delete a topic, we should send the delete ledger msg to > > > system topic and remove ledger id from metadata one by one, after all > the > > > ledger has been deleted, then delete the topic. 
If any ledger operate > > > failed, we think this delete topic operation failed and return the left > > > ledger id in the response. > > > > I couldn't understand. Can you please clarify this section. How exactly > > topic deletion is modified to use the features described in this pip? > > > We need to ensure that all ledgers are deleted before the topic is > deleted, otherwise, there will be orphan ledgers. > Your PIP is about introducing a workflow for deleting a ledger, right? When you delete a topic you iterate its ledger list and delete each ledger. Your PIP changes the way each ledger is deleted and makes it async. So I guess what I want to understand is: What are the changes you are making to topic deletion due to your PIP? You said "we need to make sure" - can you please clarify how you will make sure? > 10. > > Backward compatibility - I would make sure to document in the docs > exactly > > what to do in case we need to rollback (cook book). > Well. You added > If user upgrade and enable two phase deletion, the ledger deletion msg > will store in system topic. If the user rolls back to the old version
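Asaf's recommendation earlier in this message — do both deletion steps on the consumer side and rely on their idempotency plus negative acknowledgement — can be sketched with stubbed stores (the `MetadataStore` and `BookKeeperClient` interfaces here are placeholders, not real Pulsar/BK APIs):

```java
import java.util.HashSet;
import java.util.Set;

// Sketch of consumer-side deletion: both steps are idempotent, so on any
// failure the message is simply nacked and redelivered. Stubs, not real APIs.
public class ConsumerSideDeletionDemo {
    interface MetadataStore { boolean removeLedger(long ledgerId); }   // idempotent
    interface BookKeeperClient { void deleteLedger(long ledgerId); }   // idempotent

    /** Returns true to ack the message, false to nack so it is redelivered. */
    static boolean handlePendingDelete(long ledgerId, MetadataStore md, BookKeeperClient bk) {
        try {
            md.removeLedger(ledgerId);  // no-op if already removed
            bk.deleteLedger(ledgerId);  // no-op if already deleted
            return true;                // ack
        } catch (Exception e) {
            return false;               // nack -> redelivered later
        }
    }

    public static void main(String[] args) {
        Set<Long> mdLedgers = new HashSet<>(Set.of(1L, 2L));
        Set<Long> bkLedgers = new HashSet<>(Set.of(1L, 2L));
        MetadataStore md = mdLedgers::remove;
        BookKeeperClient bk = bkLedgers::remove;

        System.out.println(handlePendingDelete(1L, md, bk)); // first delivery: deletes both
        System.out.println(handlePendingDelete(1L, md, bk)); // redelivery: harmless replay
    }
}
```

The replay in `main` is the point: because both stub operations tolerate already-deleted ledgers, a crash between the two steps only costs a redelivery, never an orphan.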
Re: [DISCUSS] PIP-186: Introduce two phase deletion protocol based on system topic
On Tue, Jan 31, 2023 at 6:43 AM Yan Zhao wrote: > > - Have we considered a metadata store to persist and dedup deletion > > requests instead of the system topic? Why is the system topic the better > > choice than a metadata store for this problem? > If we use the metadata store to store the middle step ledger, we need to > operate the metadata store after deletion every time. > > And we need a trigger to trigger deletion. In the broker, it may have lots > of topics, the ledger deletion is also much. Using the metadata store to > store it may be a bottleneck. > Using pub/sub is easy to implement, and it is a good trigger to trigger > deletion. > We can group the multiple resource deletions to a single record in the metadata store. Also, we can use the metadata store watcher to trigger the deletion. I can see that a similar transactional operation(using metadata store) can be done like the following. Alternatively, 1. A broker receives a resource(ledger) deletion request from a client. 2. If the target resource is available, the broker persists a transaction lock(/transactions/broker-id/delete_ledger/ledger_id) into a metadata store(state:pending, createdAt:now). 2.1 If there is no target resource, error out(ResourceDoesNotExistException). 2.2 If the lock already exists, error out(OperationInProgressExeception). 3. The broker returns success to the client. 4. The transaction watcher(metadata store listener) on the same broker-id is notified. 5. The transaction watcher runs the deletion process with an x min timeout. 5.1 The transaction watcher updates the lock state (state: running, startedAt: now) 5.2 Run step 1 ... n (periodically update the lock state and updatedAt:now every x secs) 5.3 Delete the lock. 6. The orphan transaction monitor runs any orphan jobs by retrying step 5. (If the watcher fails in the middle at step 5, the lock state will be orphan(state:running and startedAt : > x min)) 7. 
The leader monitor(on the leader broker) manages orphan jobs if brokers are gone or unavailable. We can have multiple types of transaction locks(or generic lock) depending on the operations types. This will reduce the number of locks to create/update if there are multiple target resources to operate on for a single transaction. - Single ledger deletion: /transactions/broker-id/delete_ledger/ledger_id - Mult-ledger deletion: /transactions/broker-id/delete_ledgers/ledgers : {ledger_ids[a,b,c,d], last_deleted_ledger_index:3} //last_deleted_ledger_index could be periodically updated every min. This can help to resume the deletion when retrying. - Topic deletion : /transactions/broker-id/delete_topic/topic_name > > - How does Pulsar deduplicate deletion requests(error out to users) while > > the deletion request is running? > The user only can invoke `truncateTopic`, it's not for a particular > ledger. The note: "The truncate operation will move all cursors to the end > of the topic and delete all inactive ledgers." > It's just a trigger for the user. > What if the admin concurrently requests `truncateTopic` many times for the same topic while one truncation job is running? How does Pulsar currently deduplicate these requests? And how does this proposal handle this situation? > > > - How do users track async deletion flow status? (do we expose any > > describeDeletion API to show the deletion status?) > Why need to track the async deletion flow status? The ledger deletion is > transparent for pulsarClient. In the broker, deleting a ledger will print > the log `delete ledger xx successfully `. > If delete failed, it print the log `delete ledger xxx failed.` > IMHO, relying on logs to check the system state is not a good practice. Generally, every async user/admin API(long-running async workflow API) needs the corresponding describe* API to return the current running state. Regards, Heesung
Re: [DISCUSS] PIP-186: Introduce two phase deletion protocol based on system topic
> - Have we considered a metadata store to persist and dedup deletion > requests instead of the system topic? Why is the system topic the better > choice than a metadata store for this problem? If we use the metadata store to store the in-progress (pending delete) ledgers, we need to operate on the metadata store after every deletion. And we need a trigger to trigger deletion. A broker may own lots of topics, so there are also many ledger deletions. Using the metadata store to store them may be a bottleneck. Using pub/sub is easy to implement, and it is a natural trigger for deletion. > - How does Pulsar deduplicate deletion requests(error out to users) while > the deletion request is running? The user can only invoke `truncateTopic`, it's not for a particular ledger. The note: "The truncate operation will move all cursors to the end of the topic and delete all inactive ledgers." It's just a trigger for the user. > - How do users track async deletion flow status? (do we expose any > describeDeletion API to show the deletion status?) Why do we need to track the async deletion flow status? The ledger deletion is transparent to the Pulsar client. In the broker, deleting a ledger will print the log `delete ledger xx successfully`. If delete fails, it prints the log `delete ledger xxx failed.`
Re: [DISCUSS] PIP-186: Introduce two phase deletion protocol based on system topic
Hi, I assume the deletion APIs are async(when a user requests deletion, pulsar first returns success to the user if the request is persisted. Then, Pulsar asynchronously runs the deletion flow) - Have we considered a metadata store to persist and dedup deletion requests instead of the system topic? Why is the system topic the better choice than a metadata store for this problem? - How does Pulsar deduplicate deletion requests(error out to users) while the deletion request is running? - How do users track async deletion flow status? (do we expose any describeDeletion API to show the deletion status?) Thanks, Heesung On Mon, Jan 30, 2023 at 6:10 AM Yan Zhao wrote: > > Couples of notes: > > > > 1. > > > > > In the LedgerDeletionService start, it will create a producer to send > > > pending delete ledger. > > > When delete a ledger, a PendingDeleteLedgerInfo msg with 1 min delay > (the > > > delay is for consumer side, if send it immediately, maybe the metadata > > > din't change when consumer receive it). After the send operation > succeeds, > > > then to operate metadata. If send msg failed, we think this deletion > > > operation failed, and didn't operate metadata. > > > > > > This section is completely unclear to me. Can you please provide step by > > step exactly what will happen in the workflow? > > Who does what and where (node)? > > In PulsarService, it defines ledgerDeletionService. The > ledgerDeletionService will create a producer and a consumer, which looks > like topicPoliciesService. The PulsarService passes it to ManagedLedger. > When pulsar wants to delete a ledger, ManagedLedger uses > ledgerDeletionService to send a message, the message content contains the > waiting delete ledger info. After sending success, delete the ledger id > from the metadata store. > The consumer receives the message, it will use PulsarClient to send a > delete command to the corresponding broker, the broker receives delete > command, and do the actual delete operation. 
> > In https://github.com/apache/pulsar/issues/16569, these are some pictures > for the workflow. > > > > > > 2. > > > > > /** > > > * The ledger component . managed-ledger, managed-cursor and > > > schema-storage. > > > */ > > > private LedgerComponent ledgerComponent; > > > > > > Why is this needed? > > What do you mean by a component of a ledger? Is the ledger divided into > > components? > > It's the ledger source, (MANAGED_LEDGER,MANAGED_CURSOR,SCHEMA_STORAGE) > When the broker wants to delete a ledger, we will check if the bookkeeper > metadata matches or not. In the pulsar, it will mark the ledger source when > creating a new ledger. See LedgerMetadataUtils. > > > > > 3. > > > > > /** > > > * The ledger type. ledger or offload-ledger. > > > */ > > > private LedgerType ledgerType; > > > > > > I don't understand why you need this type. > It marks the ledger as a normal ledger or an offload ledger, broker need > it to determine whether to delete bookkeeper data or offload data. > > > > > 4. > > > > > private MLDataFormats.ManagedLedgerInfo.LedgerInfo context; > > > > > > Context is a very generic word, but your type is so specific. Can you > > please explain why you need this for? > > > > Are you sure you want to tie one data structure into the other - just > > validating. > It's for the offloaded ledger, when we want to delete the offload ledger, > we need offloadedContextUuid, here we can simplify it to offloadContextUuid. > > > > > 5. > > > > > /** > > > * Extent properties. > > > */ > > > private Map properties = new HashMap<>(); > > > > > > Why is this needed? > It's for extended. > > > > > > > 6. > > > > > When receiving a pending delete ledger msg, we will check if the topic > > > still exists. If the topic exists, send a delete command > > > (PendingDelteLedger) to the broker which owns the topic. 
In the > broker, it > > > will check if the ledger is still in the metadata store, if the ledger > in > > > the metadata store means the ledger is still in use, give up to delete > this > > > ledge > > > > > > I don't understand this workflow. You say you check if it's in the > metadata > > store, and if it is , then it is used - what will make it unused? > > Can you explain the starting point? How does deletion work in general? > > When? What happens? ... I understand there are time based triggers, and > > sometimes used based triggers. They are somehow marked in metadata. > In https://github.com/apache/pulsar/issues/16569. The first step section > and second step section process flow picture are detailed. > > > > > 7. > > > > If we delete successfully, the consumer will ack this msg. If delete > fails, > > > reconsume this msg 10 min later. > > > > > > Where did you define 10min? > > Why 10 min? > If delete fails, that means the storage system occur some problems. I > guess the storage system will recovery in 10 mins. > > In https://github.com/apache/pulsar/issues/16569, we define >
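The ordering guarantee in the workflow described in this message — publish the pending-delete message first, and only remove the ledger from the managed-ledger metadata if the publish succeeded — can be sketched as follows (stub `Producer` and metadata types; in the real proposal the producer is a Pulsar producer sending with a ~1 min delay):

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Sketch of "send first, then update metadata": if the send fails, the
// metadata is untouched and deletion simply never starts. Stubs, not real APIs.
public class TwoPhaseSendDemo {
    interface Producer { void send(long ledgerId) throws Exception; }

    static boolean startDeletion(long ledgerId, Producer producer, Set<Long> ledgers) {
        if (!ledgers.contains(ledgerId)) return false;
        try {
            producer.send(ledgerId);   // phase 1: persist intent in the system topic
        } catch (Exception e) {
            return false;              // send failed -> keep ledger in metadata
        }
        ledgers.remove(ledgerId);      // phase 2: drop from managed-ledger metadata
        return true;
    }

    public static void main(String[] args) {
        Set<Long> ledgers = new LinkedHashSet<>(List.of(10L, 11L));
        List<Long> sent = new ArrayList<>();

        System.out.println(startDeletion(10L, sent::add, ledgers));
        System.out.println(startDeletion(11L,
            id -> { throw new Exception("broker down"); }, ledgers));
        System.out.println(ledgers); // the failed send left ledger 11 in metadata
    }
}
```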
Re: [DISCUSS] PIP-186: Introduce two phase deletion protocol based on system topic
> Couples of notes: > > 1. > > > In the LedgerDeletionService start, it will create a producer to send > > pending delete ledger. > > When delete a ledger, a PendingDeleteLedgerInfo msg with 1 min delay (the > > delay is for consumer side, if send it immediately, maybe the metadata > > din't change when consumer receive it). After the send operation succeeds, > > then to operate metadata. If send msg failed, we think this deletion > > operation failed, and didn't operate metadata. > > > This section is completely unclear to me. Can you please provide step by > step exactly what will happen in the workflow? > Who does what and where (node)? In PulsarService, it defines ledgerDeletionService. The ledgerDeletionService will create a producer and a consumer, which looks like topicPoliciesService. The PulsarService passes it to ManagedLedger. When pulsar wants to delete a ledger, ManagedLedger uses ledgerDeletionService to send a message, the message content contains the waiting delete ledger info. After sending success, delete the ledger id from the metadata store. The consumer receives the message, it will use PulsarClient to send a delete command to the corresponding broker, the broker receives delete command, and do the actual delete operation. In https://github.com/apache/pulsar/issues/16569, these are some pictures for the workflow. > > 2. > > > /** > > * The ledger component . managed-ledger, managed-cursor and > > schema-storage. > > */ > > private LedgerComponent ledgerComponent; > > > Why is this needed? > What do you mean by a component of a ledger? Is the ledger divided into > components? It's the ledger source, (MANAGED_LEDGER,MANAGED_CURSOR,SCHEMA_STORAGE) When the broker wants to delete a ledger, we will check if the bookkeeper metadata matches or not. In the pulsar, it will mark the ledger source when creating a new ledger. See LedgerMetadataUtils. > > 3. > > > /** > > * The ledger type. ledger or offload-ledger. 
> > */ > > private LedgerType ledgerType; > > > I don't understand why you need this type. It marks the ledger as a normal ledger or an offload ledger, broker need it to determine whether to delete bookkeeper data or offload data. > > 4. > > > private MLDataFormats.ManagedLedgerInfo.LedgerInfo context; > > > Context is a very generic word, but your type is so specific. Can you > please explain why you need this for? > > Are you sure you want to tie one data structure into the other - just > validating. It's for the offloaded ledger, when we want to delete the offload ledger, we need offloadedContextUuid, here we can simplify it to offloadContextUuid. > > 5. > > > /** > > * Extent properties. > > */ > > private Map properties = new HashMap<>(); > > > Why is this needed? It's for extended. > > > 6. > > > When receiving a pending delete ledger msg, we will check if the topic > > still exists. If the topic exists, send a delete command > > (PendingDelteLedger) to the broker which owns the topic. In the broker, it > > will check if the ledger is still in the metadata store, if the ledger in > > the metadata store means the ledger is still in use, give up to delete this > > ledge > > > I don't understand this workflow. You say you check if it's in the metadata > store, and if it is , then it is used - what will make it unused? > Can you explain the starting point? How does deletion work in general? > When? What happens? ... I understand there are time based triggers, and > sometimes used based triggers. They are somehow marked in metadata. In https://github.com/apache/pulsar/issues/16569. The first step section and second step section process flow picture are detailed. > > 7. > > If we delete successfully, the consumer will ack this msg. If delete fails, > > reconsume this msg 10 min later. > > > Where did you define 10min? > Why 10 min? If delete fails, that means the storage system occur some problems. I guess the storage system will recovery in 10 mins. 
In https://github.com/apache/pulsar/issues/16569, we define reconsumeLaterOfTopicTwoPhaseDeletionInSeconds in the ServiceConfiguration, it's configurable. private int reconsumeLaterOfTopicTwoPhaseDeletionInSeconds = 600; > > You mentioned you are working with a client which has retries configured. > Retry is client side based, ack one message while producing another, > transaction free. Are you prepared to handle a case where you acked but > failed to produce the message, hence you lost it completely? > The Pulsar client only acks the origin message after the new message is sent successfully, so this case doesn't arise. > 8. > > > If we want to delete a topic, we should send the delete ledger msg to > system topic and remove ledger id from metadata one by one, after all the > ledger has been deleted, then delete the topic. If any ledger operate > failed, we think this delete topic operation failed and return the left > ledger id in the response. 
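The ack-versus-retry decision described above — ack on success, otherwise reconsume after the configured delay — maps to a tiny helper like this (illustrative sketch; in a real consumer this corresponds to `acknowledge()` versus `reconsumeLater(msg, delay, SECONDS)`):

```java
import java.util.concurrent.TimeUnit;

// Sketch of the consumer's ack / reconsume-later decision. Illustrative only.
public class ReconsumeLaterDemo {
    // Default from the proposal: reconsumeLaterOfTopicTwoPhaseDeletionInSeconds = 600
    static final int RECONSUME_DELAY_SECONDS = 600;

    /** Returns the redelivery delay in seconds: 0 means ack, >0 means retry later. */
    static long retryDelaySeconds(boolean deleteSucceeded) {
        return deleteSucceeded ? 0 : RECONSUME_DELAY_SECONDS;
    }

    public static void main(String[] args) {
        System.out.println(retryDelaySeconds(true));   // successful delete -> ack now
        System.out.println(retryDelaySeconds(false));  // failed delete -> retry later
        // The default delay is the 10 minutes debated in this thread.
        System.out.println(TimeUnit.SECONDS.toMinutes(retryDelaySeconds(false)));
    }
}
```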
Re: [DISCUSS] PIP-186: Introduce two phase deletion protocol based on system topic
A couple of notes: 1. > In the LedgerDeletionService start, it will create a producer to send > pending delete ledger. > When deleting a ledger, a PendingDeleteLedgerInfo msg with 1 min delay (the > delay is for consumer side, if send it immediately, maybe the metadata > didn't change when consumer receive it). After the send operation succeeds, > then to operate metadata. If send msg failed, we think this deletion > operation failed, and didn't operate metadata. This section is completely unclear to me. Can you please provide step by step exactly what will happen in the workflow? Who does what and where (node)? 2. > /** > * The ledger component . managed-ledger, managed-cursor and > schema-storage. > */ > private LedgerComponent ledgerComponent; Why is this needed? What do you mean by a component of a ledger? Is the ledger divided into components? 3. > /** > * The ledger type. ledger or offload-ledger. > */ > private LedgerType ledgerType; I don't understand why you need this type. 4. > private MLDataFormats.ManagedLedgerInfo.LedgerInfo context; Context is a very generic word, but your type is so specific. Can you please explain why you need this? Are you sure you want to tie one data structure into the other - just validating. 5. > /** > * Extended properties. > */ > private Map properties = new HashMap<>(); Why is this needed? 6. > When receiving a pending delete ledger msg, we will check if the topic > still exists. If the topic exists, send a delete command > (PendingDeleteLedger) to the broker which owns the topic. In the broker, it > will check if the ledger is still in the metadata store, if the ledger in > the metadata store means the ledger is still in use, give up to delete this > ledger I don't understand this workflow. You say you check if it's in the metadata store, and if it is, then it is used - what will make it unused? Can you explain the starting point? How does deletion work in general? When? What happens? ... 
I understand there are time-based triggers, and sometimes usage-based triggers. They are somehow marked in metadata.

7.
> If we delete successfully, the consumer will ack this msg. If delete fails, reconsume this msg 10 min later.

Where did you define 10 min? Why 10 min? You mentioned you are working with a client which has retries configured. Retry is client-side: you ack one message while producing another, transaction-free. Are you prepared to handle the case where you acked but failed to produce the message, hence you lost it completely?

8.
> If we want to delete a topic, we should send the delete ledger msg to the system topic and remove the ledger id from metadata one by one; after all the ledgers have been deleted, then delete the topic. If any ledger operation failed, we consider this delete topic operation failed and return the remaining ledger ids in the response.

I couldn't understand. Can you please clarify this section? How exactly is topic deletion modified to use the features described in this PIP?

9. Regarding configuration: I suggest we prefix all config keys with the feature name so we can easily separate them.

10. Backward compatibility: I would make sure to document in the docs exactly what to do in case we need to roll back (a cookbook).

11. General comment: You're basically implementing a bespoke workflow, using a topic to save the state of where you are in the workflow. Is this the only place in Pulsar (ledger deletion) where an action is composed of several steps? If the answer is no, wouldn't it be better to have a small utility which is in charge of moving through the workflow steps? It could even be a simple state enum, where you move your state from a to b to c to d, and it is persisted.

12. Monitoring: Some actions here can take a long time. Are we basically relying on logs to monitor where we are in the flow?

On Sun, Jan 29, 2023 at 4:44 AM Yan Zhao wrote:
> We need more eyes and votes. Thanks.
>
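The state-enum idea in note 11 could be sketched as a small reusable utility. This is a hedged illustration only; the state names, the class, and the in-memory "journal" standing in for durable persistence are all made up here, not part of the PIP:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of note 11: a minimal workflow that advances through a fixed
// sequence of states and records each transition. A real implementation
// would persist transitions to the metadata store or a compacted topic;
// here the journal is just an in-memory list.
public class DeletionWorkflow {
    enum State { PENDING, PUBLISHED, METADATA_UPDATED, LEDGER_DELETED, DONE }

    private State state = State.PENDING;
    private final List<State> journal = new ArrayList<>();

    // Move to the next state (no-op once DONE) and record the transition.
    public State advance() {
        State[] all = State.values();
        if (state.ordinal() < all.length - 1) {
            state = all[state.ordinal() + 1];
            journal.add(state); // stand-in for durable persistence
        }
        return state;
    }

    public State state() { return state; }
    public List<State> journal() { return journal; }

    public static void main(String[] args) {
        DeletionWorkflow wf = new DeletionWorkflow();
        while (wf.state() != State.DONE) {
            wf.advance();
        }
        // prints [PUBLISHED, METADATA_UPDATED, LEDGER_DELETED, DONE]
        System.out.println(wf.journal());
    }
}
```

On restart, replaying the persisted journal would tell the broker exactly which step of the multi-step deletion it was in, which is the crash-safety property the note is asking about.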
Re: [DISCUSS] PIP-186: Introduce two phase deletion protocol based on system topic
We need more eyes and votes. Thanks.
Re: [DISCUSS] PIP-186: Introduce two phase deletion protocol based on system topic
Good idea.

> Good idea. I suggest the names end with `Seconds`, like `sendDelayOfTwoPhaseDeletionInSeconds`, `reconsumeLaterOfTwoPhaseDeletionInSeconds`.
> > private int sendDelaySecondsOfTwoPhaseDeletion = 60;
> > private int reconsumeLaterSecondsOfTwoPhaseDeletion = 600;
Re: [DISCUSS] PIP-186: Introduce two phase deletion protocol based on system topic
Good idea. I suggest the names end with `Seconds`, like `sendDelayOfTwoPhaseDeletionInSeconds`, `reconsumeLaterOfTwoPhaseDeletionInSeconds`.

> private int sendDelaySecondsOfTwoPhaseDeletion = 60;
> private int reconsumeLaterSecondsOfTwoPhaseDeletion = 600;

On Fri, Aug 12, 2022 at 10:45, Yan Zhao wrote:
> > I suggest to include 'topic' in the flag, we have too many entities in Pulsar
>
> Thanks, I'll change it to `topicTwoPhaseDeletionEnabled`.

-- BR, Qiang Huang
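For illustration, the settings discussed in this thread would land in broker.conf roughly as follows. The key names follow the suggestions above and the values are the quoted defaults; the exact spelling was still under discussion, so treat these as an assumption rather than the final config:

```properties
# Enable two-phase topic/ledger deletion ('topic' prefix per Enrico's suggestion)
topicTwoPhaseDeletionEnabled=true
# Delay before a pending-delete message becomes visible to the consumer (60s = 1 min)
sendDelaySecondsOfTwoPhaseDeletion=60
# Back-off before a failed deletion attempt is re-consumed (600s = 10 min)
reconsumeLaterSecondsOfTwoPhaseDeletion=600
```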
Re: [DISCUSS] PIP-186: Introduce two phase deletion protocol based on system topic
> I suggest to include 'topic' in the flag, we have too many entities in Pulsar

Thanks, I'll change it to `topicTwoPhaseDeletionEnabled`.
Re: [DISCUSS] PIP-186: Introduce two phase deletion protocol based on system topic
On Sat, Jul 23, 2022 at 03:26, Yan Zhao wrote:
> Yes, we define `twoPhaseDeletionEnabled` in the Service Configuration.
> Thanks.

I suggest including 'topic' in the flag; we have too many entities in Pulsar.

Enrico
Re: [DISCUSS] PIP-186: Introduce two phase deletion protocol based on system topic
Yes, we define `twoPhaseDeletionEnabled` in the Service Configuration.
Re: [DISCUSS] PIP-186: Introduce two phase deletion protocol based on system topic
On Wed, Jul 20, 2022 at 18:05, Yan Zhao wrote:
> Hi, Enrico. If we bind the topic per-tenant, then when the tenant is deleted or is no longer loaded, the data in the tenant's system topic can't be consumed until the tenant is next loaded.

This is a good point. So let's go with the system topic. Thanks.

Are we adding a configuration flag?

Enrico

> On 2022/07/14 15:35:16 Enrico Olivelli wrote:
> > This is very interesting.
> >
> > I have only one concern. I think that we should at least use a per-tenant system topic or, better, a per-namespace one. There is no need to create the deletion topic if there is nothing to delete.
> >
> > I am used to dealing with Pulsar clusters in which tenants are strictly isolated. Introducing another component that is not tenant-aware is kind of a problem (we already have such a problem with the Transaction Coordinator).
> >
> > Enrico
Re: [DISCUSS] PIP-186: Introduce two phase deletion protocol based on system topic
Hi, Enrico. If we bind the topic per-tenant, then when the tenant is deleted or is no longer loaded, the data in the tenant's system topic can't be consumed until the tenant is next loaded.

On 2022/07/14 15:35:16 Enrico Olivelli wrote:
> This is very interesting.
>
> I have only one concern. I think that we should at least use a per-tenant system topic or, better, a per-namespace one. There is no need to create the deletion topic if there is nothing to delete.
>
> I am used to dealing with Pulsar clusters in which tenants are strictly isolated. Introducing another component that is not tenant-aware is kind of a problem (we already have such a problem with the Transaction Coordinator).
>
> Enrico
Re: [DISCUSS] PIP-186: Introduce two phase deletion protocol based on system topic
If we use a per-tenant or per-namespace system topic and there is no active topic in the tenant anymore, how do we consume the messages in that tenant? This is the concern I care about, so I prefer a cluster-level system topic.
Re: [DISCUSS] PIP-186: Introduce two phase deletion protocol based on system topic
This is very interesting.

I have only one concern. I think that we should at least use a per-tenant system topic or, better, a per-namespace one. There is no need to create the deletion topic if there is nothing to delete.

I am used to dealing with Pulsar clusters in which tenants are strictly isolated. Introducing another component that is not tenant-aware is kind of a problem (we already have such a problem with the Transaction Coordinator).

Enrico

On Wed, Jul 13, 2022 at 10:54, horizonzy wrote:
>
> Hi Pulsar community: I open a PIP to discuss "Introduce two phase deletion protocol based on system topic". Proposal Link:
> https://github.com/apache/pulsar/issues/16569
>
> ---
>
> ## Motivation
>
> Original issue: #13238
>
> In the current ledger deletion, we divide it into two separate steps, happening in ManagedLedger and ManagedCursor:
>
> 1. Remove all the waiting-to-delete ledgers from the ledger list and update the newest ledger list in the meta store.
> 2. In the meta store update callback, delete the waiting-to-delete ledgers from storage systems such as BookKeeper or tiered storage.
>
> Because the steps are separate, we can't ensure ledger deletion is atomic. If the first step succeeds and the second step fails, it leaves ledgers that can never be deleted from the storage system. The second step may fail because of a broker restart or a storage system deletion failure.
>
> In our customers' environments, we have found many orphan ledgers caused by the above reason.
>
> ## Design
>
> Based on the above, we introduce LedgerDeletionService to support two-phase deletion. We hope it provides a general solution for two-phase deletion. It will cover the problems we already found in managed-ledger, managed-cursor and schema-storage.
>
> In this design, we use a system topic to store the pending delete ledgers.
> * pulsar/system/persistent/__ledger_deletion : stores the pending delete ledgers
> * pulsar/system/persistent/__ledger_deletion_archive : the DLQ for the above
>
> ### The first phase
>
> ```
> client.newProducer(Schema.AVRO(PendingDeleteLedgerInfo.class))
>         .topic("pulsar/system/persistent/__ledger_deletion")
>         .enableBatching(false)
>         .createAsync();
> ```
>
> When the LedgerDeletionService starts, it creates a producer to send pending delete ledgers. When deleting a ledger, a PendingDeleteLedgerInfo msg is sent with a 1 min delay (the delay is for the consumer side; if it is sent immediately, the metadata may not have changed yet when the consumer receives it). After the send operation succeeds, we then operate on the metadata. If sending the msg fails, we consider this deletion operation failed and don't operate on the metadata.
>
> **PendingDeleteLedgerInfo**
>
> ```
> public class PendingDeleteLedgerInfo {
>     /**
>      * Partitioned topic name without domain, e.g. public/default/test-topic-partition-1 or
>      * public/default/test-topic.
>      */
>     private String topicName;
>
>     /**
>      * The ledger component: managed-ledger, managed-cursor or schema-storage.
>      */
>     private LedgerComponent ledgerComponent;
>
>     /**
>      * The ledger type: ledger or offload-ledger.
>      */
>     private LedgerType ledgerType;
>
>     /**
>      * LedgerId.
>      */
>     private Long ledgerId;
>
>     /**
>      * Context, holds offload info. For a BK ledger, the context is null.
>      */
>     private MLDataFormats.ManagedLedgerInfo.LedgerInfo context;
>
>     /**
>      * When a consumer receives a pending delete ledger, the ledger may still be in use,
>      * so we need to check whether it is. In some cases, we needn't check this.
>      */
>     private boolean checkLedgerStillInUse;
>
>     /**
>      * Extra properties.
>      */
>     private Map<String, String> properties = new HashMap<>();
> }
> ```
>
> ### The second phase
>
> ```
> client.newConsumer(Schema.AVRO(PendingDeleteLedgerInfo.class))
>         .topic("pulsar/system/persistent/__ledger_deletion")
>         .subscriptionName("ledger-deletion-worker")
>         .subscriptionType(SubscriptionType.Shared)
>         .deadLetterPolicy(DeadLetterPolicy.builder()
>                 .deadLetterTopic(SystemTopicNames.LEDGER_DELETION_ARCHIVE_TOPIC.getPartitionedTopicName())
>                 .maxRedeliverCount(10)
>                 .build())
>         .subscribeAsync();
> ```
>
> When the LedgerDeletionService starts, it starts a consumer to consume pending delete ledgers.
>
> ### Check if the ledger is still in use
>
> When receiving a pending delete ledger, we check whether the ledger still exists in the metadata. If it exists, we give up deleting this ledger this time; the consumer won't ack the message and will reconsumeLater in 10 min. If it does not exist, the consumer tries to delete the ledger or offload-ledger according to the ledger type. If the delete succeeds, it acks the message; if the delete fails, it reconsumeLaters in 10 min. If a PendingDeleteLedger msg reaches 10 redeliveries, the msg is transferred to the DLQ pulsar/system/persistent/__ledger_deletion_archive.
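The check-then-delete rules quoted above boil down to a small decision table. As a hedged sketch, the pure-Java function below captures just that decision; the `Action` enum, method names, and the explicit redelivery check are illustrative stand-ins (in real Pulsar the DLQ routing happens automatically once `maxRedeliverCount` is exceeded):

```java
// Sketch of the second-phase decision logic: given whether the ledger is
// still referenced in metadata, whether the delete attempt succeeded, and
// how many redeliveries have already happened, decide what the consumer
// should do with the pending-delete message.
public class DeletionDecision {
    enum Action { ACK, RECONSUME_LATER, DEAD_LETTER }

    static final int MAX_REDELIVER_COUNT = 10; // matches the quoted DLQ policy

    static Action decide(boolean ledgerStillInMetadata,
                         boolean deleteSucceeded,
                         int redeliverCount) {
        if (redeliverCount >= MAX_REDELIVER_COUNT) {
            return Action.DEAD_LETTER;     // goes to __ledger_deletion_archive
        }
        if (ledgerStillInMetadata) {
            return Action.RECONSUME_LATER; // ledger still in use, retry in 10 min
        }
        return deleteSucceeded ? Action.ACK : Action.RECONSUME_LATER;
    }

    public static void main(String[] args) {
        System.out.println(decide(false, true, 0));   // ACK
        System.out.println(decide(true, false, 3));   // RECONSUME_LATER
        System.out.println(decide(false, false, 10)); // DEAD_LETTER
    }
}
```

With a 10 min back-off and 10 redeliveries, a deletion blocked by a storage outage is retried for roughly 100 minutes before landing in the archive topic, which is the window the reviewer's questions about retries and monitoring are probing.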
[DISCUSS] PIP-186: Introduce two phase deletion protocol based on system topic
Hi Pulsar community: I open a PIP to discuss "Introduce two phase deletion protocol based on system topic". Proposal Link: https://github.com/apache/pulsar/issues/16569

---

## Motivation

Original issue: #13238

In the current ledger deletion, we divide it into two separate steps, happening in ManagedLedger and ManagedCursor:

1. Remove all the waiting-to-delete ledgers from the ledger list and update the newest ledger list in the meta store.
2. In the meta store update callback, delete the waiting-to-delete ledgers from storage systems such as BookKeeper or tiered storage.

Because the steps are separate, we can't ensure ledger deletion is atomic. If the first step succeeds and the second step fails, it leaves ledgers that can never be deleted from the storage system. The second step may fail because of a broker restart or a storage system deletion failure.

In our customers' environments, we have found many orphan ledgers caused by the above reason.

## Design

Based on the above, we introduce LedgerDeletionService to support two-phase deletion. We hope it provides a general solution for two-phase deletion. It will cover the problems we already found in managed-ledger, managed-cursor and schema-storage.

In this design, we use a system topic to store the pending delete ledgers.

* pulsar/system/persistent/__ledger_deletion : stores the pending delete ledgers
* pulsar/system/persistent/__ledger_deletion_archive : the DLQ for the above

### The first phase

```
client.newProducer(Schema.AVRO(PendingDeleteLedgerInfo.class))
        .topic("pulsar/system/persistent/__ledger_deletion")
        .enableBatching(false)
        .createAsync();
```

When the LedgerDeletionService starts, it creates a producer to send pending delete ledgers. When deleting a ledger, a PendingDeleteLedgerInfo msg is sent with a 1 min delay (the delay is for the consumer side; if it is sent immediately, the metadata may not have changed yet when the consumer receives it). After the send operation succeeds, we then operate on the metadata.
If sending the msg fails, we consider this deletion operation failed and don't operate on the metadata.

**PendingDeleteLedgerInfo**

```
public class PendingDeleteLedgerInfo {
    /**
     * Partitioned topic name without domain, e.g. public/default/test-topic-partition-1 or
     * public/default/test-topic.
     */
    private String topicName;

    /**
     * The ledger component: managed-ledger, managed-cursor or schema-storage.
     */
    private LedgerComponent ledgerComponent;

    /**
     * The ledger type: ledger or offload-ledger.
     */
    private LedgerType ledgerType;

    /**
     * LedgerId.
     */
    private Long ledgerId;

    /**
     * Context, holds offload info. For a BK ledger, the context is null.
     */
    private MLDataFormats.ManagedLedgerInfo.LedgerInfo context;

    /**
     * When a consumer receives a pending delete ledger, the ledger may still be in use,
     * so we need to check whether it is. In some cases, we needn't check this.
     */
    private boolean checkLedgerStillInUse;

    /**
     * Extra properties.
     */
    private Map<String, String> properties = new HashMap<>();
}
```

### The second phase

```
client.newConsumer(Schema.AVRO(PendingDeleteLedgerInfo.class))
        .topic("pulsar/system/persistent/__ledger_deletion")
        .subscriptionName("ledger-deletion-worker")
        .subscriptionType(SubscriptionType.Shared)
        .deadLetterPolicy(DeadLetterPolicy.builder()
                .deadLetterTopic(SystemTopicNames.LEDGER_DELETION_ARCHIVE_TOPIC.getPartitionedTopicName())
                .maxRedeliverCount(10)
                .build())
        .subscribeAsync();
```

When the LedgerDeletionService starts, it starts a consumer to consume pending delete ledgers.

### Check if the ledger is still in use

When receiving a pending delete ledger, we check whether the ledger still exists in the metadata. If it exists, we give up deleting this ledger this time; the consumer won't ack the message and will reconsumeLater in 10 min. If it does not exist, the consumer tries to delete the ledger or offload-ledger according to the ledger type. If the delete succeeds, it acks the message; if the delete fails, it reconsumeLaters in 10 min.
If a PendingDeleteLedger msg reaches 10 redeliveries, the msg is transferred to the DLQ pulsar/system/persistent/__ledger_deletion_archive.

_Tips: We define the DLQ maxRedeliverCount as 10 and reconsumeLater as 10 min, so if the storage system shuts down, the pending delete ledger will be retried 10 times over 100 min. So we needn't worry that the ledger can't be deleted if the storage system shuts down._

### How to get existing ledgers when the consumer receives a pending delete ledger

We already supply the admin API topics getInternalStats {topic-name}; the response contains the three parts of info we want:

* managed-ledger ledgerIds
* managed-cursor ledgerIds
* schema-storage ledgerIds

So when receiving a pending delete ledger, we fetch the topic internal stats. To avoid fetching topic internal stats too frequently, we define a ledgersCache for them.

```
private final LoadingCache> ledgersCache =