> I think we lose a single linearized view.

Which linearized view are we losing, and what is the role of that
linearized view? I think I might be missing why it is important. I
agree that consumers won't know about each unsuccessful attempted
acquisition of a bundle, but that seems like unnecessary information
to broadcast to every broker in the cluster.

> I think the leader requires a write-through cache to compact messages based
> on the latest states.

This brings up an important point that I would like to clarify. If we
trust the write-ahead log as the source of truth, what happens when a
bundle has been validly owned by multiple brokers? As a broker starts
and consumes from the compacted topic, how do we prevent it from
incorrectly thinking that it owns a bundle for some short time period
in the case that the ownership topic hasn't yet been compacted to
remove old ownership state?

> Pulsar guarantees "a single writer".

I didn't think we were using a single writer in the PIP 192 design. I
thought we had many producers sending events to a compacted topic. My
proposal would still have many producers, but the writer to BookKeeper
would act as the single writer. It would technically be distinct from
a normal Pulsar topic producer.

I should highlight that I am only proposing "broker filtering before
write" in the context of PIP 192 and as an alternative to adding
pluggable compaction strategies. It would not be a generic feature.
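
To make "broker filtering before write" concrete, here is a minimal sketch in Python. It is not Pulsar code: the class `OwnershipLeader`, its `claim` method, and the in-memory list standing in for the ownership topic are all hypothetical names, and it assumes the leader's view of current owners is already caught up.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Tuple


@dataclass
class OwnershipLeader:
    """Hypothetical single writer that filters claims before persisting them."""

    # Current owner per bundle (assumed to be caught up with the log).
    owners: Dict[str, str] = field(default_factory=dict)
    # Stand-in for the ownership topic; only winning claims are appended.
    log: List[Tuple[str, str]] = field(default_factory=list)

    def claim(self, bundle: str, broker: str) -> bool:
        """Persist the claim only if the bundle is currently unassigned."""
        if bundle in self.owners:
            return False  # already owned: the event is never written
        self.owners[bundle] = broker
        self.log.append((bundle, broker))
        return True


# Three concurrent claims for bundle x, in arrival order at the leader.
leader = OwnershipLeader()
results = [leader.claim("x", "A"), leader.claim("x", "C"), leader.claim("x", "B")]
```

Only the first claim reaches the log, so existing compaction and table views would see a plain ownership log with nothing left to filter.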

> Could we clarify how to handle
> the following (edge cases and failure recovery)?
> 0. Is the un-compacted topic a persistent topic or a non-persistent topic?

It is a persistent topic.

> 1. How does the leader recover state from the two topics?

A leader would recover state by first consuming the whole compacted
topic and then by consuming from the current location of a cursor on
the first input topic. As stated elsewhere, this introduces latency
and could be an issue.
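
As a rough illustration of that recovery order, here is a Python sketch. The function `recover`, the list-based topics, and the integer cursor are hypothetical stand-ins rather than Pulsar APIs; the cursor is assumed to be the index of the first unacknowledged event on the input topic.

```python
from typing import Dict, List, Tuple

Event = Tuple[str, str]  # (bundle, broker claiming it)


def recover(
    compacted: List[Event], input_log: List[Event], cursor: int
) -> Tuple[Dict[str, str], List[Event]]:
    """Rebuild owner state, then resume filtering from the cursor."""
    owners: Dict[str, str] = {}
    # Step 1: consume the whole compacted topic; the last value per key wins.
    for bundle, broker in compacted:
        owners[bundle] = broker
    # Step 2: resume the input topic at the cursor and apply the same
    # first-claim-wins filter the previous leader would have applied.
    pending: List[Event] = []
    for bundle, broker in input_log[cursor:]:
        if bundle not in owners:
            owners[bundle] = broker
            pending.append((bundle, broker))  # still to be written out
    return owners, pending


owners, pending = recover(
    compacted=[("x", "A")],
    input_log=[("x", "A"), ("x", "C"), ("y", "B"), ("y", "A")],
    cursor=2,
)
```

Step 1 is where the startup latency comes from, since it must finish before the leader can filter any new claim.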

> 2. How do we handle the case when the leader fails before writing messages
> to the compacted topic?

The leader would not acknowledge the message on the input topic until
it has successfully persisted the event on the compacted topic.
Publishing the same event to a compacted topic multiple times is
idempotent, so there is no risk of lost state. The real risk is
latency. However, I think we might have similar (though not the same)
latency risks in the current solution.
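
A small Python sketch of why the redelivery is harmless, under the assumption that compaction keeps the latest value per key. The lists, the `acked` counter, and the `compact` helper are hypothetical stand-ins, not Pulsar APIs.

```python
from typing import Dict, List, Tuple

Event = Tuple[str, str]  # (bundle, owner)


def compact(log: List[Event]) -> Dict[str, str]:
    """Keep the latest value per key, as topic compaction is assumed to do."""
    latest: Dict[str, str] = {}
    for key, value in log:
        latest[key] = value
    return latest


input_topic: List[Event] = [("x", "A")]
compacted_topic: List[Event] = []
acked = 0  # index of the first unacknowledged event on the input topic

# Old leader: persists the event, then fails before acknowledging it.
compacted_topic.append(input_topic[acked])

# New leader: the cursor has not moved, so the same event is redelivered,
# persisted a second time, and only then acknowledged.
compacted_topic.append(input_topic[acked])
acked += 1

state = compact(compacted_topic)
```

The duplicate write costs an extra message and some delay, but the compacted state is the same as if the event had been written once.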

> Analysis: the "pre-filter + two-topics" option can reduce the number of
> messages to broadcast at the expense of the leader broker compaction.

My primary point is that with this PIP's design, the filter logic is
run on every broker and again during topic compaction. With the
alternative design, the filter is run once.

> 3. initially less complex to implement (leaderless conflict resolution and
> requires a single topic)

PIP 215 has its own complexity too. Coordinating filters
on both the client (table view) and the server (compaction) is
nontrivial. The proposed API includes hard-coded client configuration for
each component, which will make upgrading the version of the
compaction strategy complicated, and could lead to incorrect
interpretation of events in the stream. When a single broker is doing
the filtering, versioning is no longer a distributed problem. That
being said, I do not mean to suggest my solution is without
complexity.

> 4. it is not a "one-way door" decision (we could add the pre-filter logic
> as well later)

It's fair to say that we could add it later, but at that point, we
will have added this new API for compaction strategy. Are we confident
that pluggable compaction is independently an important addition to Pulsar's
features, or would it make sense to make this API only exposed in the broker?

Thanks,
Michael





On Sat, Oct 29, 2022 at 10:13 PM Heesung Sohn
<heesung.s...@streamnative.io.invalid> wrote:
>
> Hi,
>
> Also, I thought about some concurrent assignment scenarios between
> pre-filter vs post-filter.
>
> Example 1: When the topic broadcast is slower than the concurrent
> assignment requests
>
> With pre-filter + two-topics (non-compacted and compacted topics)
> t1: A -> non-compacted topic // broker A published a message to the
> non-compacted topic, m1: {broker A assigned bundle x to broker A}
> t2: B -> non-compacted topic // broker B published a message, m2: {broker B
> assigned bundle x to broker C}
> t3: C -> non-compacted topic // broker C published a message, m3: {broker C
> assigned bundle x to broker B}
> t4: non-compacted topic -> L // leader broker consumed the messages: m1,m2,
> and m3
> t5: L -> compacted topic // leader compacted the messages and broadcasted
> m1 to all consumers
> t6: compacted topic -> [A,B,C] // broker A,B,C consumed m1
>
> With post-filter + a single topic
> t1: A -> topic // broker A published a message to the non-compacted topic,
> m1: {broker A assigned bundle x to broker A}
> t2: B -> topic // broker B published a message, m2: {broker B assigned
> bundle x to broker C}
> t3: C -> topic // broker C published a message, m3: {broker C assigned
> bundle x to broker B}
> t4: topic -> [A,B,C] // broker A,B,C consumed the messages: m1,m2, and m3
> t5: [A,B,C] -> m1 // broker A,B,C individually compacted the messages to m1.
>
> Analysis: the "pre-filter + two-topics" option can reduce the number of
> messages to broadcast at the expense of the leader broker compaction.
>
>
> Example 2: When the topic broadcast is faster than the concurrent
> assignment requests
>
> With pre-filter + two-topics (non-compacted and compacted topics)
> t1: A -> non-compacted topic // broker A published a message to the
> non-compacted topic, m1: {broker A assigned bundle x to broker A}
> t2: non-compacted topic -> L // leader broker consumed the messages: m1
> t3: L -> compacted topic // leader compacted the message and broadcasted m1
> to all consumers
> t4: compacted topic -> [A,B,C] // broker A,B,C consumed m1
> t5: A -> own bundle // broker A knows that its assignment has been accepted,
> so proceeding to own the bundle (meanwhile deferring lookup requests)
> t6: B -> defer client lookups // broker B knows that bundle assignment is
> running (meanwhile deferring lookup requests)
> t7: C -> defer client lookups // broker C knows that bundle assignment is
> running (meanwhile deferring lookup requests)
>
> With post-filter + a single topic
> t1: A -> topic // broker A published a message to the non-compacted topic,
> m1: {broker A assigned bundle x to broker A}
> t2: topic -> [A,B,C] // broker A,B,C consumed the message: m1
> t3: A -> own bundle // broker A knows that its assignment has been
> accepted, so proceeding to own the bundle (meanwhile deferring lookup
> requests)
> t4: B -> defer client lookups // broker B knows that bundle assignment is
> running (meanwhile deferring lookup requests)
> t5: C -> defer client lookups // broker C knows that bundle assignment is
> running (meanwhile deferring lookup requests)
>
> Analysis: The "post-filter + a single topic" can perform ok in this case
> without the additional leader coordination and the secondary topic because
> the early broadcast can inform all brokers and prevent them from requesting
> other assignments for the same bundle.
>
> I think the post-filter option is initially not bad because:
>
> 1. it is safe in the worst case (in case the messages are not correctly
> pre-filtered at the leader)
> 2. it performs ok because the early broadcast can prevent
> concurrent assignment requests.
> 3. initially less complex to implement (leaderless conflict resolution and
> requires a single topic)
> 4. it is not a "one-way door" decision (we could add the pre-filter logic
> as well later)
>
> Regards,
> Heesung
>
>
>
> On Sat, Oct 29, 2022 at 1:03 PM Heesung Sohn <heesung.s...@streamnative.io>
> wrote:
>
> > Hi Michael,
> >
> > For the pre-filter (pre-compaction) option,
> > I think the leader requires a write-through cache to compact messages
> > based on the latest states. Otherwise, the leader needs to wait for the
> > last msg from the (compacted) topic before compacting the next msg for the
> > same bundle.
> >
> > Pulsar guarantees "a single writer". However, for the worst-case
> > scenario (due to network partitions, bugs in zk or etcd leader election,
> > bugs in bk, data corruption), I think it is safe to place the post-filter
> > on the consumer side (compaction and table views) as well in order to
> > validate the state changes.
> >
> > For the two-topic approach,
> > I think we lose a single linearized view. Could we clarify how to handle
> > the following (edge cases and failure recovery)?
> > 0. Is the un-compacted topic a persistent topic or a non-persistent topic?
> > 1. How does the leader recover state from the two topics?
> > 2. How do we handle the case when the leader fails before writing messages
> > to the compacted topic?
> >
> > Regards,
> > Heesung
> >
> > On Fri, Oct 28, 2022 at 6:56 PM Michael Marshall <mmarsh...@apache.org>
> > wrote:
> >
> >> Sharing some more thoughts. We could alternatively use two topics
> >> instead of one. In this design, the first topic is the unfiltered
> >> write ahead log that represents many writers (brokers) trying to
> >> acquire ownership of bundles. The second topic is the distilled log
> >> that represents the "winners" or the "owners" of the bundles. There is
> >> a single writer, the leader broker, that reads from the input topic
> >> and writes to the output topic. The first topic is normal and the
> >> second is compacted.
> >>
> >> The primary benefit in a two-topic solution is that it is easy for the
> >> leader broker to trade off ownership without needing to slow down
> >> writes to the input topic. The leader broker will start consuming from
> >> the input topic when it has fully consumed the table view on the
> >> output topic. In general, I don't think consumers know when they have
> >> "reached the end of a table view", but we should be able to trivially
> >> figure this out if we are the topic's only writer and the topic and
> >> writer are collocated on the same broker.
> >>
> >> In that design, it might make sense to use something like the
> >> replication cursor to keep track of this consumer's state.
> >>
> >> - Michael
> >>
> >> On Fri, Oct 28, 2022 at 5:12 PM Michael Marshall <mmarsh...@apache.org>
> >> wrote:
> >> >
> >> > Thanks for your proposal, Heesung.
> >> >
> >> > Fundamentally, we have the problems listed in this PIP because we have
> >> > multiple writers instead of just one writer. Can we solve this problem
> >> > by changing our write pattern? What if we use the leader broker as the
> >> > single writer? That broker would intercept attempts to acquire
> >> > ownership on bundles and would grant ownership to the first broker to
> >> > claim an unassigned bundle. It could "grant ownership" by letting the
> >> > first write to claim an unassigned bundle get written to the ownership
> >> > topic. When a bundle is already owned, the leader won't persist that
> >> > event to BookKeeper. In this design, the log becomes a true
> >> > ownership log, which will correctly work with the existing topic
> >> > compaction and table view solutions. My proposal essentially moves the
> >> > conflict resolution to just before the write, and as a consequence, it
> >> > greatly reduces the need for post-processing of the event log. One
> >> > trade-off might be that the leader broker could slow down the write
> >> > path, but given that the leader would just need to verify the current
> >> > state of the bundle, I think it'd be performant enough.
> >> >
> >> > Additionally, we'd need the leader broker to be "caught up" on bundle
> >> > ownership in order to grant ownership of topics, but unless I am
> >> > mistaken, that is already a requirement of the current PIP 192
> >> > paradigm.
> >> >
> >> > Below are some additional thoughts that will be relevant if we move
> >> > forward with the design as it is currently proposed.
> >> >
> >> > I think it might be helpful to update the title to show that this
> >> > proposal will also affect table view. I didn't catch that at
> >> > first.
> >> >
> >> > Do you have any documentation describing how the
> >> > TopicCompactionStrategy will determine which states are valid in the
> >> > context of load balancing? I looked at
> >> > https://github.com/apache/pulsar/pull/18195, but I couldn't seem to
> >> > find anything for it. That would help make this proposal less
> >> > abstract.
> >> >
> >> > The proposed API seems very tied to the needs of PIP 192. For example,
> >> > `isValid` is not a term I associate with topic compaction. The
> >> > fundamental question for compaction is which value to keep (or build a
> >> > new value). I think we might be able to simplify the API by replacing
> >> > the "isValid", "isMergeEnabled", and "merge" methods with a single
> >> > method that lets the implementation handle one or all tasks. That
> >> > would also remove the need to deserialize payloads multiple times.
> >> >
> >> > I also feel like mentioning that after working with the PIP 105 broker-side
> >> > filtering, I think we should avoid running UDFs in the broker as
> >> > much as possible. (I do not consider the load balancing logic to be a
> >> > UDF here.) I think it would be worth not making this a user-facing
> >> > feature unless there is demand for real use cases.
> >> >
> >> > Thanks!
> >> > Michael
> >> >
> >> > On Fri, Oct 28, 2022 at 1:21 AM 丛搏 <bog...@apache.org> wrote:
> >> > >
> >> > > +1(non-binding)
> >> > >
> >> > > thanks,
> >> > > bo
> >> > >
> >> > > On Wed, Oct 19, 2022 at 07:54, Heesung Sohn <heesung.s...@streamnative.io.invalid> wrote:
> >> > > >
> >> > > > Hi pulsar-dev community,
> >> > > >
> >> > > > I raised a PIP for discussion: PIP-215: Configurable Topic Compaction Strategy
> >> > > >
> >> > > > PIP link: https://github.com/apache/pulsar/issues/18099
> >> > > >
> >> > > > Regards,
> >> > > > Heesung
> >>
> >
