On Thu, Feb 2, 2023 at 6:42 AM Asaf Mesika <asaf.mes...@gmail.com> wrote:

> Heesung, there are users out there which delete 2M topics in a day. The
> stress we apply on ZK today due to the amount of topics is more than
> enough. I don't think we should rely on ZK to also manage the deletion of
> those ledgers if that means adding more nodes / more reads and writes to
> it.

I totally agree that we need to know if there is an in flight request. I
> think this can be achieved using a special topic. I mean, why can't we just
> mark in ZK itself the ledger is marked for delete, and the workflow will go
> through the different stages until all needed data is deleted and only then
> delete the ledger ID from ZK?
>

If we don't want to add more pressure on the metadata store from this
feature as a requirement,
I think we can use a system topic as well. Just to confirm, is this the
direction we are agreeing on?

Using a system topic,
1. A broker receives an async operation request on a target resource from a
client.
2. If the target resource is available, the broker publishes an operation
request message to the system topic.
   2.1 If there is no target resource,
error out(ResourceDoesNotExistException).
   2.2 If the target resource's state is pending or running, error
out(InvalidResourceStateException).
3. The broker returns success(messageId) to the client.
4. A system topic consumer(Shared Subscription Mode?) consumes the
operation request message.
5. The consumer runs the operation workflow with an x-min timeout.
     5.1 The consumer gets the target resource data from the metadata.
        5.1.1 If there is no targeted resource, complete the operation. Go
to 5.4.
     5.2 The consumer updates the target resource state to running. (state:
running, startedAt: now)
     5.3 Run the workflow step 1 ... n.
     5.4 Acknowledge the message.


* I guess we will enable deduplication for the system topic for possibly
duplicated operation requests.

* Here, Target Resource means the last metadata to update in the metadata
store.
For example, a workflow updates multiple metadata places like the
following. In this case, the target resource is the 3rd one.
1. sub-resource-1
2. sub-resource-2
3. target-resource. e.g. ledger, topic metadata.

* For Step 5.3, all steps should be idempotent. Also, the consumer
periodically updates the operation state in the target resource's metadata
every x secs. Any retry on a long-running job should resume the operation
closer to where it failed instead of running it from the beginning.

* We could have describe* apis to check any inflight operations state by
querying the target resource's operation state, but this will show some
eventual consistency because of the delay between step 3 and step 5.


Thanks,
Heesung




>
> On Tue, Jan 31, 2023 at 9:06 PM Heesung Sohn
> <heesung.s...@streamnative.io.invalid> wrote:
>
> > On Tue, Jan 31, 2023 at 6:43 AM Yan Zhao <horizo...@apache.org> wrote:
> >
> > > > - Have we considered a metadata store to persist and dedup deletion
> > > > requests instead of the system topic? Why is the system topic the
> > better
> > > > choice than a metadata store for this problem?
> > > If we use the metadata store to store the middle step ledger, we need
> to
> > > operate the metadata store after deletion every time.
> >
> >
> >
> > >
> > > And we need a trigger to trigger deletion. In the broker, it may have
> > lots
> > > of topics, the ledger deletion is also much. Using the metadata store
> to
> > > store it may be a bottleneck.
> > > Using pub/sub is easy to implement, and it is a good trigger to trigger
> > > deletion.
> > >
> >
> >
> > We can group the multiple resource deletions to a single record in the
> > metadata store. Also, we can use the metadata store watcher to trigger
> the
> > deletion.
> >
> > I can see that a similar transactional operation(using metadata store)
> can
> > be done like the following.
> >
> > Alternatively,
> > 1. A broker receives a resource(ledger) deletion request from a client.
> > 2. If the target resource is available, the broker persists a transaction
> > lock(/transactions/broker-id/delete_ledger/ledger_id) into a metadata
> > store(state:pending, createdAt:now).
> >   2.1 If there is no target resource, error
> > out(ResourceDoesNotExistException).
> >   2.2 If the lock already exists, error
> out(OperationInProgressExeception).
> > 3. The broker returns success to the client.
> > 4. The transaction watcher(metadata store listener) on the same broker-id
> > is notified.
> > 5. The transaction watcher runs the deletion process with an x min
> timeout.
> >     5.1 The transaction watcher updates the lock state (state: running,
> > startedAt: now)
> >     5.2 Run step 1 ... n (periodically update the lock state and
> > updatedAt:now every x secs)
> >     5.3 Delete the lock.
> > 6. The orphan transaction monitor runs any orphan jobs by retrying step
> 5.
> > (If the watcher fails in the middle at step 5, the lock state will be
> > orphan(state:running and startedAt :  > x min))
> > 7. The leader monitor(on the leader broker) manages orphan jobs if
> brokers
> > are gone or unavailable.
> >
> > We can have multiple types of transaction locks(or generic lock)
> depending
> > on the operations types. This will reduce the number of locks to
> > create/update if there are multiple target resources to operate on for a
> > single transaction.
> >
> > - Single ledger deletion: /transactions/broker-id/delete_ledger/ledger_id
> > - Mult-ledger deletion: /transactions/broker-id/delete_ledgers/ledgers :
> > {ledger_ids[a,b,c,d], last_deleted_ledger_index:3}
> > //last_deleted_ledger_index could be periodically updated every min. This
> > can help to resume the deletion when retrying.
> > - Topic deletion : /transactions/broker-id/delete_topic/topic_name
> >
> >
> >
> > > > - How does Pulsar deduplicate deletion requests(error out to users)
> > while
> > > > the deletion request is running?
> > > The user only can invoke `truncateTopic`, it's not for a particular
> > > ledger. The note: "The truncate operation will move all cursors to the
> > end
> > > of the topic and delete all inactive ledgers."
> > > It's just a trigger for the user.
> > >
> >
> > What if the admin concurrently requests `truncateTopic` many times for
> the
> > same topic while one truncation job is running? How does Pulsar currently
> > deduplicate these requests? And how does this proposal handle this
> > situation?
> >
> >
> > >
> > > > - How do users track async deletion flow status? (do we expose any
> > > > describeDeletion API to show the deletion status?)
> > > Why need to track the async deletion flow status? The ledger deletion
> is
> > > transparent for pulsarClient. In the broker, deleting a ledger will
> print
> > > the log `delete ledger xx successfully `.
> > > If delete failed, it print the log `delete ledger xxx failed.`
> > >
> >
> > IMHO, relying on logs to check the system state is not a good practice.
> > Generally, every async user/admin API(long-running async workflow API)
> > needs the corresponding describe* API to return the current running
> state.
> >
> >
> > Regards,
> > Heesung
> >
>

Reply via email to