Hi,

I have had second thoughts about my previous comment:

- Agreed with your point that we should merge the CompactionStrategy APIs. I
updated the interface proposal in the PIP. I replaced the `isValid`,
`isMergeEnabled`, and `merge` APIs with a single `compact` API.

boolean isValid(T prev, T cur)
boolean isMergeEnabled()
T merge(T prev, T cur)

=>

T compact(T prev, T cur)

In fact, with only the `compact(T prev, T cur)` API, it is not clear whether
prev => cur is a valid transition (if it is invalid, we should filter out
the cur message instead of compacting/merging any further). I think we still
need to keep `isValid()` and `merge()` separate.
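
To make the distinction concrete, here is a minimal sketch of how a caller
could use the separated methods. It follows the three signatures listed
above, but the wrapper class, the `apply` helper, and the toy integer
strategy are only assumptions for illustration, not the interface in the PIP.

```java
import java.util.Optional;

public class SeparatedApiSketch {
    // Hypothetical shape that mirrors the three signatures listed above.
    interface CompactionStrategy<T> {
        boolean isValid(T prev, T cur);
        boolean isMergeEnabled();
        T merge(T prev, T cur);
    }

    // Returns empty when cur must be filtered out; otherwise the value to keep.
    static <T> Optional<T> apply(CompactionStrategy<T> strategy, T prev, T cur) {
        if (!strategy.isValid(prev, cur)) {
            return Optional.empty(); // invalid transition: drop cur, keep prev
        }
        return Optional.of(strategy.isMergeEnabled() ? strategy.merge(prev, cur) : cur);
    }

    // Toy strategy (assumption): values must strictly increase; merging sums them.
    static final CompactionStrategy<Integer> INCREASING_SUM = new CompactionStrategy<Integer>() {
        @Override public boolean isValid(Integer prev, Integer cur) { return cur > prev; }
        @Override public boolean isMergeEnabled() { return true; }
        @Override public Integer merge(Integer prev, Integer cur) { return prev + cur; }
    };

    public static void main(String[] args) {
        System.out.println(apply(INCREASING_SUM, 3, 5)); // prints Optional[8]
        System.out.println(apply(INCREASING_SUM, 5, 4)); // prints Optional.empty
    }
}
```

With a single `compact()` returning `T`, the "filtered out" outcome would
need an extra convention (for example, returning prev or null), whereas here
the caller sees it explicitly.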

Regarding redundant deserialization, the input type `T` is the type of
message value, so the input values are already deserialized. We don't want
to expose the Message<T> interface in this CompactionStrategy to avoid
message serialization/deserialization dependencies in the
CompactionStrategy.

The `merge()` functionality is intended for more complex use cases (merging
values instead of just filtering). To support `merge()`, we need to
internally create a new message with the compacted value plus copies of the
metadata and messageId. We could initially define only `isValid()` in
CompactionStrategy, and add `isMergeEnabled()` and `merge()` to the
interface later if requested.
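
As a rough illustration of the `isValid()`-only starting point, the sketch
below compacts a small keyed log by keeping, per key, the latest value that
passes `isValid()`. The class name, the `compact` helper, and the
"first assignment wins" rule are assumptions made up for this example; the
real bundle state validation will come with `BundleStateCompactionStrategy`.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class IsValidOnlySketch {
    // Hypothetical isValid()-only interface; T is the already-deserialized value.
    interface CompactionStrategy<T> {
        boolean isValid(T prev, T cur);
    }

    // Keeps, per key, the latest value accepted by the strategy.
    static <T> Map<String, T> compact(List<Map.Entry<String, T>> log,
                                      CompactionStrategy<T> strategy) {
        Map<String, T> latest = new LinkedHashMap<>();
        for (Map.Entry<String, T> msg : log) {
            T prev = latest.get(msg.getKey()); // null when the key has no value yet
            if (strategy.isValid(prev, msg.getValue())) {
                latest.put(msg.getKey(), msg.getValue());
            } // otherwise the message is filtered out
        }
        return latest;
    }

    // Toy rule (assumption): the first assignment for a bundle wins.
    static final CompactionStrategy<String> FIRST_ASSIGNMENT_WINS =
            (prev, cur) -> prev == null;

    // Conflicting assignments for bundle-x, similar to m1, m2, m3 in the thread.
    static List<Map.Entry<String, String>> exampleLog() {
        List<Map.Entry<String, String>> log = new ArrayList<>();
        log.add(new SimpleEntry<>("bundle-x", "broker-A"));
        log.add(new SimpleEntry<>("bundle-x", "broker-C"));
        log.add(new SimpleEntry<>("bundle-x", "broker-B"));
        log.add(new SimpleEntry<>("bundle-y", "broker-B"));
        return log;
    }

    public static void main(String[] args) {
        // prints {bundle-x=broker-A, bundle-y=broker-B}
        System.out.println(compact(exampleLog(), FIRST_ASSIGNMENT_WINS));
    }
}
```

Because the strategy only sees values of type `T`, it has no dependency on
Message<T> or on serialization.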

Regards,
Heesung


On Thu, Nov 3, 2022 at 9:50 AM Heesung Sohn <heesung.s...@streamnative.io>
wrote:

> Oops! Michael, I apologize for the typo in your name.
>
> On Thu, Nov 3, 2022 at 9:47 AM Heesung Sohn <heesung.s...@streamnative.io>
> wrote:
>
>> Hi Machel,
>>
>> Here are my additional comments regarding your earlier email.
>>
>> - I updated the PIP title to show that this will impact table view as
>> well.
>>
>> - PIP-192 : https://github.com/apache/pulsar/issues/16691 shows the
>> general idea of the states and their actions, and I defined the actual
>> states here in the PR,
>> https://github.com/apache/pulsar/pull/18079/files#diff-7f9930a5c7896b411f61901cf38371e23ba69e753f460bf7f520f6f800d8321a.
>> I will further clarify the bundle state data validation logic when
>> introducing `BundleStateCompactionStrategy` class. This PIP is to support
>> CompactionStrategy in general.
>>
>> - Agreed with your point that we should merge CompactionStrategy APIs. I
>> updated the interface proposal in the PIP. I replaced `"isValid",
>> "isMergeEnabled", and "merge"` apis with "compact" api.
>>
>>
>> Thanks,
>> Heesung
>>
>>
>> On Tue, Nov 1, 2022 at 11:26 AM Heesung Sohn <
>> heesung.s...@streamnative.io> wrote:
>>
>>> Hi,
>>> Thank you for the great comments.
>>> Please find my comments inline too.
>>>
>>> Regards,
>>> Heesung
>>>
>>> On Mon, Oct 31, 2022 at 10:59 PM Michael Marshall <mmarsh...@apache.org>
>>> wrote:
>>>
>>>> > I think we lose a single linearized view.
>>>>
>>>> Which linearized view are we losing, and what is the role of that
>>>> linearized view? I think I might be missing why it is important. I
>>>> agree that consumers won't know about each unsuccessful attempted
>>>> acquisition of a bundle, but that seems like unnecessary information
>>>> to broadcast to every broker in the cluster.
>>>>
>>>>
>>>
>>> PIP-192 proposes an assignment, transfer, and split protocol (multi-phase
>>> state changes) that relies on early broadcast across brokers. All brokers
>>> react to their clients according to the state change notifications:
>>> brokers can defer any client lookups for bundle x if an
>>> assignment/transfer/split is ongoing for x (broadcast early in the topic).
>>> One early broadcast example is the one that I discussed above, `When the
>>> topic broadcast is faster than the concurrent assignment requests.` I think
>>> the prefilter could delay this early broadcast, as it needs to go through
>>> the additional single-leader compaction path.
>>>
>>> The bundle state recovery process is simpler with a single linearized view.
>>>
>>> A single linearized view also makes bundle states easier to debug. We can
>>> more easily track where the assignment requests come from and how they are
>>> compacted in a single linearized view.
>>>
>>>
>>>
>>>> > I think the leader requires a write-through cache to compact messages
>>>> > based on the latest states.
>>>>
>>>> This brings up an important point that I would like to clarify. If we
>>>> trust the write ahead log as the source of truth, what happens when a
>>>> bundle has been validly owned by multiple brokers? As a broker starts
>>>> and consumes from the compacted topic, how do we prevent it from
>>>> incorrectly thinking that it owns a bundle for some short time period
>>>> in the case that the ownership topic hasn't yet been compacted to
>>>> remove old ownership state?
>>>>
>>>
>>> Since the multi-phase transfer protocol involves the source and
>>> destination broker's actions, the successful transfer should get the source
>>> and destination broker to have the (near) latest state. For example, if
>>> some brokers have old ownership states(network partitioned or delayed),
>>> they will redirect clients to the source(old) broker. However, by the
>>> transfer protocol, the source broker should have the latest state, so it
>>> can redirect the client again to the destination broker.
>>>
>>> When a broker restarts, it won't start until its BSC state has caught up to
>>> the (near) latest state (up to the last known messageId at that time).
>>>
>>>
>>>> > Pulsar guarantees "a single writer".
>>>>
>>>> I didn't think we were using a single writer in the PIP 192 design. I
>>>> thought we had many producers sending events to a compacted topic. My
>>>> proposal would still have many producers, but the writer to bookkeeper
>>>> would act as the single writer. It would technically be distinct from
>>>> a normal Pulsar topic producer.
>>>>
>>>> I should highlight that I am only proposing "broker filtering before
>>>> write" in the context of PIP 192 and as an alternative to adding
>>>> pluggable compaction strategies. It would not be a generic feature.
>>>>
>>>>
>>> I was worried about the worst case where two producers (leaders) happen
>>> to write to the compacted topic (although Pulsar can guarantee "a single
>>> writer" or "a single producer" for a topic in normal situations).
>>>
>>>
>>>
>>>> > Could we clarify how to handle
>>>> > the following(edge cases and failure recovery)?
>>>> > 0. Is the un-compacted topic a persistent topic or a non-persistent
>>>> > topic?
>>>>
>>>> It is a persistent topic.
>>>>
>>>> > 1. How does the leader recover state from the two topics?
>>>>
>>>> A leader would recover state by first consuming the whole compacted
>>>> topic and then by consuming from the current location of a cursor on
>>>> the first input topic. As stated elsewhere, this introduces latency
>>>> and could be an issue.
>>>>
>>>> > 2. How do we handle the case when the leader fails before writing
>>>> > messages to the compacted topic
>>>>
>>>> The leader would not acknowledge the message on the input topic until
>>>> it has successfully persisted the event on the compacted topic.
>>>> Publishing the same event to a compacted topic multiple times is
>>>> idempotent, so there is no risk of lost state. The real risk is
>>>> latency. However, I think we might have similar (though not the same)
>>>> latency risks in the current solution.
>>>>
>>>> > Analysis: the "pre-filter + two-topics" option can reduce the number
>>>> > of messages to broadcast at the expense of the leader broker compaction.
>>>>
>>>> My primary point is that with this PIP's design, the filter logic is
>>>> run on every broker and again during topic compaction. With the
>>>> alternative design, the filter is run once.
>>>>
>>> Thank you for the clarification.
>>>
>>> I think the difference is that the post-filter is an optimistic approach
>>> as it optimistically relies on the "broadcast-filter" effect(brokers will
>>> defer client lookups if notified ahead that any assignment is ongoing for
>>> bundle x). Yes, in the worst case, if the broadcast is slower, each broker
>>> needs to individually compact the conflicting assignment requests.
>>>
>>> Conversely, one downside of the pessimistic approach (single leader
>>> pre-filter) is that when there are not many conflicting concurrent
>>> assignment requests (assign for bundle a, assign for bundle b, assign for
>>> bundle c...), the requests need to redundantly go through the leader
>>> compaction.
>>>
>>>
>>>
>>>> > 3. initially less complex to implement (leaderless conflict
>>>> > resolution and requires a single topic)
>>>>
>>>> PIP 215 has its own complexity too. Coordinating filters
>>>> on both the client (table view) and the server (compaction) is non
>>>> trivial. The proposed API includes hard coded client configuration for
>>>> each component, which will make upgrading the version of the
>>>> compaction strategy complicated, and could lead to incorrect
>>>> interpretation of events in the stream. When a single broker is doing
>>>> the filtering, versioning is no longer a distributed problem. That
>>>> being said, I do not mean to suggest my solution is without
>>>> complexity.
>>>>
>>>> > 4. it is not a "one-way door" decision (we could add the pre-filter
>>>> > logic as well later)
>>>>
>>>> It's fair to say that we could add it later, but at that point, we
>>>> will have added this new API for compaction strategy. Are we confident
>>>> that pluggable compaction is independently an important addition to
>>>> Pulsar's features, or would it make sense to make this API only exposed
>>>> in the broker?
>>>>
>>>>
>>> The intention is that this compaction feature could be useful for
>>> complex user applications (if they are trying to do a similar thing). As I
>>> mentioned, this feature is closely tied to the PIP-192 now. We are not
>>> planning to expose this feature to users soon unless demanded and proven to
>>> be stable.
>>>
>>>
>>>> Thanks,
>>>> Michael
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Sat, Oct 29, 2022 at 10:13 PM Heesung Sohn
>>>> <heesung.s...@streamnative.io.invalid> wrote:
>>>> >
>>>> > Hi,
>>>> >
>>>> > Also, I thought about some concurrent assignment scenarios between
>>>> > pre-filter vs post-filter.
>>>> >
>>>> > Example 1: When the topic broadcast is slower than the concurrent
>>>> > assignment requests
>>>> >
>>>> > With pre-filter + two-topics (non-compacted and compacted topics)
>>>> > t1: A -> non-compacted topic // broker A published a message to the non-compacted topic, m1: {broker A assigned bundle x to broker A}
>>>> > t2: B -> non-compacted topic // broker B published a message, m2: {broker B assigned bundle x to broker C}
>>>> > t3: C -> non-compacted topic // broker C published a message, m3: {broker C assigned bundle x to broker B}
>>>> > t4: non-compacted topic -> L // leader broker consumed the messages: m1, m2, and m3
>>>> > t5: L -> compacted topic // leader compacted the messages and broadcasted m1 to all consumers
>>>> > t6: compacted topic -> [A,B,C] // broker A,B,C consumed m1
>>>> >
>>>> > With post-filter + a single topic
>>>> > t1: A -> topic // broker A published a message to the non-compacted topic, m1: {broker A assigned bundle x to broker A}
>>>> > t2: B -> topic // broker B published a message, m2: {broker B assigned bundle x to broker C}
>>>> > t3: C -> topic // broker C published a message, m3: {broker C assigned bundle x to broker B}
>>>> > t4: topic -> [A,B,C] // broker A,B,C consumed the messages: m1, m2, and m3
>>>> > t5: [A,B,C] -> m1 // broker A,B,C individually compacted the messages to m1.
>>>> >
>>>> > Analysis: the "pre-filter + two-topics" option can reduce the number
>>>> > of messages to broadcast at the expense of the leader broker compaction.
>>>> >
>>>> >
>>>> > Example 2: When the topic broadcast is faster than the concurrent
>>>> > assignment requests
>>>> >
>>>> > With pre-filter + two-topics (non-compacted and compacted topics)
>>>> > t1: A -> non-compacted topic // broker A published a message to the non-compacted topic, m1: {broker A assigned bundle x to broker A}
>>>> > t2: non-compacted topic -> L // leader broker consumed the messages: m1
>>>> > t3: L -> compacted topic // leader compacted the message and broadcasted m1 to all consumers
>>>> > t4: compacted topic -> [A,B,C] // broker A,B,C consumed m1
>>>> > t5: A -> own bundle // broker A knows that its assignment has been accepted, so proceeding to own the bundle (meanwhile deferring lookup requests)
>>>> > t6: B -> defer client lookups // broker B knows that bundle assignment is running (meanwhile deferring lookup requests)
>>>> > t7: C -> defer client lookups // broker C knows that bundle assignment is running (meanwhile deferring lookup requests)
>>>> >
>>>> > With post-filter + a single topic
>>>> > t1: A -> topic // broker A published a message to the non-compacted topic, m1: {broker A assigned bundle x to broker A}
>>>> > t2: topic -> [A,B,C] // broker A,B,C consumed the message: m1
>>>> > t3: A -> own bundle // broker A knows that its assignment has been accepted, so proceeding to own the bundle (meanwhile deferring lookup requests)
>>>> > t4: B -> defer client lookups // broker B knows that bundle assignment is running (meanwhile deferring lookup requests)
>>>> > t5: C -> defer client lookups // broker C knows that bundle assignment is running (meanwhile deferring lookup requests)
>>>> >
>>>> > Analysis: The "post-filter + a single topic" can perform ok in this
>>>> > case without the additional leader coordination and the secondary topic
>>>> > because the early broadcast can inform all brokers and prevent them from
>>>> > requesting other assignments for the same bundle.
>>>> >
>>>> > I think the post-filter option is initially not bad because:
>>>> >
>>>> > 1. it is safe in the worst case (in case the messages are not
>>>> > correctly pre-filtered at the leader)
>>>> > 2. it performs ok because the early broadcast can prevent
>>>> > concurrent assignment requests.
>>>> > 3. initially less complex to implement (leaderless conflict
>>>> > resolution and requires a single topic)
>>>> > 4. it is not a "one-way door" decision (we could add the pre-filter
>>>> > logic as well later)
>>>> >
>>>> > Regards,
>>>> > Heesung
>>>> >
>>>> >
>>>> >
>>>> > On Sat, Oct 29, 2022 at 1:03 PM Heesung Sohn <
>>>> heesung.s...@streamnative.io>
>>>> > wrote:
>>>> >
>>>> > > Hi Michael,
>>>> > >
>>>> > > For the pre-filter (pre-compaction) option,
>>>> > > I think the leader requires a write-through cache to compact
>>>> messages
>>>> > > based on the latest states. Otherwise, the leader needs to wait for
>>>> the
>>>> > > last msg from the (compacted) topic before compacting the next msg
>>>> for the
>>>> > > same bundle.
>>>> > >
>>>> > > Pulsar guarantees "a single writer". However, for the worst-case
>>>> > > scenario(due to network partitions, bugs in zk or etcd leader
>>>> election,
>>>> > > bugs in bk, data corruption ), I think it is safe to place the
>>>> post-filter
>>>> > > on the consumer side(compaction and table views) as well in order to
>>>> > > validate the state changes.
>>>> > >
>>>> > > For the two-topic approach,
>>>> > > I think we lose a single linearized view. Could we clarify how to
>>>> handle
>>>> > > the following(edge cases and failure recovery)?
>>>> > > 0. Is the un-compacted topic a persistent topic or a non-persistent
>>>> topic?
>>>> > > 1. How does the leader recover state from the two topics?
>>>> > > 2. How do we handle the case when the leader fails before writing
>>>> messages
>>>> > > to the compacted topic
>>>> > >
>>>> > > Regards,
>>>> > > Heesung
>>>> > >
>>>> > > On Fri, Oct 28, 2022 at 6:56 PM Michael Marshall <
>>>> mmarsh...@apache.org>
>>>> > > wrote:
>>>> > >
>>>> > >> Sharing some more thoughts. We could alternatively use two topics
>>>> > >> instead of one. In this design, the first topic is the unfiltered
>>>> > >> write ahead log that represents many writers (brokers) trying to
>>>> > >> acquire ownership of bundles. The second topic is the distilled log
>>>> > >> that represents the "winners" or the "owners" of the bundles.
>>>> There is
>>>> > >> a single writer, the leader broker, that reads from the input topic
>>>> > >> and writes to the output topic. The first topic is normal and the
>>>> > >> second is compacted.
>>>> > >>
>>>> > >> The primary benefit in a two topic solution is that it is easy for
>>>> the
>>>> > >> leader broker to trade off ownership without needing to slow down
>>>> > >> writes to the input topic. The leader broker will start consuming
>>>> from
>>>> > >> the input topic when it has fully consumed the table view on the
>>>> > >> output topic. In general, I don't think consumers know when they
>>>> have
>>>> > >> "reached the end of a table view", but we should be able to
>>>> trivially
>>>> > >> figure this out if we are the topic's only writer and the topic and
>>>> > >> writer are collocated on the same broker.
>>>> > >>
>>>> > >> In that design, it might make sense to use something like the
>>>> > >> replication cursor to keep track of this consumer's state.
>>>> > >>
>>>> > >> - Michael
>>>> > >>
>>>> > >> On Fri, Oct 28, 2022 at 5:12 PM Michael Marshall <
>>>> mmarsh...@apache.org>
>>>> > >> wrote:
>>>> > >> >
>>>> > >> > Thanks for your proposal, Heesung.
>>>> > >> >
>>>> > >> > Fundamentally, we have the problems listed in this PIP because
>>>> we have
>>>> > >> > multiple writers instead of just one writer. Can we solve this
>>>> problem
>>>> > >> > by changing our write pattern? What if we use the leader broker
>>>> as the
>>>> > >> > single writer? That broker would intercept attempts to acquire
>>>> > >> > ownership on bundles and would grant ownership to the first
>>>> broker to
>>>> > >> > claim an unassigned bundle. It could "grant ownership" by
>>>> letting the
>>>> > >> > first write to claim an unassigned bundle get written to the
>>>> ownership
>>>> > >> > topic. When a bundle is already owned, the leader won't persist
>>>> that
>>>> > >> > event to the bookkeeper. In this design, the log becomes a true
>>>> > >> > ownership log, which will correctly work with the existing topic
>>>> > >> > compaction and table view solutions. My proposal essentially
>>>> moves the
>>>> > >> > conflict resolution to just before the write, and as a
>>>> consequence, it
>>>> > >> > greatly reduces the need for post processing of the event log.
>>>> One
>>>> > >> > trade off might be that the leader broker could slow down the
>>>> write
>>>> > >> > path, but given that the leader would just need to verify the
>>>> current
>>>> > >> > state of the bundle, I think it'd be performant enough.
>>>> > >> >
>>>> > >> > Additionally, we'd need the leader broker to be "caught up" on
>>>> bundle
>>>> > >> > ownership in order to grant ownership of topics, but unless I am
>>>> > >> > mistaken, that is already a requirement of the current PIP 192
>>>> > >> > paradigm.
>>>> > >> >
>>>> > >> > Below are some additional thoughts that will be relevant if we
>>>> move
>>>> > >> > forward with the design as it is currently proposed.
>>>> > >> >
>>>> > >> > I think it might be helpful to update the title to show that this
>>>> > >> > proposal will also affect table view as well. I didn't catch
>>>> that at
>>>> > >> > first.
>>>> > >> >
>>>> > >> > Do you have any documentation describing how the
>>>> > >> > TopicCompactionStrategy will determine which states are valid in
>>>> the
>>>> > >> > context of load balancing? I looked at
>>>> > >> > https://github.com/apache/pulsar/pull/18195, but I couldn't
>>>> seem to
>>>> > >> > find anything for it. That would help make this proposal less
>>>> > >> > abstract.
>>>> > >> >
>>>> > >> > The proposed API seems very tied to the needs of PIP 192. For
>>>> example,
>>>> > >> > `isValid` is not a term I associate with topic compaction. The
>>>> > >> > fundamental question for compaction is which value to keep (or
>>>> build a
>>>> > >> > new value). I think we might be able to simplify the API by
>>>> replacing
>>>> > >> > the "isValid", "isMergeEnabled", and "merge" methods with a
>>>> single
>>>> > >> > method that lets the implementation handle one or all tasks. That
>>>> > >> > would also remove the need to deserialize payloads multiple
>>>> times too.
>>>> > >> >
>>>> > >> > I also feel like mentioning that after working with the PIP 105
>>>> broker
>>>> > >> > side filtering, I think we should avoid running UDFs in the
>>>> broker as
>>>> > >> > much as possible. (I do not consider the load balancing logic to
>>>> be a
>>>> > >> > UDF here.) I think it would be worth not making this a user
>>>> facing
>>>> > >> > feature unless there is demand for real use cases.
>>>> > >> >
>>>> > >> > Thanks!
>>>> > >> > Michael
>>>> > >> >
>>>> > >> > On Fri, Oct 28, 2022 at 1:21 AM 丛搏 <bog...@apache.org> wrote:
>>>> > >> > >
>>>> > >> > > +1(non-binding)
>>>> > >> > >
>>>> > >> > > thanks,
>>>> > >> > > bo
>>>> > >> > >
>>>> > >> > > Heesung Sohn <heesung.s...@streamnative.io.invalid>
>>>> 于2022年10月19日周三
>>>> > >> 07:54写道:
>>>> > >> > > >
>>>> > >> > > > Hi pulsar-dev community,
>>>> > >> > > >
>>>> > >> > > > I raised a pip to discuss : PIP-215: Configurable Topic
>>>> Compaction
>>>> > >> Strategy
>>>> > >> > > >
>>>> > >> > > > PIP link: https://github.com/apache/pulsar/issues/18099
>>>> > >> > > >
>>>> > >> > > > Regards,
>>>> > >> > > > Heesung
>>>> > >>
>>>> > >
>>>>
>>>
