For these internal requesters,
1. A cursor is removed.
2. The current ledger is closed and a new ledger is created.
3. A consumer acks a message and the slowest cursor moves forward.

how do we prevent these callers from issuing duplicate requests for the
same resources (ledgers)? In other words, how do we handle race conditions
between the requesters and the consumers?

It seems like there could be duplicate requests. Are we relying on any of
the following to dedup them?
1. the idempotency of the deletion operations
2. enough delay in the requesters' scan cycles
3. deduplication enabled on the system topic
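
To make option 1 concrete, here is a rough sketch of how I read the flow
described in the quoted message below. It is a toy model, not the actual
ManagedLedgerImpl code: `trim_ledgers`, `LedgerStore`, `handle_delete_request`
and `send` are all names I made up for illustration, and the failure of
ledger 2 is hard-coded to mirror the [1,2,3,4,5] example.

```python
def trim_ledgers(ledger_list, to_delete, send):
    """Drop a ledger from the list only if its delete request was published."""
    sent = {ledger_id for ledger_id in to_delete if send(ledger_id)}
    return [ledger_id for ledger_id in ledger_list if ledger_id not in sent]


class LedgerStore:
    """Hypothetical stand-in for bk/offload storage; deleting twice is harmless."""

    def __init__(self, ledger_ids):
        self.ledgers = set(ledger_ids)

    def delete(self, ledger_id):
        self.ledgers.discard(ledger_id)  # no error if the ledger is already gone


def handle_delete_request(ledger_id, ledger_list, store):
    """Consumer/broker side: skip ledgers that are still in the ledger list."""
    if ledger_id in ledger_list:
        return False
    store.delete(ledger_id)
    return True


published = []


def send(ledger_id):
    # Assumption for the example: publishing the request for ledger 2 fails.
    if ledger_id == 2:
        return False
    published.append(ledger_id)
    return True


ledger_list = trim_ledgers([1, 2, 3, 4, 5], [1, 2, 3], send)
store = LedgerStore([1, 2, 3, 4, 5])
# The repeated request for ledger 1 simulates two racing requesters.
results = [handle_delete_request(i, ledger_list, store) for i in [1, 1, 3, 2]]
```

In this model the duplicate request for ledger 1 is simply a no-op the
second time, and the request for ledger 2 is ignored because it is still in
the persisted list. Whether the real delete path behaves the same way under
concurrent requesters is exactly what I am asking about.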


On Fri, Feb 3, 2023 at 10:14 AM Heesung Sohn <heesung.s...@streamnative.io>
wrote:

>
>
> There are some cases that trigger it.
> 1. A cursor is removed.
> 2. The current ledger is closed and a new ledger is created.
> 3. A consumer acks a message and the slowest cursor moves forward.
> 4. A user triggers truncateTopic through the admin API.
>
> I see that this PIP is for the internal ledger deletion cases (1-3 above),
> and it appears that such internal deletion operations do not require
> pre-validation or status checks (and no additional IOPS on the metadata
> store). I agree that we need a separate PIP for async admin operations.
>
>
> On Fri, Feb 3, 2023 at 9:16 AM Yan Zhao <horizo...@apache.org> wrote:
>
>> > If we don't want to add more pressure on the metadata store from this
>> > feature as a requirement,
>> > I think we can use a system topic as well. Just to confirm, is this the
>> > direction we are agreeing on?
>> The design is too generic; I'm afraid it is not suitable for ledger
>> deletion.
>>
>> >
>> > Using a system topic,
>> > 1. A broker receives an async operation request on a target resource
>> > from a client.
>> > 2. If the target resource is available, the broker publishes an
>> > operation request message to the system topic.
>> >    2.1 If there is no target resource,
>> > error out(ResourceDoesNotExistException).
>> This is more generic; I'm afraid it is not suitable for ledger deletion.
>>
>> In ManagedLedgerImpl, the method trimConsumedLedgersInBackground triggers
>> the delete operation. It finds the slowest cursor's read position in the
>> topic and the ledgers that lie before that position. It then checks
>> whether each of those ledgers `isLedgerRetentionOverSizeQuota` or
>> `hasLedgerRetentionExpired`. If so, the ledger should be deleted and is
>> appended to deleteLedgerList. We also check whether the ledger
>> `isOffloadedNeedsDelete`; if so, it is appended to deleteOffloadLedgerList.
>> Then we iterate over `deleteLedgerList` and `deleteOffloadLedgerList`,
>> build a `PendingDeleteLedgerInfo` for each entry, and send it to the system
>> topic. If the send succeeds, I remove the ledger from the ledgerList and
>> then persist the ledgerList. If the send fails, nothing is done.
>> Example: there are ledgers [1,2,3,4,5] and we want to delete 1, 2, and 3.
>> Sending 1 and 3 to the system topic succeeds and sending 2 fails. We
>> remove 1 and 3 and persist [2,4,5] to the zk metadata store.
>>
>> There are some cases that trigger it.
>> 1. A cursor is removed.
>> 2. The current ledger is closed and a new ledger is created.
>> 3. A consumer acks a message and the slowest cursor moves forward.
>> 4. A user triggers truncateTopic through the admin API.
>>
>> Case 4 is related to it: if the topic does not exist, throw
>> ResourceDoesNotExistException.
>>
>> >    2.2 If the target resource's state is pending or running, error
>> > out(InvalidResourceStateException).
>> If we use the system topic to store the pending deleted ledgers, we cannot
>> know that state unless we read all the messages to check, which is not
>> realistic.
>>
>> > 3. The broker returns success(messageId) to the client.
>> Ledger deletion may be triggered by the broker itself.
>> Same reason as above.
>>
>> > 4. A system topic consumer(Shared Subscription Mode?) consumes the
>> > operation request message.
>> Yes.
>> > 5. The consumer runs the operation workflow with an x-min timeout.
>> >      5.1 The consumer gets the target resource data from the metadata.
>> >         5.1.1 If there is no targeted resource, complete the operation.
>> >         Go to 5.4.
>> >      5.2 The consumer updates the target resource state to running.
>> >      (state: running, startedAt: now)
>> >      5.3 Run the workflow step 1 ... n.
>> >      5.4 Acknowledge the message.
>> When the consumer receives a message, the message already contains the
>> ledger deletion info, so there is no need to get the target resource from
>> metadata. The consumer sends a delete command to the corresponding broker,
>> much like producing a message. When the broker receives the delete
>> command, it checks whether the ledger exists in the ledgerList. If it
>> exists, the broker does nothing. If it does not exist, the broker deletes
>> the bk data or offload data.
>> If the topic has already been deleted, sending the delete command throws
>> TopicNotFoundException, and the consumer then deletes the ledger locally.
>>
>> >
>> >
>> > * I guess we will enable deduplication for the system topic for possibly
>> > duplicated operation requests.
>> That's not essential. The deletion is idempotent.
>>
>> >
>> > * Here, Target Resource means the last metadata to update in the
>> > metadata store.
>> > For example, a workflow updates multiple metadata places like the
>> > following. In this case, the target resource is the 3rd one.
>> > 1. sub-resource-1
>> > 2. sub-resource-2
>> > 3. target-resource. e.g. ledger, topic metadata.
>> >
>> > * For Step 5.3, all steps should be idempotent. Also, the consumer
>> > periodically updates the operation state in the target resource's
>> > metadata every x secs. Any retry on a long-running job should resume the
>> > operation closer to where it failed instead of running it from the
>> > beginning.
>>
>> > * We could have describe* apis to check any inflight operations state by
>> > querying the target resource's operation state, but this will show some
>> > eventual consistency because of the delay between step 3 and step 5.
>> >
>> >
>> > Thanks,
>> > Heesung
>>
>> Thanks for the good idea, but it may not be suitable for ledger deletion.
>> In this PIP, I don't want to operate on the ledger metadata too much.
>> Your idea may be a good solution for other cases, though. Maybe we can
>> use it to draft a new PIP.
>>
>
