Hi,

The vote is open in this email thread.

https://lists.apache.org/thread/6y8407pw4fv21n2n0cbrvsspg5tok0h7

Regards,
Heesung

On Fri, Nov 4, 2022 at 3:07 PM Heesung Sohn <heesung.s...@streamnative.io>
wrote:

> Hi,
>
> > I would like to understand what the performance tradeoffs are for the
> changes that you (as an individual) and others are proposing.
> The problem that this PIP is trying to solve is how to support custom
> topic compaction logic. (currently, Pulsar only takes the latest message in
> the topic compaction and table-view(consumers))
>
> As Michael and I discussed in the above emails, in the worst case, when
> there are many conflicting messages, this PIP can incur more repeated
> custom compaction than the alternative as individual consumers need to
> compact messages (topic compaction and table views). However, one of the
> advantages of this proposal is that pub/sub is faster since it uses a
> single topic. For example, in PIP-192, if the "bundle assignment" broadcast
> is fast enough, conflicting bundle assignment requests can be
> reduced(broadcast filter effect).
>
>
> Post-Compaction(this PIP proposes)
>
>    - - Producers publish messages to a single topic.
>    - - All consumers individually run the custom compaction logic when
>    consuming the topic (by table-view).
>    - - Compactor needs to run the custom compaction logic during
>    compaction.
>
>
>
> The alternative that Michael proposed is instead compacting messages at
> the earlier stage by a single writer, using two topics.
> Pre-Compaction(the alternative that Michael proposes)
>
>    - - Producers publish messages to a non-compacted topic first.
>    - - Only the leader consumes this non-compacted topic and runs the
>    custom compaction logic.
>    - - Then, the leader publishes compacted messages to the compacted
>    topic(resolve conflicts by the single writer).
>    - - All consumers consume the compacted topic. (no need to compact the
>    messages separately on the consumer side)
>    - - Compactor does not need to run the custom compaction logic during
>    compaction.
>
>
>
> > It really seems that you are proposing a change to the default behavior
> whether or not a user chooses to use the interface in PIP-192.
> The pip does not change the default behavior of compaction and table-view.
> I updated the goals in the PIP to clarify this.
>
> Thanks,
> Heesung
>
>
> On Fri, Nov 4, 2022 at 11:11 AM Dave Fisher <w...@apache.org> wrote:
>
>>
>>
>> > On Nov 4, 2022, at 10:28 AM, Heesung Sohn 
>> > <heesung.s...@streamnative.io.INVALID>
>> wrote:
>> >
>> > Hi,
>> >
>> > I think `bool shouldKeepLeft(T prev, T cur)` is clearer. I updated the
>> PIP.
>> >
>> > Hopefully, we provided enough context about this PIP and the design
>> > trade-off as well.
>>
>> I would like to understand what the performance tradeoffs are for the
>> changes that you (as an individual) and others are proposing.
>>
>> > Goal
>> >
>> >       • Create another Topic compactor, StrategicTwoPhaseCompactor,
>> where we can configure a compaction strategy,
>> > TopicCompactionStrategy
>> >
>> >       • Update the TableViewConfigurationData to load and consider the
>> TopicCompactionStrategy when updating the internal key-value map in
>> TableView.
>> >
>> >       • Add TopicCompactionStrategy in Topic-level Policy to run
>> StrategicTwoPhaseCompactor instead of TwoPhaseCompactor when executing
>> compaction.
>>
>> It really seems that you are proposing a change to the default behavior
>> whether or not a user chooses to use the interface in PIP-192.
>>
>> >
>> > I will send out a vote email soon.
>> >
>> > Thank you,
>> > Heesung
>> >
>> >
>> >
>> >
>> >
>> >
>> > On Thu, Nov 3, 2022 at 9:59 PM Michael Marshall <mmarsh...@apache.org>
>> > wrote:
>> >
>> >> Thank you for your detailed responses, Heesung.
>> >>
>> >>> We are not planning to expose this feature to users
>> >>> soon unless demanded and proven to be stable.
>> >>
>> >> In that case, I think we should move forward with this PIP. I have a
>> >> different opinion about the trade offs for the two designs, but none
>> >> of my concerns are problems that could not be solved later if we
>> >> encounter problems.
>> >>
>> >> Just to say it succinctly, my concern is that broadcasting all
>> >> attempts to acquire ownership of every unclaimed bundle to all brokers
>> >> will generate a lot of unnecessary traffic.
>> >>
>> >>>
>> >>
>> https://github.com/apache/pulsar/pull/18079/files#diff-7f9930a5c7896b411f61901cf38371e23ba69e753f460bf7f520f6f800d8321a
>> >>
>> >> Thank you for this reference. I missed it. That is great documentation!
>> >>
>> >>> In fact, with the `compact(T prev, T cur)` api only, it is not clear
>> if
>> >>> prev => cur is a valid transition or not(if invalid, we should filter
>> out
>> >>> the cur message instead of further compacting/merging). I think we
>> still
>> >>> need to keep the `isValid()` and `merge()` separated.
>> >>
>> >> I was thinking that the result of `compact` would be the result put in
>> >> the table view or written to the compacted topic. The one issue might
>> >> be about keeping the memory utilization down for use cases that are
>> >> not updating the message's value but are only selecting "left" or
>> >> "right". I thought we could infer when to keep the message id vs keep
>> >> the message value, but that might be easy to implement.
>> >>
>> >> My final critique is that I think `isValid` could have a better name.
>> >> In the event this does become a public API, I don't think all use
>> >> cases will think about which event should be persisted in terms of
>> >> validity.
>> >>
>> >> The main name that comes to my mind is `bool shouldKeepLeft(T prev, T
>> >> cur)`. When true, prev wins. When false, cur wins. That nomenclature
>> >> comes from Akka Streams. It's not perfect, but it is easy to infer
>> >> what the result will do.
>> >>
>> >>> Regarding redundant deserialization, the input type `T` is the type of
>> >>> message value, so the input values are already deserialized.
>> >>
>> >> Great, I should have realized that. That takes care of that concern.
>> >>
>> >> Thanks,
>> >> Michael
>> >>
>> >> On Thu, Nov 3, 2022 at 7:37 PM Heesung Sohn
>> >> <heesung.s...@streamnative.io.invalid> wrote:
>> >>>
>> >>> Hi,
>> >>>
>> >>> I have a different thought about my previous comment.
>> >>>
>> >>> - Agreed with your point that we should merge CompactionStrategy
>> APIs. I
>> >>> updated the interface proposal in the PIP. I replaced `"isValid",
>> >>> "isMergeEnabled", and "merge"` apis with "compact" api.
>> >>>
>> >>> boolean isValid(T prev, T cur)
>> >>> boolean isMergeEnabled()
>> >>> T merge(T prev, T cur)
>> >>>
>> >>> =>
>> >>>
>> >>> T compact(T prev, T cur)
>> >>>
>> >>> In fact, with the `compact(T prev, T cur)` api only, it is not clear
>> if
>> >>> prev => cur is a valid transition or not(if invalid, we should filter
>> out
>> >>> the cur message instead of further compacting/merging). I think we
>> still
>> >>> need to keep the `isValid()` and `merge()` separated.
>> >>>
>> >>> Regarding redundant deserialization, the input type `T` is the type of
>> >>> message value, so the input values are already deserialized. We don't
>> >> want
>> >>> to expose the Message<T> interface in this CompactionStrategy to avoid
>> >>> message serialization/deserialization dependencies in the
>> >>> CompactionStrategy.
>> >>>
>> >>> The `merge()` functionality is suggested for more complex use cases
>> >> (merge
>> >>> values instead of just filtering), and to support this `merge()`, we
>> need
>> >>> to internally create a new msg with the compacted value, metadata, and
>> >>> messageId copies. We could initially define `isValid()` only in
>> >>> CompactionStrategy, and add `isMergeEnabled() and merge()` later in
>> the
>> >>> CompactionStrategy interface if requested.
>> >>>
>> >>> Regards,
>> >>> Heesung
>> >>>
>> >>>
>> >>> On Thu, Nov 3, 2022 at 9:50 AM Heesung Sohn <
>> >> heesung.s...@streamnative.io>
>> >>> wrote:
>> >>>
>> >>>> Oops! Michael, I apologize for the typo in your name.
>> >>>>
>> >>>> On Thu, Nov 3, 2022 at 9:47 AM Heesung Sohn <
>> >> heesung.s...@streamnative.io>
>> >>>> wrote:
>> >>>>
>> >>>>> Hi Machel,
>> >>>>>
>> >>>>> Here are my additional comments regarding your earlier email.
>> >>>>>
>> >>>>> - I updated the PIP title to show that this will impact table view
>> as
>> >>>>> well.
>> >>>>>
>> >>>>> - PIP-192 : https://github.com/apache/pulsar/issues/16691 shows the
>> >>>>> general idea of the states and their actions, and I defined the
>> actual
>> >>>>> states here in the PR,
>> >>>>>
>> >>
>> https://github.com/apache/pulsar/pull/18079/files#diff-7f9930a5c7896b411f61901cf38371e23ba69e753f460bf7f520f6f800d8321a
>> .
>> >> I
>> >>>>> will further clarify the bundle state data validation logic when
>> >>>>> introducing `BundleStateCompactionStrategy` class. This PIP is to
>> >> support
>> >>>>> CompactionStrategy in general.
>> >>>>>
>> >>>>> - Agreed with your point that we should merge CompactionStrategy
>> >> APIs. I
>> >>>>> updated the interface proposal in the PIP. I replaced `"isValid",
>> >>>>> "isMergeEnabled", and "merge"` apis with "compact" api.
>> >>>>>
>> >>>>>
>> >>>>> Thanks,
>> >>>>> Heesung
>> >>>>>
>> >>>>>
>> >>>>> On Tue, Nov 1, 2022 at 11:26 AM Heesung Sohn <
>> >>>>> heesung.s...@streamnative.io> wrote:
>> >>>>>
>> >>>>>> Hi,
>> >>>>>> Thank you for the great comments.
>> >>>>>> Please find my comments inline too.
>> >>>>>>
>> >>>>>> Regards,
>> >>>>>> Heesung
>> >>>>>>
>> >>>>>> On Mon, Oct 31, 2022 at 10:59 PM Michael Marshall <
>> >> mmarsh...@apache.org>
>> >>>>>> wrote:
>> >>>>>>
>> >>>>>>>> I think we lose a single linearized view.
>> >>>>>>>
>> >>>>>>> Which linearized view are we losing, and what is the role of that
>> >>>>>>> linearized view? I think I might be missing why it is important. I
>> >>>>>>> agree that consumers won't know about each unsuccessful attempted
>> >>>>>>> acquisition of a bundle, but that seems like unnecessary
>> information
>> >>>>>>> to broadcast to every broker in the cluster.
>> >>>>>>>
>> >>>>>>>
>> >>>>>>
>> >>>>>> PIP-192 proposes an assignment, transfer, and split
>> >> protocol(multi-phase
>> >>>>>> state changes), relying on early broadcast across brokers, and all
>> >> brokers
>> >>>>>> react to their clients according to the state change notifications
>> --
>> >>>>>> brokers could defer any client lookups for bundle x if an
>> >>>>>> assignment/transfer/split is ongoing for x(broadcasted early in the
>> >> topic).
>> >>>>>> One early broadcast example is the one that I discussed above,
>> `When
>> >> the
>> >>>>>> topic broadcast is faster than the concurrent assignment requests.`
>> >> I think
>> >>>>>> the prefilter could delay this early broadcast, as it needs to go
>> >> through
>> >>>>>> the additional single-leader compaction path.
>> >>>>>>
>> >>>>>> The bundle state recovery process is simpler by a single linearized
>> >> view.
>> >>>>>>
>> >>>>>> The single linearized view can be easier to debug bundle states. We
>> >> can
>> >>>>>> more easily track where the assignment requests come from and how
>> it
>> >> is
>> >>>>>> compacted in a single linearized view.
>> >>>>>>
>> >>>>>>
>> >>>>>>
>> >>>>>>>> I think the leader requires a write-through cache to compact
>> >> messages
>> >>>>>>> based
>> >>>>>>>> on the latest states.
>> >>>>>>>
>> >>>>>> This brings up an important point that I would like to clarify. If
>> we
>> >>>>>>> trust the write ahead log as the source of truth, what happens
>> when
>> >> a
>> >>>>>>> bundle has been validly owned by multiple brokers? As a broker
>> >> starts
>> >>>>>>> and consumes from the compacted topic, how do we prevent it from
>> >>>>>>> incorrectly thinking that it owns a bundle for some short time
>> >> period
>> >>>>>>> in the case that the ownership topic hasn't yet been compacted to
>> >>>>>>> remove old ownership state?
>> >>>>>>>
>> >>>>>>
>> >>>>>> Since the multi-phase transfer protocol involves the source and
>> >>>>>> destination broker's actions, the successful transfer should get
>> the
>> >> source
>> >>>>>> and destination broker to have the (near) latest state. For
>> example,
>> >> if
>> >>>>>> some brokers have old ownership states(network partitioned or
>> >> delayed),
>> >>>>>> they will redirect clients to the source(old) broker. However, by
>> the
>> >>>>>> transfer protocol, the source broker should have the latest state,
>> >> so it
>> >>>>>> can redirect the client again to the destination broker.
>> >>>>>>
>> >>>>>> When a broker restarts, it won't start until its BSC state to the
>> >> (near)
>> >>>>>> latest (til the last known messageId at that time).
>> >>>>>>
>> >>>>>>
>> >>>>>>>> Pulsar guarantees "a single writer".
>> >>>>>>>
>> >>>>>>> I didn't think we were using a single writer in the PIP 192
>> design.
>> >> I
>> >>>>>>> thought we had many producers sending events to a compacted topic.
>> >> My
>> >>>>>>> proposal would still have many producers, but the writer to
>> >> bookkeeper
>> >>>>>>> would act as the single writer. It would technically be distinct
>> >> from
>> >>>>>>> a normal Pulsar topic producer.
>> >>>>>>>
>> >>>>>>> I should highlight that I am only proposing "broker filtering
>> before
>> >>>>>>> write" in the context of PIP 192 and as an alternative to adding
>> >>>>>>> pluggable compaction strategies. It would not be a generic
>> feature.
>> >>>>>>>
>> >>>>>>>
>> >>>>>> I was worried about the worst case where two producers(leaders)
>> >> happen
>> >>>>>> to write the compacted topic (although Pulsar can guarantee "a
>> single
>> >>>>>> writer" or "a single producer" for a topic in normal situations).
>> >>>>>>
>> >>>>>>
>> >>>>>>
>> >>>>>>>> Could we clarify how to handle
>> >>>>>>>> the following(edge cases and failure recovery)?
>> >>>>>>>> 0. Is the un-compacted topic a persistent topic or a
>> >> non-persistent
>> >>>>>>> topic?
>> >>>>>>>
>> >>>>>>> It is a persistent topic.
>> >>>>>>>
>> >>>>>>>> 1. How does the leader recover state from the two topics?
>> >>>>>>>
>> >>>>>>> A leader would recover state by first consuming the whole
>> compacted
>> >>>>>>> topic and then by consuming from the current location of a cursor
>> on
>> >>>>>>> the first input topic. As stated elsewhere, this introduces
>> latency
>> >>>>>>> and could be an issue.
>> >>>>>>>
>> >>>>>>>> 2. How do we handle the case when the leader fails before writing
>> >>>>>>> messages
>> >>>>>>>> to the compacted topic
>> >>>>>>>
>> >>>>>>> The leader would not acknowledge the message on the input topic
>> >> until
>> >>>>>>> it has successfully persisted the event on the compacted topic.
>> >>>>>>> Publishing the same event to a compacted topic multiple times is
>> >>>>>>> idempotent, so there is no risk of lost state. The real risk is
>> >>>>>>> latency. However, I think we might have similar (though not the
>> >> same)
>> >>>>>>> latency risks in the current solution.
>> >>>>>>>
>> >>>>>>>> Analysis: the "pre-filter + two-topics" option can reduce the
>> >> number
>> >>>>>>> of
>> >>>>>>>> messages to broadcast at the expense of the leader broker
>> >> compaction.
>> >>>>>>>
>> >>>>>>> My primary point is that with this PIP's design, the filter logic
>> is
>> >>>>>>> run on every broker and again during topic compaction. With the
>> >>>>>>> alternative design, the filter is run once.
>> >>>>>>>
>> >>>>>>> Thank you for the clarification.
>> >>>>>>
>> >>>>>> I think the difference is that the post-filter is an optimistic
>> >> approach
>> >>>>>> as it optimistically relies on the "broadcast-filter"
>> effect(brokers
>> >> will
>> >>>>>> defer client lookups if notified ahead that any assignment is
>> >> ongoing for
>> >>>>>> bundle x). Yes, in the worst case, if the broadcast is slower, each
>> >> broker
>> >>>>>> needs to individually compact the conflicting assignment requests.
>> >>>>>>
>> >>>>>> Conversely, one downside of the pessimistic approach (single leader
>> >>>>>> pre-filter) is that when there are not many conflict concurrent
>> >> assignment
>> >>>>>> requests(assign for bundle a, assign for bundle b, assign for
>> bundle
>> >> c...),
>> >>>>>> the requests need to redundantly go through the leader compaction.
>> >>>>>>
>> >>>>>>
>> >>>>>>
>> >>>>>>>> 3. initially less complex to implement (leaderless conflict
>> >>>>>>> resolution and
>> >>>>>>>> requires a single topic)
>> >>>>>>>
>> >>>>>>> PIP 215 has its own complexity too. Coordinating filters
>> >>>>>>> on both the client (table view) and the server (compaction) is non
>> >>>>>>> trivial. The proposed API includes hard coded client configuration
>> >> for
>> >>>>>>> each component, which will make upgrading the version of the
>> >>>>>>> compaction strategy complicated, and could lead to incorrect
>> >>>>>>> interpretation of events in the stream. When a single broker is
>> >> doing
>> >>>>>>> the filtering, versioning is no longer a distributed problem. That
>> >>>>>>> being said, I do not mean to suggest my solution is without
>> >>>>>>> complexity.
>> >>>>>>>
>> >>>>>>>> 4. it is not a "one-way door" decision (we could add the
>> >> pre-filter
>> >>>>>>> logic
>> >>>>>>>> as well later)
>> >>>>>>>
>> >>>>>>> It's fair to say that we could add it later, but at that point, we
>> >>>>>>> will have added this new API for compaction strategy. Are we
>> >> confident
>> >>>>>>> that pluggable compaction is independently an important addition
>> to
>> >>>>>>> Pulsar's
>> >>>>>>> features, or would it make sense to make this API only exposed in
>> >> the
>> >>>>>>> broker?
>> >>>>>>>
>> >>>>>>>
>> >>>>>> The intention is that this compaction feature could be useful for
>> >>>>>> complex user applications (if they are trying to do a similar
>> >> thing). As I
>> >>>>>> mentioned, this feature is closely tied to the PIP-192 now. We are
>> >> not
>> >>>>>> planning to expose this feature to users soon unless demanded and
>> >> proven to
>> >>>>>> be stable.
>> >>>>>>
>> >>>>>>
>> >>>>>>> Thanks,
>> >>>>>>> Michael
>> >>>>>>>
>> >>>>>>>
>> >>>>>>>
>> >>>>>>>
>> >>>>>>>
>> >>>>>>> On Sat, Oct 29, 2022 at 10:13 PM Heesung Sohn
>> >>>>>>> <heesung.s...@streamnative.io.invalid> wrote:
>> >>>>>>>>
>> >>>>>>>> Hi,
>> >>>>>>>>
>> >>>>>>>> Also, I thought about some concurrent assignment scenarios
>> between
>> >>>>>>>> pre-filter vs post-filter.
>> >>>>>>>>
>> >>>>>>>> Example 1: When the topic broadcast is slower than the concurrent
>> >>>>>>>> assignment requests
>> >>>>>>>>
>> >>>>>>>> With pre-filter + two-topics (non-compacted and compacted topics)
>> >>>>>>>> t1: A -> non-compacted topic // broker A published a message to
>> >> the
>> >>>>>>>> non-compacted topic, m1: {broker A assigned bundle x to broker A}
>> >>>>>>>> t2: B -> non-compacted topic // broker B published a message, m2:
>> >>>>>>> {broker B
>> >>>>>>>> assigned bundle x to broker C}
>> >>>>>>>> t3: C -> non-compacted topic // broker C published a message, m3:
>> >>>>>>> {broker C
>> >>>>>>>> assigned bundle x to broker B}
>> >>>>>>>> t4: non-compacted topic -> L // leader broker consumed the
>> >> messages:
>> >>>>>>> m1,m2,
>> >>>>>>>> and m3
>> >>>>>>>> t5: L -> compacted topic // leader compacted the messages and
>> >>>>>>> broadcasted
>> >>>>>>>> m1 to all consumers
>> >>>>>>>> t6: compacted topic-> [A,B,C] // broker A,B,C consumed m1
>> >>>>>>>>
>> >>>>>>>> With post-filter + a single topic
>> >>>>>>>> t1: A -> topic // broker A published a message to the
>> >> non-compacted
>> >>>>>>> topic,
>> >>>>>>>> m1: {broker A assigned bundle x to broker A}
>> >>>>>>>> t2: B -> topic // broker B published a message, m2: {broker B
>> >> assigned
>> >>>>>>>> bundle x to broker C}
>> >>>>>>>> t3: C -> topic // broker C published a message, m3: {broker C
>> >> assigned
>> >>>>>>>> bundle x to broker B}
>> >>>>>>>> t4: topic -> [A,B,C] // broker A,B,C consumed the messages:
>> m1,m2,
>> >>>>>>> and m3
>> >>>>>>>> t5: [A,B,C] -> m1 // broker A,B,C individually compacted the
>> >> messages
>> >>>>>>> to m1.
>> >>>>>>>>
>> >>>>>>>> Analysis: the "pre-filter + two-topics" option can reduce the
>> >> number
>> >>>>>>> of
>> >>>>>>>> messages to broadcast at the expense of the leader broker
>> >> compaction.
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> Example 2: When the topic broadcast is faster than the concurrent
>> >>>>>>>> assignment requests
>> >>>>>>>>
>> >>>>>>>> With pre-filter + two-topics (non-compacted and compacted topics)
>> >>>>>>>> t1: A -> non-compacted topic // broker A published a message to
>> >> the
>> >>>>>>>> non-compacted topic, m1: {broker A assigned bundle x to broker A}
>> >>>>>>>> t2: non-compacted topic -> L // leader broker consumed the
>> >> messages:
>> >>>>>>> m1
>> >>>>>>>> t3: L -> compacted topic // leader compacted the message and
>> >>>>>>> broadcasted m1
>> >>>>>>>> to all consumers
>> >>>>>>>> t4: compacted topic-> [A,B,C] // broker A,B,C consumed m1
>> >>>>>>>> t5: A-> own bundle // broker A knows that its assignment has been
>> >>>>>>> accepted,
>> >>>>>>>> so proceeding to own the bundle (meanwhile deferring lookup
>> >> requests)
>> >>>>>>>> t6: B -> defer client lookups // broker B knows that bundle
>> >>>>>>> assignment is
>> >>>>>>>> running(meanwhile deferring lookup requests)
>> >>>>>>>> t7: C -> defer client lookups // broker C knows that bundle
>> >>>>>>> assignment is
>> >>>>>>>> running(meanwhile deferring lookup requests)
>> >>>>>>>>
>> >>>>>>>> With post-filter + a single topic
>> >>>>>>>> t1: A -> topic // broker A published a message to the
>> >> non-compacted
>> >>>>>>> topic,
>> >>>>>>>> m1: {broker A assigned bundle x to broker A}
>> >>>>>>>> t2: topic -> [A,B,C] // broker A,B,C consumed the message: m1
>> >>>>>>>> t3:  A-> own bundle // broker A knows that its assignment has
>> been
>> >>>>>>>> accepted, so proceeding to own the bundle (meanwhile deferring
>> >> lookup
>> >>>>>>>> requests)
>> >>>>>>>> t4: B -> defer client lookups // broker B knows that bundle
>> >>>>>>> assignment is
>> >>>>>>>> running(meanwhile deferring lookup requests)
>> >>>>>>>> t5: C -> defer client lookups // broker C knows that bundle
>> >>>>>>> assignment is
>> >>>>>>>> running(meanwhile deferring lookup requests)
>> >>>>>>>>
>> >>>>>>>> Analysis: The "post-filter + a single topic" can perform ok in
>> >> this
>> >>>>>>> case
>> >>>>>>>> without the additional leader coordination and the secondary
>> topic
>> >>>>>>> because
>> >>>>>>>> the early broadcast can inform all brokers and prevent them from
>> >>>>>>> requesting
>> >>>>>>>> other assignments for the same bundle.
>> >>>>>>>>
>> >>>>>>>> I think the post-filter option is initially not bad because:
>> >>>>>>>>
>> >>>>>>>> 1. it is safe in the worst case (in case the messages are not
>> >>>>>>> correctly
>> >>>>>>>> pre-filtered at the leader)
>> >>>>>>>> 2. it performs ok because the early broadcast can prevent
>> >>>>>>>> concurrent assignment requests.
>> >>>>>>>> 3. initially less complex to implement (leaderless conflict
>> >>>>>>> resolution and
>> >>>>>>>> requires a single topic)
>> >>>>>>>> 4. it is not a "one-way door" decision (we could add the
>> >> pre-filter
>> >>>>>>> logic
>> >>>>>>>> as well later)
>> >>>>>>>>
>> >>>>>>>> Regards,
>> >>>>>>>> Heesung
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> On Sat, Oct 29, 2022 at 1:03 PM Heesung Sohn <
>> >>>>>>> heesung.s...@streamnative.io>
>> >>>>>>>> wrote:
>> >>>>>>>>
>> >>>>>>>>> Hi Michael,
>> >>>>>>>>>
>> >>>>>>>>> For the pre-prefilter(pre-compaction) option,
>> >>>>>>>>> I think the leader requires a write-through cache to compact
>> >>>>>>> messages
>> >>>>>>>>> based on the latest states. Otherwise, the leader needs to wait
>> >> for
>> >>>>>>> the
>> >>>>>>>>> last msg from the (compacted) topic before compacting the next
>> >> msg
>> >>>>>>> for the
>> >>>>>>>>> same bundle.
>> >>>>>>>>>
>> >>>>>>>>> Pulsar guarantees "a single writer". However, for the worst-case
>> >>>>>>>>> scenario(due to network partitions, bugs in zk or etcd leader
>> >>>>>>> election,
>> >>>>>>>>> bugs in bk, data corruption ), I think it is safe to place the
>> >>>>>>> post-filter
>> >>>>>>>>> on the consumer side(compaction and table views) as well in
>> >> order to
>> >>>>>>>>> validate the state changes.
>> >>>>>>>>>
>> >>>>>>>>> For the two-topic approach,
>> >>>>>>>>> I think we lose a single linearized view. Could we clarify how
>> >> to
>> >>>>>>> handle
>> >>>>>>>>> the following(edge cases and failure recovery)?
>> >>>>>>>>> 0. Is the un-compacted topic a persistent topic or a
>> >> non-persistent
>> >>>>>>> topic?
>> >>>>>>>>> 1. How does the leader recover state from the two topics?
>> >>>>>>>>> 2. How do we handle the case when the leader fails before
>> >> writing
>> >>>>>>> messages
>> >>>>>>>>> to the compacted topic
>> >>>>>>>>>
>> >>>>>>>>> Regards,
>> >>>>>>>>> Heesung
>> >>>>>>>>>
>> >>>>>>>>> On Fri, Oct 28, 2022 at 6:56 PM Michael Marshall <
>> >>>>>>> mmarsh...@apache.org>
>> >>>>>>>>> wrote:
>> >>>>>>>>>
>> >>>>>>>>>> Sharing some more thoughts. We could alternatively use two
>> >> topics
>> >>>>>>>>>> instead of one. In this design, the first topic is the
>> >> unfiltered
>> >>>>>>>>>> write ahead log that represents many writers (brokers) trying
>> >> to
>> >>>>>>>>>> acquire ownership of bundles. The second topic is the
>> >> distilled log
>> >>>>>>>>>> that represents the "winners" or the "owners" of the bundles.
>> >>>>>>> There is
>> >>>>>>>>>> a single writer, the leader broker, that reads from the input
>> >> topic
>> >>>>>>>>>> and writes to the output topic. The first topic is normal and
>> >> the
>> >>>>>>>>>> second is compacted.
>> >>>>>>>>>>
>> >>>>>>>>>> The primary benefit in a two topic solution is that it is easy
>> >> for
>> >>>>>>> the
>> >>>>>>>>>> leader broker to trade off ownership without needing to slow
>> >> down
>> >>>>>>>>>> writes to the input topic. The leader broker will start
>> >> consuming
>> >>>>>>> from
>> >>>>>>>>>> the input topic when it has fully consumed the table view on
>> >> the
>> >>>>>>>>>> output topic. In general, I don't think consumers know when
>> >> they
>> >>>>>>> have
>> >>>>>>>>>> "reached the end of a table view", but we should be able to
>> >>>>>>> trivially
>> >>>>>>>>>> figure this out if we are the topic's only writer and the
>> >> topic and
>> >>>>>>>>>> writer are collocated on the same broker.
>> >>>>>>>>>>
>> >>>>>>>>>> In that design, it might make sense to use something like the
>> >>>>>>>>>> replication cursor to keep track of this consumer's state.
>> >>>>>>>>>>
>> >>>>>>>>>> - Michael
>> >>>>>>>>>>
>> >>>>>>>>>> On Fri, Oct 28, 2022 at 5:12 PM Michael Marshall <
>> >>>>>>> mmarsh...@apache.org>
>> >>>>>>>>>> wrote:
>> >>>>>>>>>>>
>> >>>>>>>>>>> Thanks for your proposal, Heesung.
>> >>>>>>>>>>>
>> >>>>>>>>>>> Fundamentally, we have the problems listed in this PIP
>> >> because
>> >>>>>>> we have
>> >>>>>>>>>>> multiple writers instead of just one writer. Can we solve
>> >> this
>> >>>>>>> problem
>> >>>>>>>>>>> by changing our write pattern? What if we use the leader
>> >> broker
>> >>>>>>> as the
>> >>>>>>>>>>> single writer? That broker would intercept attempts to
>> >> acquire
>> >>>>>>>>>>> ownership on bundles and would grant ownership to the first
>> >>>>>>> broker to
>> >>>>>>>>>>> claim an unassigned bundle. It could "grant ownership" by
>> >>>>>>> letting the
>> >>>>>>>>>>> first write to claim an unassigned bundle get written to the
>> >>>>>>> ownership
>> >>>>>>>>>>> topic. When a bundle is already owned, the leader won't
>> >> persist
>> >>>>>>> that
>> >>>>>>>>>>> event to the bookkeeper. In this design, the log becomes a
>> >> true
>> >>>>>>>>>>> ownership log, which will correctly work with the existing
>> >> topic
>> >>>>>>>>>>> compaction and table view solutions. My proposal essentially
>> >>>>>>> moves the
>> >>>>>>>>>>> conflict resolution to just before the write, and as a
>> >>>>>>> consequence, it
>> >>>>>>>>>>> greatly reduces the need for post processing of the event
>> >> log.
>> >>>>>>> One
>> >>>>>>>>>>> trade off might be that the leader broker could slow down the
>> >>>>>>> write
>> >>>>>>>>>>> path, but given that the leader would just need to verify the
>> >>>>>>> current
>> >>>>>>>>>>> state of the bundle, I think it'd be performant enough.
>> >>>>>>>>>>>
>> >>>>>>>>>>> Additionally, we'd need the leader broker to be "caught up"
>> >> on
>> >>>>>>> bundle
>> >>>>>>>>>>> ownership in order to grant ownership of topics, but unless
>> >> I am
>> >>>>>>>>>>> mistaken, that is already a requirement of the current PIP
>> >> 192
>> >>>>>>>>>>> paradigm.
>> >>>>>>>>>>>
>> >>>>>>>>>>> Below are some additional thoughts that will be relevant if
>> >> we
>> >>>>>>> move
>> >>>>>>>>>>> forward with the design as it is currently proposed.
>> >>>>>>>>>>>
>> >>>>>>>>>>> I think it might be helpful to update the title to show that
>> >> this
>> >>>>>>>>>>> proposal will also affect table view as well. I didn't catch
>> >>>>>>> that at
>> >>>>>>>>>>> first.
>> >>>>>>>>>>>
>> >>>>>>>>>>> Do you have any documentation describing how the
>> >>>>>>>>>>> TopicCompactionStrategy will determine which states are
>> >> valid in
>> >>>>>>> the
>> >>>>>>>>>>> context of load balancing? I looked at
>> >>>>>>>>>>> https://github.com/apache/pulsar/pull/18195, but I couldn't
>> >>>>>>> seem to
>> >>>>>>>>>>> find anything for it. That would help make this proposal less
>> >>>>>>>>>>> abstract.
>> >>>>>>>>>>>
>> >>>>>>>>>>> The proposed API seems very tied to the needs of PIP 192. For
>> >>>>>>> example,
>> >>>>>>>>>>> `isValid` is not a term I associate with topic compaction.
>> >> The
>> >>>>>>>>>>> fundamental question for compaction is which value to keep
>> >> (or
>> >>>>>>> build a
>> >>>>>>>>>>> new value). I think we might be able to simplify the API by
>> >>>>>>> replacing
>> >>>>>>>>>>> the "isValid", "isMergeEnabled", and "merge" methods with a
>> >>>>>>> single
>> >>>>>>>>>>> method that lets the implementation handle one or all tasks.
>> >> That
>> >>>>>>>>>>> would also remove the need to deserialize payloads multiple
>> >>>>>>> times too.
>> >>>>>>>>>>>
>> >>>>>>>>>>> I also feel like mentioning that after working with the PIP
>> >> 105
>> >>>>>>> broker
>> >>>>>>>>>>> side filtering, I think we should avoid running UDFs in the
>> >>>>>>> broker as
>> >>>>>>>>>>> much as possible. (I do not consider the load balancing
>> >> logic to
>> >>>>>>> be a
>> >>>>>>>>>>> UDF here.) I think it would be worth not making this a user
>> >>>>>>> facing
>> >>>>>>>>>>> feature unless there is demand for real use cases.
>> >>>>>>>>>>>
>> >>>>>>>>>>> Thanks!
>> >>>>>>>>>>> Michael
>> >>>>>>>>>>>
>> >>>>>>>>>>> On Fri, Oct 28, 2022 at 1:21 AM 丛搏 <bog...@apache.org>
>> >> wrote:
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> +1(non-binding)
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> thanks,
>> >>>>>>>>>>>> bo
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> Heesung Sohn <heesung.s...@streamnative.io.invalid>
>> >>>>>>> 于2022年10月19日周三
>> >>>>>>>>>> 07:54写道:
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> Hi pulsar-dev community,
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> I raised a pip to discuss : PIP-215: Configurable Topic
>> >>>>>>> Compaction
>> >>>>>>>>>> Strategy
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> PIP link: https://github.com/apache/pulsar/issues/18099
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> Regards,
>> >>>>>>>>>>>>> Heesung
>> >>>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>
>> >>>>>>
>> >>
>>
>>

Reply via email to