Hi,

Also, I thought through some concurrent assignment scenarios to compare the
pre-filter and post-filter options.

Example 1: When the topic broadcast is slower than the concurrent
assignment requests

With pre-filter + two-topics (non-compacted and compacted topics)
t1: A -> non-compacted topic // broker A published a message to the
non-compacted topic, m1: {broker A assigned bundle x to broker A}
t2: B -> non-compacted topic // broker B published a message, m2: {broker B
assigned bundle x to broker C}
t3: C -> non-compacted topic // broker C published a message, m3: {broker C
assigned bundle x to broker B}
t4: non-compacted topic -> L // leader broker consumed the messages: m1, m2,
and m3
t5: L -> compacted topic // leader compacted the messages and broadcast
m1 to all consumers
t6: compacted topic -> [A,B,C] // brokers A, B, and C consumed m1

With post-filter + a single topic
t1: A -> topic // broker A published a message to the topic,
m1: {broker A assigned bundle x to broker A}
t2: B -> topic // broker B published a message, m2: {broker B assigned
bundle x to broker C}
t3: C -> topic // broker C published a message, m3: {broker C assigned
bundle x to broker B}
t4: topic -> [A,B,C] // brokers A, B, and C consumed the messages: m1, m2,
and m3
t5: [A,B,C] -> m1 // brokers A, B, and C individually compacted the messages
down to m1

Analysis: the "pre-filter + two-topics" option can reduce the number of
messages to broadcast, at the expense of an extra compaction step on the
leader broker.


Example 2: When the topic broadcast is faster than the concurrent
assignment requests

With pre-filter + two-topics (non-compacted and compacted topics)
t1: A -> non-compacted topic // broker A published a message to the
non-compacted topic, m1: {broker A assigned bundle x to broker A}
t2: non-compacted topic -> L // leader broker consumed the message: m1
t3: L -> compacted topic // leader compacted the message and broadcast m1
to all consumers
t4: compacted topic -> [A,B,C] // brokers A, B, and C consumed m1
t5: A -> own bundle // broker A knows that its assignment has been accepted,
so it proceeds to own the bundle (meanwhile deferring lookup requests)
t6: B -> defer client lookups // broker B knows that a bundle assignment is
running (meanwhile deferring lookup requests)
t7: C -> defer client lookups // broker C knows that a bundle assignment is
running (meanwhile deferring lookup requests)

With post-filter + a single topic
t1: A -> topic // broker A published a message to the topic,
m1: {broker A assigned bundle x to broker A}
t2: topic -> [A,B,C] // brokers A, B, and C consumed the message: m1
t3: A -> own bundle // broker A knows that its assignment has been
accepted, so it proceeds to own the bundle (meanwhile deferring lookup
requests)
t4: B -> defer client lookups // broker B knows that a bundle assignment is
running (meanwhile deferring lookup requests)
t5: C -> defer client lookups // broker C knows that a bundle assignment is
running (meanwhile deferring lookup requests)

Analysis: the "post-filter + a single topic" option performs acceptably in
this case without the additional leader coordination and the secondary
topic, because the early broadcast informs all brokers and prevents them
from requesting other assignments for the same bundle.

I think the post-filter option is a reasonable starting point because:

1. it is safe in the worst case (even if messages are not correctly
pre-filtered at the leader)
2. it performs well because the early broadcast can prevent
concurrent assignment requests
3. it is initially less complex to implement (leaderless conflict
resolution, and it requires only a single topic)
4. it is not a "one-way door" decision (we could add the pre-filter logic
later as well)
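
On point 4, one way to see why this is reversible: if the conflict
resolution is expressed as a small strategy (sketched below with
hypothetical names, not the actual TopicCompactionStrategy interface from
the PIP), the same rule can run consumer-side today (post-filter) and be
reused leader-side later (pre-filter) without changing its semantics.

```java
// Hypothetical single-method strategy shape for resolving conflicting
// assignment values for the same bundle key.
@FunctionalInterface
interface ConflictResolver<T> {
    /** Returns the value to keep, given the current and incoming values. */
    T resolve(T current, T incoming);
}

// The "first valid claim wins" rule from the examples above: keep the
// existing owner if one is already recorded.
final class FirstClaimWins implements ConflictResolver<String> {
    @Override
    public String resolve(String currentOwner, String incomingOwner) {
        return currentOwner != null ? currentOwner : incomingOwner;
    }
}
```

The leader (pre-filter) would apply `resolve` before persisting a message,
while consumers (post-filter) would apply it during compaction/table-view
reads; either placement yields the same final ownership.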

Regards,
Heesung



On Sat, Oct 29, 2022 at 1:03 PM Heesung Sohn <heesung.s...@streamnative.io>
wrote:

> Hi Michael,
>
> For the pre-filter (pre-compaction) option,
> I think the leader requires a write-through cache to compact messages
> based on the latest states. Otherwise, the leader needs to wait for the
> last msg from the (compacted) topic before compacting the next msg for the
> same bundle.
>
> Pulsar guarantees "a single writer". However, for the worst-case
> scenario(due to network partitions, bugs in zk or etcd leader election,
> bugs in bk, data corruption), I think it is safe to place the post-filter
> on the consumer side(compaction and table views) as well in order to
> validate the state changes.
>
> For the two-topic approach,
> I think we lose a single linearized view. Could we clarify how to handle
> the following(edge cases and failure recovery)?
> 0. Is the un-compacted topic a persistent topic or a non-persistent topic?
> 1. How does the leader recover state from the two topics?
> 2. How do we handle the case when the leader fails before writing messages
> to the compacted topic
>
> Regards,
> Heesung
>
> On Fri, Oct 28, 2022 at 6:56 PM Michael Marshall <mmarsh...@apache.org>
> wrote:
>
>> Sharing some more thoughts. We could alternatively use two topics
>> instead of one. In this design, the first topic is the unfiltered
>> write ahead log that represents many writers (brokers) trying to
>> acquire ownership of bundles. The second topic is the distilled log
>> that represents the "winners" or the "owners" of the bundles. There is
>> a single writer, the leader broker, that reads from the input topic
>> and writes to the output topic. The first topic is normal and the
>> second is compacted.
>>
>> The primary benefit in a two topic solution is that it is easy for the
>> leader broker to trade off ownership without needing to slow down
>> writes to the input topic. The leader broker will start consuming from
>> the input topic when it has fully consumed the table view on the
>> output topic. In general, I don't think consumers know when they have
>> "reached the end of a table view", but we should be able to trivially
>> figure this out if we are the topic's only writer and the topic and
>> writer are collocated on the same broker.
>>
>> In that design, it might make sense to use something like the
>> replication cursor to keep track of this consumer's state.
>>
>> - Michael
>>
>> On Fri, Oct 28, 2022 at 5:12 PM Michael Marshall <mmarsh...@apache.org>
>> wrote:
>> >
>> > Thanks for your proposal, Heesung.
>> >
>> > Fundamentally, we have the problems listed in this PIP because we have
>> > multiple writers instead of just one writer. Can we solve this problem
>> > by changing our write pattern? What if we use the leader broker as the
>> > single writer? That broker would intercept attempts to acquire
>> > ownership on bundles and would grant ownership to the first broker to
>> > claim an unassigned bundle. It could "grant ownership" by letting the
>> > first write to claim an unassigned bundle get written to the ownership
>> > topic. When a bundle is already owned, the leader won't persist that
>> > event to the bookkeeper. In this design, the log becomes a true
>> > ownership log, which will correctly work with the existing topic
>> > compaction and table view solutions. My proposal essentially moves the
>> > conflict resolution to just before the write, and as a consequence, it
>> > greatly reduces the need for post processing of the event log. One
>> > trade off might be that the leader broker could slow down the write
>> > path, but given that the leader would just need to verify the current
>> > state of the bundle, I think it'd be performant enough.
>> >
>> > Additionally, we'd need the leader broker to be "caught up" on bundle
>> > ownership in order to grant ownership of topics, but unless I am
>> > mistaken, that is already a requirement of the current PIP 192
>> > paradigm.
>> >
>> > Below are some additional thoughts that will be relevant if we move
>> > forward with the design as it is currently proposed.
>> >
>> > I think it might be helpful to update the title to show that this
>> > proposal will also affect table view as well. I didn't catch that at
>> > first.
>> >
>> > Do you have any documentation describing how the
>> > TopicCompactionStrategy will determine which states are valid in the
>> > context of load balancing? I looked at
>> > https://github.com/apache/pulsar/pull/18195, but I couldn't seem to
>> > find anything for it. That would help make this proposal less
>> > abstract.
>> >
>> > The proposed API seems very tied to the needs of PIP 192. For example,
>> > `isValid` is not a term I associate with topic compaction. The
>> > fundamental question for compaction is which value to keep (or build a
>> > new value). I think we might be able to simplify the API by replacing
>> > the "isValid", "isMergeEnabled", and "merge" methods with a single
>> > method that lets the implementation handle one or all tasks. That
>> > would also remove the need to deserialize payloads multiple times too.
>> >
>> > I also feel like mentioning that after working with the PIP 105 broker
>> > side filtering, I think we should avoid running UDFs in the broker as
>> > much as possible. (I do not consider the load balancing logic to be a
>> > UDF here.) I think it would be worth not making this a user facing
>> > feature unless there is demand for real use cases.
>> >
>> > Thanks!
>> > Michael
>> >
>> > On Fri, Oct 28, 2022 at 1:21 AM 丛搏 <bog...@apache.org> wrote:
>> > >
>> > > +1(non-binding)
>> > >
>> > > thanks,
>> > > bo
>> > >
>> > > > Heesung Sohn <heesung.s...@streamnative.io.invalid> wrote on Wed,
>> Oct 19, 2022 at 07:54:
>> > > >
>> > > > Hi pulsar-dev community,
>> > > >
>> > > > I raised a pip to discuss : PIP-215: Configurable Topic Compaction
>> Strategy
>> > > >
>> > > > PIP link: https://github.com/apache/pulsar/issues/18099
>> > > >
>> > > > Regards,
>> > > > Heesung
>>
>
