Hi all,

I think I liked your suggestion of allowing EOS with READ_UNCOMMITTED,
but keep wiping the state on error, and I'd vote for this solution
when introducing `default.state.isolation.level`. This way, we'd have
the most low-risk roll-out of this feature (no behavior change without
reconfiguration), with the possibility of switching to the most sane /
battle-tested default settings in 4.0. Essentially, we'd have a
feature flag but call it `default.state.isolation.level` and don't
have to deprecate it later.

So the possible configurations would then be this:

1. ALOS/READ_UNCOMMITTED (default) = processing uses direct-to-DB, IQ
reads from DB.
2. ALOS/READ_COMMITTED = processing uses WriteBatch, IQ reads from
WriteBatch/DB. Flush on error (see note below).
3. EOS/READ_UNCOMMITTED (default) = processing uses direct-to-DB, IQ
reads from DB. Wipe state on error.
4. EOS/READ_COMMITTED = processing uses WriteBatch, IQ reads from WriteBatch/DB.

I believe the feature is important enough that we will see good
adoption even without changing the default. In 4.0, when we have seen
this being adopted and is battle-tested, we make READ_COMMITTED the
default for EOS, or even READ_COMITTED always the default, depending
on our experiences. And we could add a clever implementation of
READ_UNCOMITTED with WriteBatches later.

The only smell here is that `default.state.isolation.level` wouldn't
be purely an IQ setting, but it would also (slightly) change the
behavior of the processing, but that seems unavoidable as long as we
haven't solve READ_UNCOMITTED IQ with WriteBatches.

Minor: As for Bruno's point 4, I think if we are concerned about this
behavior (we don't necessarily have to be, because it doesn't violate
ALOS guarantees as far as I can see), we could make
ALOS/READ_COMMITTED more similar to ALOS/READ_UNCOMITTED by flushing
the WriteBatch on error (obviously, only if we have a chance to do
that).

Cheers,
Lucas

On Mon, Oct 16, 2023 at 12:19 PM Nick Telford <nick.telf...@gmail.com> wrote:
>
> Hi Guozhang,
>
> The KIP as it stands introduces a new configuration,
> default.state.isolation.level, which is independent of processing.mode.
> It's intended that this new configuration be used to configure a global IQ
> isolation level in the short term, with a future KIP introducing the
> capability to change the isolation level on a per-query basis, falling back
> to the "default" defined by this config. That's why I called it "default",
> for future-proofing.
>
> However, it currently includes the caveat that READ_UNCOMMITTED is not
> available under EOS. I think this is the coupling you are alluding to?
>
> This isn't intended to be a restriction of the API, but is currently a
> technical limitation. However, after discussing with some users about
> use-cases that would require READ_UNCOMMITTED under EOS, I'm inclined to
> remove that clause and put in the necessary work to make that combination
> possible now.
>
> I currently see two possible approaches:
>
>    1. Disable TX StateStores internally when the IsolationLevel is
>    READ_UNCOMMITTED and the processing.mode is EOS. This is more difficult
>    than it sounds, as there are many assumptions being made throughout the
>    internals about the guarantees StateStores provide. It would definitely add
>    a lot of extra "if (read_uncommitted && eos)" branches, complicating
>    maintenance and testing.
>    2. Invest the time *now* to make READ_UNCOMMITTED of EOS StateStores
>    possible. I have some ideas on how this could be achieved, but they would
>    need testing and could introduce some additional issues. The benefit of
>    this approach is that it would make query-time IsolationLevels much simpler
>    to implement in the future.
>
> Unfortunately, both will require considerable work that will further delay
> this KIP, which was the reason I placed the restriction in the KIP in the
> first place.
>
> Regards,
> Nick
>
> On Sat, 14 Oct 2023 at 03:30, Guozhang Wang <guozhang.wang...@gmail.com>
> wrote:
>
> > Hello Nick,
> >
> > First of all, thanks a lot for the great effort you've put in driving
> > this KIP! I really like it coming through finally, as many people in
> > the community have raised this. At the same time I honestly feel a bit
> > ashamed for not putting enough of my time supporting it and pushing it
> > through the finish line (you raised this KIP almost a year ago).
> >
> > I briefly passed through the DISCUSS thread so far, not sure I've 100
> > percent digested all the bullet points. But with the goal of trying to
> > help take it through the finish line in mind, I'd want to throw
> > thoughts on top of my head only on the point #4 above which I felt may
> > be the main hurdle for the current KIP to drive to a consensus now.
> >
> > The general question I asked myself is, whether we want to couple "IQ
> > reading mode" with "processing mode". While technically I tend to
> > agree with you that, it's feels like a bug if some single user chose
> > "EOS" for processing mode while choosing "read uncommitted" for IQ
> > reading mode, at the same time, I'm thinking if it's possible that
> > there could be two different persons (or even two teams) that would be
> > using the stream API to build the app, and the IQ API to query the
> > running state of the app. I know this is less of a technical thing but
> > rather a more design stuff, but if it could be ever the case, I'm
> > wondering if the personale using the IQ API knows about the risks of
> > using read uncommitted but still chose so for the favor of
> > performance, no matter if the underlying stream processing mode
> > configured by another personale is EOS or not. In that regard, I'm
> > leaning towards a "leaving the door open, and close it later if we
> > found it's a bad idea" aspect with a configuration that we can
> > potentially deprecate than "shut the door, clean for everyone". More
> > specifically, allowing the processing mode / IQ read mode to be
> > decoupled, and if we found that there's no such cases as I speculated
> > above or people started complaining a lot, we can still enforce
> > coupling them.
> >
> > Again, just my 2c here. Thanks again for the great patience and
> > diligence on this KIP.
> >
> >
> > Guozhang
> >
> >
> >
> > On Fri, Oct 13, 2023 at 8:48 AM Nick Telford <nick.telf...@gmail.com>
> > wrote:
> > >
> > > Hi Bruno,
> > >
> > > 4.
> > > I'll hold off on making that change until we have a consensus as to what
> > > configuration to use to control all of this, as it'll be affected by the
> > > decision on EOS isolation levels.
> > >
> > > 5.
> > > Done. I've chosen "committedOffsets".
> > >
> > > Regards,
> > > Nick
> > >
> > > On Fri, 13 Oct 2023 at 16:23, Bruno Cadonna <cado...@apache.org> wrote:
> > >
> > > > Hi Nick,
> > > >
> > > > 1.
> > > > Yeah, you are probably right that it does not make too much sense.
> > > > Thanks for the clarification!
> > > >
> > > >
> > > > 4.
> > > > Yes, sorry for the back and forth, but I think for the sake of the KIP
> > > > it is better to let the ALOS behavior as it is for now due to the
> > > > possible issues you would run into. Maybe we can find a solution in the
> > > > future. Now the question returns to whether we really need
> > > > default.state.isolation.level. Maybe the config could be the feature
> > > > flag Sophie requested.
> > > >
> > > >
> > > > 5.
> > > > There is a guideline in Kafka not to use the get prefix for getters (at
> > > > least in the public API). Thus, could you please rename
> > > >
> > > > getCommittedOffset(TopicPartition partition) ->
> > > > committedOffsetFor(TopicPartition partition)
> > > >
> > > > You can also propose an alternative to committedOffsetFor().
> > > >
> > > >
> > > > Best,
> > > > Bruno
> > > >
> > > >
> > > > On 10/13/23 3:21 PM, Nick Telford wrote:
> > > > > Hi Bruno,
> > > > >
> > > > > Thanks for getting back to me.
> > > > >
> > > > > 1.
> > > > > I think this should be possible. Are you thinking of the situation
> > where
> > > > a
> > > > > user may downgrade to a previous version of Kafka Streams? In that
> > case,
> > > > > sadly, the RocksDBStore would get wiped by the older version of Kafka
> > > > > Streams anyway, because that version wouldn't understand the extra
> > column
> > > > > family (that holds offsets), so the missing Position file would
> > > > > automatically get rebuilt when the store is rebuilt from the
> > changelog.
> > > > > Are there other situations than downgrade where a transactional store
> > > > could
> > > > > be replaced by a non-transactional one? I can't think of any.
> > > > >
> > > > > 2.
> > > > > Ahh yes, the Test Plan - my Kryptonite! This section definitely
> > needs to
> > > > be
> > > > > fleshed out. I'll work on that. How much detail do you need?
> > > > >
> > > > > 3.
> > > > > See my previous email discussing this.
> > > > >
> > > > > 4.
> > > > > Hmm, this is an interesting point. Are you suggesting that under ALOS
> > > > > READ_COMMITTED should not be supported?
> > > > >
> > > > > Regards,
> > > > > Nick
> > > > >
> > > > > On Fri, 13 Oct 2023 at 13:52, Bruno Cadonna <cado...@apache.org>
> > wrote:
> > > > >
> > > > >> Hi Nick,
> > > > >>
> > > > >> I think the KIP is converging!
> > > > >>
> > > > >>
> > > > >> 1.
> > > > >> I am wondering whether it makes sense to write the position file
> > during
> > > > >> close as we do for the checkpoint file, so that in case the state
> > store
> > > > >> is replaced with a non-transactional state store the
> > non-transactional
> > > > >> state store finds the position file. I think, this is not strictly
> > > > >> needed, but would be a nice behavior instead of just deleting the
> > > > >> position file.
> > > > >>
> > > > >>
> > > > >> 2.
> > > > >> The test plan does not mention integration tests. Do you not need to
> > > > >> extend existing ones and add new ones. Also for upgrading and
> > > > >> downgrading you might need integration and/or system tests.
> > > > >>
> > > > >>
> > > > >> 3.
> > > > >> I think Sophie made a point. Although, IQ reading from uncommitted
> > data
> > > > >> under EOS might be considered a bug by some people. Thus, your KIP
> > would
> > > > >> fix a bug rather than changing the intended behavior. However, I
> > also
> > > > >> see that a feature flag would help users that rely on this buggy
> > > > >> behavior (at least until AK 4.0).
> > > > >>
> > > > >>
> > > > >> 4.
> > > > >> This is related to the previous point. I assume that the difference
> > > > >> between READ_COMMITTED and READ_UNCOMMITTED for ALOS is that in the
> > > > >> former you enable transactions on the state store and in the latter
> > you
> > > > >> disable them. If my assumption is correct, I think that is an issue.
> > > > >> Let's assume under ALOS Streams fails over a couple of times more or
> > > > >> less at the same step in processing after value 3 is added to an
> > > > >> aggregation but the offset of the corresponding input record was not
> > > > >> committed. Without transactions disabled, the aggregation value
> > would
> > > > >> increase by 3 for each failover. With transactions enabled, value 3
> > > > >> would only be added to the aggregation once when the offset of the
> > input
> > > > >> record is committed and the transaction finally completes. So the
> > > > >> content of the state store would change depending on the
> > configuration
> > > > >> for IQ. IMO, the content of the state store should be independent
> > from
> > > > >> IQ. Given this issue, I propose to not use transactions with ALOS at
> > > > >> all. I was a big proponent of using transactions with ALOS, but I
> > > > >> realized that transactions with ALOS is not as easy as enabling
> > > > >> transactions on state stores. Another aspect that is problematic is
> > that
> > > > >> the changelog topic which actually replicates the state store is not
> > > > >> transactional under ALOS. Thus, it might happen that the state
> > store and
> > > > >> the changelog differ in their content. All of this is maybe solvable
> > > > >> somehow, but for the sake of this KIP, I would leave it for the
> > future.
> > > > >>
> > > > >>
> > > > >> Best,
> > > > >> Bruno
> > > > >>
> > > > >>
> > > > >>
> > > > >> On 10/12/23 10:32 PM, Sophie Blee-Goldman wrote:
> > > > >>> Hey Nick! First of all thanks for taking up this awesome feature,
> > I'm
> > > > >> sure
> > > > >>> every single
> > > > >>> Kafka Streams user and dev would agree that it is sorely needed.
> > > > >>>
> > > > >>> I've just been catching up on the KIP and surrounding discussion,
> > so
> > > > >> please
> > > > >>> forgive me
> > > > >>> for any misunderstandings or misinterpretations of the current
> > plan and
> > > > >>> don't hesitate to
> > > > >>> correct me.
> > > > >>>
> > > > >>> Before I jump in, I just want to say that having seen this drag on
> > for
> > > > so
> > > > >>> long, my singular
> > > > >>> goal in responding is to help this KIP past a perceived impasse so
> > we
> > > > can
> > > > >>> finally move on
> > > > >>> to voting and implementing it. Long discussions are to be expected
> > for
> > > > >>> major features like
> > > > >>> this but it's completely on us as the Streams devs to make sure
> > there
> > > > is
> > > > >> an
> > > > >>> end in sight
> > > > >>> for any ongoing discussion.
> > > > >>>
> > > > >>> With that said, it's my understanding that the KIP as currently
> > > > proposed
> > > > >> is
> > > > >>> just not tenable
> > > > >>> for Kafka Streams, and would prevent some EOS users from upgrading
> > to
> > > > the
> > > > >>> version it
> > > > >>> first appears in. Given that we can't predict or guarantee whether
> > any
> > > > of
> > > > >>> the followup KIPs
> > > > >>> would be completed in the same release cycle as this one, we need
> > to
> > > > make
> > > > >>> sure that the
> > > > >>> feature is either compatible with all current users or else
> > > > >> feature-flagged
> > > > >>> so that they may
> > > > >>> opt in/out.
> > > > >>>
> > > > >>> Therefore, IIUC we need to have either (or both) of these as
> > > > >>> fully-implemented config options:
> > > > >>> 1. default.state.isolation.level
> > > > >>> 2. enable.transactional.state.stores
> > > > >>>
> > > > >>> This way EOS users for whom read_committed semantics are not
> > viable can
> > > > >>> still upgrade,
> > > > >>> and either use the isolation.level config to leverage the new txn
> > state
> > > > >>> stores without sacrificing
> > > > >>> their application semantics, or else simply keep the transactional
> > > > state
> > > > >>> stores disabled until we
> > > > >>> are able to fully implement the isolation level configuration at
> > either
> > > > >> an
> > > > >>> application or query level.
> > > > >>>
> > > > >>> Frankly you are the expert here and know much more about the
> > tradeoffs
> > > > in
> > > > >>> both semantics and
> > > > >>> effort level of implementing one of these configs vs the other. In
> > my
> > > > >>> opinion, either option would
> > > > >>> be fine and I would leave the decision of which one to include in
> > this
> > > > >> KIP
> > > > >>> completely up to you.
> > > > >>> I just don't see a way for the KIP to proceed without some
> > variation of
> > > > >> the
> > > > >>> above that would allow
> > > > >>> EOS users to opt-out of read_committed.
> > > > >>>
> > > > >>> (If it's all the same to you, I would recommend always including a
> > > > >> feature
> > > > >>> flag in large structural
> > > > >>> changes like this. No matter how much I trust someone or myself to
> > > > >>> implement a feature, you just
> > > > >>> never know what kind of bugs might slip in, especially with the
> > very
> > > > >> first
> > > > >>> iteration that gets released.
> > > > >>> So personally, my choice would be to add the feature flag and
> > leave it
> > > > >> off
> > > > >>> by default. If all goes well
> > > > >>> you can do a quick KIP to enable it by default as soon as the
> > > > >>> isolation.level config has been
> > > > >>> completed. But feel free to just pick whichever option is easiest
> > or
> > > > >>> quickest for you to implement)
> > > > >>>
> > > > >>> Hope this helps move the discussion forward,
> > > > >>> Sophie
> > > > >>>
> > > > >>> On Tue, Sep 19, 2023 at 1:57 AM Nick Telford <
> > nick.telf...@gmail.com>
> > > > >> wrote:
> > > > >>>
> > > > >>>> Hi Bruno,
> > > > >>>>
> > > > >>>> Agreed, I can live with that for now.
> > > > >>>>
> > > > >>>> In an effort to keep the scope of this KIP from expanding, I'm
> > leaning
> > > > >>>> towards just providing a configurable
> > default.state.isolation.level
> > > > and
> > > > >>>> removing IsolationLevel from the StateStoreContext. This would be
> > > > >>>> compatible with adding support for query-time IsolationLevels in
> > the
> > > > >>>> future, whilst providing a way for users to select an isolation
> > level
> > > > >> now.
> > > > >>>>
> > > > >>>> The big problem with this, however, is that if a user selects
> > > > >>>> processing.mode
> > > > >>>> = "exactly-once(-v2|-beta)", and default.state.isolation.level =
> > > > >>>> "READ_UNCOMMITTED", we need to guarantee that the data isn't
> > written
> > > > to
> > > > >>>> disk until commit() is called, but we also need to permit IQ
> > threads
> > > > to
> > > > >>>> read from the ongoing transaction.
> > > > >>>>
> > > > >>>> A simple solution would be to (temporarily) forbid this
> > combination of
> > > > >>>> configuration, and have default.state.isolation.level
> > automatically
> > > > >> switch
> > > > >>>> to READ_COMMITTED when processing.mode is anything other than
> > > > >>>> at-least-once. Do you think this would be acceptable?
> > > > >>>>
> > > > >>>> In a later KIP, we can add support for query-time isolation
> > levels and
> > > > >>>> solve this particular problem there, which would relax this
> > > > restriction.
> > > > >>>>
> > > > >>>> Regards,
> > > > >>>> Nick
> > > > >>>>
> > > > >>>> On Tue, 19 Sept 2023 at 09:30, Bruno Cadonna <cado...@apache.org>
> > > > >> wrote:
> > > > >>>>
> > > > >>>>> Why do we need to add READ_COMMITTED to InMemoryKeyValueStore? I
> > > > think
> > > > >>>>> it is perfectly valid to say InMemoryKeyValueStore do not support
> > > > >>>>> READ_COMMITTED for now, since READ_UNCOMMITTED is the de-facto
> > > > default
> > > > >>>>> at the moment.
> > > > >>>>>
> > > > >>>>> Best,
> > > > >>>>> Bruno
> > > > >>>>>
> > > > >>>>> On 9/18/23 7:12 PM, Nick Telford wrote:
> > > > >>>>>> Oh! One other concern I haven't mentioned: if we make
> > > > IsolationLevel a
> > > > >>>>>> query-time constraint, then we need to add support for
> > > > READ_COMMITTED
> > > > >>>> to
> > > > >>>>>> InMemoryKeyValueStore too, which will require some changes to
> > the
> > > > >>>>>> implementation.
> > > > >>>>>>
> > > > >>>>>> On Mon, 18 Sept 2023 at 17:24, Nick Telford <
> > nick.telf...@gmail.com
> > > > >
> > > > >>>>> wrote:
> > > > >>>>>>
> > > > >>>>>>> Hi everyone,
> > > > >>>>>>>
> > > > >>>>>>> I agree that having IsolationLevel be determined at query-time
> > is
> > > > the
> > > > >>>>>>> ideal design, but there are a few sticking points:
> > > > >>>>>>>
> > > > >>>>>>> 1.
> > > > >>>>>>> There needs to be some way to communicate the IsolationLevel
> > down
> > > > to
> > > > >>>> the
> > > > >>>>>>> RocksDBStore itself, so that the query can respect it. Since
> > stores
> > > > >>>> are
> > > > >>>>>>> "layered" in functionality (i.e. ChangeLoggingStore,
> > MeteredStore,
> > > > >>>>> etc.),
> > > > >>>>>>> we need some way to deliver that information to the bottom
> > layer.
> > > > For
> > > > >>>>> IQv2,
> > > > >>>>>>> we can use the existing State#query() method, but IQv1 has no
> > way
> > > > to
> > > > >>>> do
> > > > >>>>>>> this.
> > > > >>>>>>>
> > > > >>>>>>> A simple approach, which would potentially open up other
> > options,
> > > > >>>> would
> > > > >>>>> be
> > > > >>>>>>> to add something like: ReadOnlyKeyValueStore<K, V>
> > > > >>>>>>> readOnlyView(IsolationLevel isolationLevel) to
> > > > ReadOnlyKeyValueStore
> > > > >>>>> (and
> > > > >>>>>>> similar to ReadOnlyWindowStore, ReadOnlySessionStore, etc.).
> > > > >>>>>>>
> > > > >>>>>>> 2.
> > > > >>>>>>> As mentioned above, RocksDB WriteBatches are not thread-safe,
> > which
> > > > >>>>> causes
> > > > >>>>>>> a problem if we want to provide READ_UNCOMMITTED Iterators. I
> > also
> > > > >>>> had a
> > > > >>>>>>> look at RocksDB Transactions[1], but they solve a very
> > different
> > > > >>>>> problem,
> > > > >>>>>>> and have the same thread-safety issue.
> > > > >>>>>>>
> > > > >>>>>>> One possible approach that I mentioned is chaining
> > WriteBatches:
> > > > >> every
> > > > >>>>>>> time a new Interactive Query is received (i.e. readOnlyView,
> > see
> > > > >>>> above,
> > > > >>>>>>> is called) we "freeze" the existing WriteBatch, and start a
> > new one
> > > > >>>> for
> > > > >>>>> new
> > > > >>>>>>> writes. The Interactive Query queries the "chain" of previous
> > > > >>>>> WriteBatches
> > > > >>>>>>> + the underlying database; while the StreamThread starts
> > writing to
> > > > >>>> the
> > > > >>>>>>> *new* WriteBatch. On-commit, the StreamThread would write *all*
> > > > >>>>>>> WriteBatches in the chain to the database (that have not yet
> > been
> > > > >>>>> written).
> > > > >>>>>>>
> > > > >>>>>>> WriteBatches would be closed/freed only when they have been
> > both
> > > > >>>>>>> committed, and all open Interactive Queries on them have been
> > > > closed.
> > > > >>>>> This
> > > > >>>>>>> would require some reference counting.
> > > > >>>>>>>
> > > > >>>>>>> Obviously a drawback of this approach is the potential for
> > > > increased
> > > > >>>>>>> memory usage: if an Interactive Query is long-lived, for
> > example by
> > > > >>>>> doing a
> > > > >>>>>>> full scan over a large database, or even just pausing in the
> > middle
> > > > >> of
> > > > >>>>> an
> > > > >>>>>>> iteration, then the existing chain of WriteBatches could be
> > kept
> > > > >>>> around
> > > > >>>>> for
> > > > >>>>>>> a long time, potentially forever.
> > > > >>>>>>>
> > > > >>>>>>> --
> > > > >>>>>>>
> > > > >>>>>>> A.
> > > > >>>>>>> Going off on a tangent, it looks like in addition to supporting
> > > > >>>>>>> READ_COMMITTED queries, we could go further and support
> > > > >>>> REPEATABLE_READ
> > > > >>>>>>> queries (i.e. where subsequent reads to the same key in the
> > same
> > > > >>>>>>> Interactive Query are guaranteed to yield the same value) by
> > making
> > > > >>>> use
> > > > >>>>> of
> > > > >>>>>>> RocksDB Snapshots[2]. These are fairly lightweight, so the
> > > > >> performance
> > > > >>>>>>> impact is likely to be negligible, but they do require that the
> > > > >>>>> Interactive
> > > > >>>>>>> Query session can be explicitly closed.
> > > > >>>>>>>
> > > > >>>>>>> This could be achieved if we made the above readOnlyView
> > interface
> > > > >>>> look
> > > > >>>>>>> more like:
> > > > >>>>>>>
> > > > >>>>>>> interface ReadOnlyKeyValueView<K, V> implements
> > > > >>>> ReadOnlyKeyValueStore<K,
> > > > >>>>>>> V>, AutoCloseable {}
> > > > >>>>>>>
> > > > >>>>>>> interface ReadOnlyKeyValueStore<K, V> {
> > > > >>>>>>>        ...
> > > > >>>>>>>        ReadOnlyKeyValueView<K, V> readOnlyView(IsolationLevel
> > > > >>>>> isolationLevel);
> > > > >>>>>>> }
> > > > >>>>>>>
> > > > >>>>>>> But this would be a breaking change, as existing IQv1 queries
> > are
> > > > >>>>>>> guaranteed to never call store.close(), and therefore these
> > would
> > > > >> leak
> > > > >>>>>>> memory under REPEATABLE_READ.
> > > > >>>>>>>
> > > > >>>>>>> B.
> > > > >>>>>>> One thing that's notable: MyRocks states that they support
> > > > >>>>> READ_COMMITTED
> > > > >>>>>>> and REPEATABLE_READ, but they make no mention of
> > > > >>>> READ_UNCOMMITTED[3][4].
> > > > >>>>>>> This could be because doing so is technically
> > difficult/impossible
> > > > >>>> using
> > > > >>>>>>> the primitives available in RocksDB.
> > > > >>>>>>>
> > > > >>>>>>> --
> > > > >>>>>>>
> > > > >>>>>>> Lucas, to address your points:
> > > > >>>>>>>
> > > > >>>>>>> U1.
> > > > >>>>>>> It's only "SHOULD" to permit alternative (i.e. non-RocksDB)
> > > > >>>>>>> implementations of StateStore that do not support atomic
> > writes.
> > > > >>>>> Obviously
> > > > >>>>>>> in those cases, the guarantees Kafka Streams provides/expects
> > would
> > > > >> be
> > > > >>>>>>> relaxed. Do you think we should require all implementations to
> > > > >> support
> > > > >>>>>>> atomic writes?
> > > > >>>>>>>
> > > > >>>>>>> U2.
> > > > >>>>>>> Stores can support multiple IsolationLevels. As we've discussed
> > > > >> above,
> > > > >>>>> the
> > > > >>>>>>> ideal scenario would be to specify the IsolationLevel at
> > > > query-time.
> > > > >>>>>>> Failing that, I think the second-best approach is to define the
> > > > >>>>>>> IsolationLevel for *all* queries based on the processing.mode,
> > > > which
> > > > >>>> is
> > > > >>>>>>> what the default StateStoreContext#isolationLevel() achieves.
> > Would
> > > > >>>> you
> > > > >>>>>>> prefer an alternative?
> > > > >>>>>>>
> > > > >>>>>>> While the existing implementation is equivalent to
> > > > READ_UNCOMMITTED,
> > > > >>>>> this
> > > > >>>>>>> can yield unexpected results/errors under EOS, if a
> > transaction is
> > > > >>>>> rolled
> > > > >>>>>>> back. While this would be a change in behaviour for users, it
> > would
> > > > >>>> look
> > > > >>>>>>> more like a bug fix than a breaking change. That said, we
> > *could*
> > > > >> make
> > > > >>>>> it
> > > > >>>>>>> configurable, and default to the existing behaviour
> > > > >> (READ_UNCOMMITTED)
> > > > >>>>>>> instead of inferring it from the processing.mode?
> > > > >>>>>>>
> > > > >>>>>>> N1, N2.
> > > > >>>>>>> These were only primitives to avoid boxing costs, but since
> > this is
> > > > >>>> not
> > > > >>>>> a
> > > > >>>>>>> performance sensitive area, it should be fine to change if
> > that's
> > > > >>>>> desirable.
> > > > >>>>>>>
> > > > >>>>>>> N3.
> > > > >>>>>>> It's because the store "manages its own offsets", which
> > includes
> > > > both
> > > > >>>>>>> committing the offset, *and providing it* via
> > getCommittedOffset().
> > > > >>>>>>> Personally, I think "managesOffsets" conveys this best, but I
> > don't
> > > > >>>> mind
> > > > >>>>>>> changing it if the nomenclature is unclear.
> > > > >>>>>>>
> > > > >>>>>>> Sorry for the massive emails/essays!
> > > > >>>>>>> --
> > > > >>>>>>> Nick
> > > > >>>>>>>
> > > > >>>>>>> 1: https://github.com/facebook/rocksdb/wiki/Transactions
> > > > >>>>>>> 2: https://github.com/facebook/rocksdb/wiki/Snapshot
> > > > >>>>>>> 3:
> > > > https://github.com/facebook/mysql-5.6/wiki/Transaction-Isolation
> > > > >>>>>>> 4: https://mariadb.com/kb/en/myrocks-transactional-isolation/
> > > > >>>>>>>
> > > > >>>>>>> On Mon, 18 Sept 2023 at 16:19, Lucas Brutschy
> > > > >>>>>>> <lbruts...@confluent.io.invalid> wrote:
> > > > >>>>>>>
> > > > >>>>>>>> Hi Nick,
> > > > >>>>>>>>
> > > > >>>>>>>> since I last read it in April, the KIP has become much
> > cleaner and
> > > > >>>>>>>> easier to read. Great work!
> > > > >>>>>>>>
> > > > >>>>>>>> It feels to me the last big open point is whether we can
> > implement
> > > > >>>>>>>> isolation level as a query parameter. I understand that there
> > are
> > > > >>>>>>>> implementation concerns, but as Colt says, it would be a great
> > > > >>>>>>>> addition, and would also simplify the migration path for this
> > > > >> change.
> > > > >>>>>>>> Is the implementation problem you mentioned caused by the
> > > > WriteBatch
> > > > >>>>>>>> not having a notion of a snapshot, as the underlying DB
> > iterator
> > > > >>>> does?
> > > > >>>>>>>> In that case, I am not sure a chain of WriteBatches as you
> > propose
> > > > >>>>>>>> would fully solve the problem, but maybe I didn't dig enough
> > into
> > > > >> the
> > > > >>>>>>>> details to fully understand it.
> > > > >>>>>>>>
> > > > >>>>>>>> If it's not possible to implement it now, would it be an
> > option to
> > > > >>>>>>>> make sure in this KIP that we do not fully close the door on
> > > > >>>> per-query
> > > > >>>>>>>> isolation levels in the interface, as it may be possible to
> > > > >> implement
> > > > >>>>>>>> the missing primitives in RocksDB or Speedb in the future.
> > > > >>>>>>>>
> > > > >>>>>>>> Understanding:
> > > > >>>>>>>>
> > > > >>>>>>>> * U1) Why is it only "SHOULD" for changelogOffsets to be
> > persisted
> > > > >>>>>>>> atomically with the records?
> > > > >>>>>>>> * U2) Don't understand the default implementation of
> > > > >>>> `isolationLevel`.
> > > > >>>>>>>> The isolation level should be a property of the underlying
> > store,
> > > > >> and
> > > > >>>>>>>> not be defined by the default config? Existing stores probably
> > > > don't
> > > > >>>>>>>> guarantee READ_COMMITTED, so the default should be to return
> > > > >>>>>>>> READ_UNCOMMITTED.
> > > > >>>>>>>>
> > > > >>>>>>>> Nits:
> > > > >>>>>>>> * N1) Could `getComittedOffset` use an `OptionalLong` return
> > type,
> > > > >> to
> > > > >>>>>>>> avoid the `null`?
> > > > >>>>>>>> * N2) Could `apporixmateNumUncomittedBytes` use an
> > `OptionalLong`
> > > > >>>>>>>> return type, to avoid the `-1`?
> > > > >>>>>>>> * N3) I don't understand why `managesOffsets` uses the
> > 'manage'
> > > > >> verb,
> > > > >>>>>>>> whereas all other methods use the "commits" verb. I'd suggest
> > > > >>>>>>>> `commitsOffsets`.
> > > > >>>>>>>>
> > > > >>>>>>>> Either way, it feels this KIP is very close to the finish
> > line,
> > > > I'm
> > > > >>>>>>>> looking forward to seeing this in production!
> > > > >>>>>>>>
> > > > >>>>>>>> Cheers,
> > > > >>>>>>>> Lucas
> > > > >>>>>>>>
> > > > >>>>>>>> On Mon, Sep 18, 2023 at 6:57 AM Colt McNealy <
> > c...@littlehorse.io
> > > > >
> > > > >>>>> wrote:
> > > > >>>>>>>>>
> > > > >>>>>>>>>> Making IsolationLevel a query-time constraint, rather than
> > > > linking
> > > > >>>> it
> > > > >>>>>>>> to
> > > > >>>>>>>>> the processing.guarantee.
> > > > >>>>>>>>>
> > > > >>>>>>>>> As I understand it, would this allow even a user of EOS to
> > > > control
> > > > >>>>>>>> whether
> > > > >>>>>>>>> reading committed or uncommitted records? If so, I am highly
> > in
> > > > >>>> favor
> > > > >>>>> of
> > > > >>>>>>>>> this.
> > > > >>>>>>>>>
> > > > >>>>>>>>> I know that I was one of the early people to point out the
> > > > current
> > > > >>>>>>>>> shortcoming that IQ reads uncommitted records, but just this
> > > > >>>> morning I
> > > > >>>>>>>>> realized a pattern we use which means that (for certain
> > queries)
> > > > >> our
> > > > >>>>>>>> system
> > > > >>>>>>>>> needs to be able to read uncommitted records, which is the
> > > > current
> > > > >>>>>>>> behavior
> > > > >>>>>>>>> of Kafka Streams in EOS.***
> > > > >>>>>>>>>
> > > > >>>>>>>>> If IsolationLevel being a query-time decision allows for
> > this,
> > > > then
> > > > >>>>> that
> > > > >>>>>>>>> would be amazing. I would also vote that the default behavior
> > > > >> should
> > > > >>>>> be
> > > > >>>>>>>> for
> > > > >>>>>>>>> reading uncommitted records, because it is totally possible
> > for a
> > > > >>>>> valid
> > > > >>>>>>>>> application to depend on that behavior, and breaking it in a
> > > > minor
> > > > >>>>>>>> release
> > > > >>>>>>>>> might be a bit strong.
> > > > >>>>>>>>>
> > > > >>>>>>>>> *** (Note, for the curious reader....) Our use-case/query
> > pattern
> > > > >>>> is a
> > > > >>>>>>>> bit
> > > > >>>>>>>>> complex, but reading "uncommitted" records is actually safe
> > in
> > > > our
> > > > >>>>> case
> > > > >>>>>>>>> because processing is deterministic. Additionally, IQ being
> > able
> > > > to
> > > > >>>>> read
> > > > >>>>>>>>> uncommitted records is crucial to enable "read your own
> > writes"
> > > > on
> > > > >>>> our
> > > > >>>>>>>> API:
> > > > >>>>>>>>> Due to the deterministic processing, we send an "ack" to the
> > > > client
> > > > >>>>> who
> > > > >>>>>>>>> makes the request as soon as the processor processes the
> > result.
> > > > If
> > > > >>>>> they
> > > > >>>>>>>>> can't read uncommitted records, they may receive a "201 -
> > > > Created"
> > > > >>>>>>>>> response, immediately followed by a "404 - Not Found" when
> > doing
> > > > a
> > > > >>>>>>>> lookup
> > > > >>>>>>>>> for the object they just created).
> > > > >>>>>>>>>
> > > > >>>>>>>>> Thanks,
> > > > >>>>>>>>> Colt McNealy
> > > > >>>>>>>>>
> > > > >>>>>>>>> *Founder, LittleHorse.dev*
> > > > >>>>>>>>>
> > > > >>>>>>>>>
> > > > >>>>>>>>> On Wed, Sep 13, 2023 at 9:19 AM Nick Telford <
> > > > >>>> nick.telf...@gmail.com>
> > > > >>>>>>>> wrote:
> > > > >>>>>>>>>
> > > > >>>>>>>>>> Addendum:
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> I think we would also face the same problem with the
> > approach
> > > > John
> > > > >>>>>>>> outlined
> > > > >>>>>>>>>> earlier (using the record cache as a transaction buffer and
> > > > >>>> flushing
> > > > >>>>>>>> it
> > > > >>>>>>>>>> straight to SST files). This is because the record cache
> > (the
> > > > >>>>>>>> ThreadCache
> > > > >>>>>>>>>> class) is not thread-safe, so every commit would invalidate
> > open
> > > > >> IQ
> > > > >>>>>>>>>> Iterators in the same way that RocksDB WriteBatches do.
> > > > >>>>>>>>>> --
> > > > >>>>>>>>>> Nick
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> On Wed, 13 Sept 2023 at 16:58, Nick Telford <
> > > > >>>> nick.telf...@gmail.com>
> > > > >>>>>>>>>> wrote:
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>> Hi Bruno,
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> I've updated the KIP based on our conversation. The only
> > things
> > > > >>>>>>>> I've not
> > > > >>>>>>>>>>> yet done are:
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> 1. Using transactions under ALOS and EOS.
> > > > >>>>>>>>>>> 2. Making IsolationLevel a query-time constraint, rather
> > than
> > > > >>>>>>>> linking it
> > > > >>>>>>>>>>> to the processing.guarantee.
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> There's a wrinkle that makes this a challenge: Interactive
> > > > >> Queries
> > > > >>>>>>>> that
> > > > >>>>>>>>>>> open an Iterator, when using transactions and
> > READ_UNCOMMITTED.
> > > > >>>>>>>>>>> The problem is that under READ_UNCOMMITTED, queries need
> > to be
> > > > >>>> able
> > > > >>>>>>>> to
> > > > >>>>>>>>>>> read records from the currently uncommitted transaction
> > buffer
> > > > >>>>>>>>>>> (WriteBatch). This includes for Iterators, which should
> > iterate
> > > > >>>>>>>> both the
> > > > >>>>>>>>>>> transaction buffer and underlying database (using
> > > > >>>>>>>>>>> WriteBatch#iteratorWithBase()).
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> The issue is that when the StreamThread commits, it writes
> > the
> > > > >>>>>>>> current
> > > > >>>>>>>>>>> WriteBatch to RocksDB *and then clears the WriteBatch*.
> > > > Clearing
> > > > >>>> the
> > > > >>>>>>>>>>> WriteBatch while an Interactive Query holds an open
> > Iterator on
> > > > >> it
> > > > >>>>>>>> will
> > > > >>>>>>>>>>> invalidate the Iterator. Worse, it turns out that Iterators
> > > > over
> > > > >> a
> > > > >>>>>>>>>>> WriteBatch become invalidated not just when the WriteBatch
> > is
> > > > >>>>>>>> cleared,
> > > > >>>>>>>>>> but
> > > > >>>>>>>>>>> also when the Iterators' current key receives a new write.
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> Now that I'm writing this, I remember that this is the
> > major
> > > > >>>> reason
> > > > >>>>>>>> that
> > > > >>>>>>>>>> I
> > > > >>>>>>>>>>> switched the original design from having a query-time
> > > > >>>>>>>> IsolationLevel to
> > > > >>>>>>>>>>> having the IsolationLevel linked to the transactionality
> > of the
> > > > >>>>>>>> stores
> > > > >>>>>>>>>>> themselves.
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> It *might* be possible to resolve this, by having a
> > "chain" of
> > > > >>>>>>>>>>> WriteBatches, with the StreamThread switching to a new
> > > > WriteBatch
> > > > >>>>>>>>>> whenever
> > > > >>>>>>>>>>> a new Interactive Query attempts to read from the
> > database, but
> > > > >>>> that
> > > > >>>>>>>>>> could
> > > > >>>>>>>>>>> cause some performance problems/memory pressure when
> > subjected
> > > > to
> > > > >>>> a
> > > > >>>>>>>> high
> > > > >>>>>>>>>>> Interactive Query load. It would also reduce the
> > efficiency of
> > > > >>>>>>>>>> WriteBatches
> > > > >>>>>>>>>>> on-commit, as we'd have to write N WriteBatches, where N
> > is the
> > > > >>>>>>>> number of
> > > > >>>>>>>>>>> Interactive Queries since the last commit.
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> I realise this is getting into the weeds of the
> > implementation,
> > > > >>>> and
> > > > >>>>>>>> you'd
> > > > >>>>>>>>>>> rather we focus on the API for now, but I think it's
> > important
> > > > to
> > > > >>>>>>>>>> consider
> > > > >>>>>>>>>>> how to implement the desired API, in case we come up with
> > an
> > > > API
> > > > >>>>>>>> that
> > > > >>>>>>>>>>> cannot be implemented efficiently, or even at all!
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> Thoughts?
> > > > >>>>>>>>>>> --
> > > > >>>>>>>>>>> Nick
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> On Wed, 13 Sept 2023 at 13:03, Bruno Cadonna <
> > > > cado...@apache.org
> > > > >>>
> > > > >>>>>>>> wrote:
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>>> Hi Nick,
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> 6.
> > > > >>>>>>>>>>>> Of course, you are right! My bad!
> > > > >>>>>>>>>>>> Wiping out the state in the downgrading case is fine.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> 3a.
> > > > >>>>>>>>>>>> Focus on the public facing changes for the KIP. We will
> > manage
> > > > >> to
> > > > >>>>>>>> get
> > > > >>>>>>>>>>>> the internals right. Regarding state stores that do not
> > > > support
> > > > >>>>>>>>>>>> READ_COMMITTED, they should throw an error stating that
> > they
> > > > do
> > > > >>>> not
> > > > >>>>>>>>>>>> support READ_COMMITTED. No need to adapt all state stores
> > > > >>>>>>>> immediately.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> 3b.
> > > > >>>>>>>>>>>> I am in favor of using transactions also for ALOS.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> Best,
> > > > >>>>>>>>>>>> Bruno
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> On 9/13/23 11:57 AM, Nick Telford wrote:
> > > > >>>>>>>>>>>>> Hi Bruno,
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> Thanks for getting back to me!
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> 2.
> > > > >>>>>>>>>>>>> The fact that implementations can always track estimated
> > > > memory
> > > > >>>>>>>> usage
> > > > >>>>>>>>>> in
> > > > >>>>>>>>>>>>> the wrapper is a good point. I can remove -1 as an
> > option,
> > > > and
> > > > >>>>>>>> I'll
> > > > >>>>>>>>>>>> clarify
> > > > >>>>>>>>>>>>> the JavaDoc that 0 is not just for non-transactional
> > stores,
> > > > >>>>>>>> which is
> > > > >>>>>>>>>>>>> currently misleading.
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> 6.
> > > > >>>>>>>>>>>>> The problem with catching the exception in the downgrade
> > > > >> process
> > > > >>>>>>>> is
> > > > >>>>>>>>>> that
> > > > >>>>>>>>>>>>> would require new code in the Kafka version being
> > downgraded
> > > > >> to.
> > > > >>>>>>>> Since
> > > > >>>>>>>>>>>>> users could conceivably downgrade to almost *any* older
> > > > version
> > > > >>>>>>>> of
> > > > >>>>>>>>>> Kafka
> > > > >>>>>>>>>>>>> Streams, I'm not sure how we could add that code?
> > > > >>>>>>>>>>>>> The only way I can think of doing it would be to provide
> > a
> > > > >>>>>>>> dedicated
> > > > >>>>>>>>>>>>> downgrade tool, that goes through every local store and
> > > > removes
> > > > >>>>>>>> the
> > > > >>>>>>>>>>>>> offsets column families. But that seems like an
> > unnecessary
> > > > >>>>>>>> amount of
> > > > >>>>>>>>>>>> extra
> > > > >>>>>>>>>>>>> code to maintain just to handle a somewhat niche
> > situation,
> > > > >> when
> > > > >>>>>>>> the
> > > > >>>>>>>>>>>>> alternative (automatically wipe and restore stores)
> > should be
> > > > >>>>>>>>>>>> acceptable.
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> 1, 4, 5: Agreed. I'll make the changes you've requested.
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> 3a.
> > > > >>>>>>>>>>>>> I agree that IsolationLevel makes more sense at
> > query-time,
> > > > and
> > > > >>>> I
> > > > >>>>>>>>>>>> actually
> > > > >>>>>>>>>>>>> initially attempted to place the IsolationLevel at
> > > > query-time,
> > > > >>>>>>>> but I
> > > > >>>>>>>>>> ran
> > > > >>>>>>>>>>>>> into some problems:
> > > > >>>>>>>>>>>>> - The key issue is that, under ALOS we're not staging
> > writes
> > > > in
> > > > >>>>>>>>>>>>> transactions, so can't perform writes at the
> > READ_COMMITTED
> > > > >>>>>>>> isolation
> > > > >>>>>>>>>>>>> level. However, this may be addressed if we decide to
> > > > *always*
> > > > >>>>>>>> use
> > > > >>>>>>>>>>>>> transactions as discussed under 3b.
> > > > >>>>>>>>>>>>> - IQv1 and IQv2 have quite different implementations. I
> > > > >> remember
> > > > >>>>>>>>>> having
> > > > >>>>>>>>>>>>> some difficulty understanding the IQv1 internals, which
> > made
> > > > it
> > > > >>>>>>>>>>>> difficult
> > > > >>>>>>>>>>>>> to determine what needed to be changed. However, I
> > *think*
> > > > this
> > > > >>>>>>>> can be
> > > > >>>>>>>>>>>>> addressed for both implementations by wrapping the
> > > > RocksDBStore
> > > > >>>>>>>> in an
> > > > >>>>>>>>>>>>> IsolationLevel-dependent wrapper, that overrides read
> > methods
> > > > >>>>>>>> (get,
> > > > >>>>>>>>>>>> etc.)
> > > > >>>>>>>>>>>>> to either read directly from the database or from the
> > ongoing
> > > > >>>>>>>>>>>> transaction.
> > > > >>>>>>>>>>>>> But IQv1 might still be difficult.
> > > > >>>>>>>>>>>>> - If IsolationLevel becomes a query constraint, then all
> > > > other
> > > > >>>>>>>>>>>> StateStores
> > > > >>>>>>>>>>>>> will need to respect it, including the in-memory stores.
> > This
> > > > >>>>>>>> would
> > > > >>>>>>>>>>>> require
> > > > >>>>>>>>>>>>> us to adapt in-memory stores to stage their writes so
> > they
> > > > can
> > > > >>>> be
> > > > >>>>>>>>>>>> isolated
> > > > >>>>>>>>>>>>> from READ_COMMITTTED queries. It would also become an
> > > > important
> > > > >>>>>>>>>>>>> consideration for third-party stores on upgrade, as
> > without
> > > > >>>>>>>> changes,
> > > > >>>>>>>>>>>> they
> > > > >>>>>>>>>>>>> would not support READ_COMMITTED queries correctly.
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> Ultimately, I may need some help making the necessary
> > change
> > > > to
> > > > >>>>>>>> IQv1
> > > > >>>>>>>>>> to
> > > > >>>>>>>>>>>>> support this, but I don't think it's fundamentally
> > > > impossible,
> > > > >>>>>>>> if we
> > > > >>>>>>>>>>>> want
> > > > >>>>>>>>>>>>> to pursue this route.
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> 3b.
> > > > >>>>>>>>>>>>> The main reason I chose to keep ALOS un-transactional
> > was to
> > > > >>>>>>>> minimize
> > > > >>>>>>>>>>>>> behavioural change for most users (I believe most Streams
> > > > users
> > > > >>>>>>>> use
> > > > >>>>>>>>>> the
> > > > >>>>>>>>>>>>> default configuration, which is ALOS). That said, it's
> > clear
> > > > >>>>>>>> that if
> > > > >>>>>>>>>>>> ALOS
> > > > >>>>>>>>>>>>> also used transactional stores, the only change in
> > behaviour
> > > > >>>>>>>> would be
> > > > >>>>>>>>>>>> that
> > > > >>>>>>>>>>>>> it would become *more correct*, which could be
> > considered a
> > > > >> "bug
> > > > >>>>>>>> fix"
> > > > >>>>>>>>>> by
> > > > >>>>>>>>>>>>> users, rather than a change they need to handle.
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> I believe that performance using transactions (aka.
> > RocksDB
> > > > >>>>>>>>>>>> WriteBatches)
> > > > >>>>>>>>>>>>> should actually be *better* than the un-batched
> > write-path
> > > > that
> > > > >>>>>>>> is
> > > > >>>>>>>>>>>>> currently used[1]. The only "performance" consideration
> > will
> > > > be
> > > > >>>>>>>> the
> > > > >>>>>>>>>>>>> increased memory usage that transactions require. Given
> > the
> > > > >>>>>>>>>> mitigations
> > > > >>>>>>>>>>>> for
> > > > >>>>>>>>>>>>> this memory that we have in place, I would expect that
> > this
> > > > is
> > > > >>>>>>>> not a
> > > > >>>>>>>>>>>>> problem for most users.
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> If we're happy to do so, we can make ALOS also use
> > > > >> transactions.
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> Regards,
> > > > >>>>>>>>>>>>> Nick
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> Link 1:
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>
> > > > >>
> > https://github.com/adamretter/rocksjava-write-methods-benchmark#results
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> On Wed, 13 Sept 2023 at 09:41, Bruno Cadonna <
> > > > >>>> cado...@apache.org
> > > > >>>>>>>>>
> > > > >>>>>>>>>>>> wrote:
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> Hi Nick,
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> Thanks for the updates and sorry for the delay on my
> > side!
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> 1.
> > > > >>>>>>>>>>>>>> Making the default implementation for flush() a no-op
> > sounds
> > > > >>>>>>>> good to
> > > > >>>>>>>>>>>> me.
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> 2.
> > > > >>>>>>>>>>>>>> I think what was bugging me here is that a third-party
> > state
> > > > >>>>>>>> store
> > > > >>>>>>>>>>>> needs
> > > > >>>>>>>>>>>>>> to implement the state store interface. That means they
> > need
> > > > >> to
> > > > >>>>>>>>>>>>>> implement a wrapper around the actual state store as we
> > do
> > > > for
> > > > >>>>>>>>>> RocksDB
> > > > >>>>>>>>>>>>>> with RocksDBStore. So, a third-party state store can
> > always
> > > > >>>>>>>> estimate
> > > > >>>>>>>>>>>> the
> > > > >>>>>>>>>>>>>> uncommitted bytes, if it wants, because the wrapper can
> > > > record
> > > > >>>>>>>> the
> > > > >>>>>>>>>>>> added
> > > > >>>>>>>>>>>>>> bytes.
> > > > >>>>>>>>>>>>>> One case I can think of where returning -1 makes sense
> > is
> > > > when
> > > > >>>>>>>>>> Streams
> > > > >>>>>>>>>>>>>> does not need to estimate the size of the write batch
> > and
> > > > >>>>>>>> trigger
> > > > >>>>>>>>>>>>>> extraordinary commits, because the third-party state
> > store
> > > > >>>>>>>> takes care
> > > > >>>>>>>>>>>> of
> > > > >>>>>>>>>>>>>> memory. But in that case the method could also just
> > return
> > > > 0.
> > > > >>>>>>>> Even
> > > > >>>>>>>>>> that
> > > > >>>>>>>>>>>>>> case would be better solved with a method that returns
> > > > whether
> > > > >>>>>>>> the
> > > > >>>>>>>>>>>> state
> > > > >>>>>>>>>>>>>> store manages itself the memory used for uncommitted
> > bytes
> > > > or
> > > > >>>>>>>> not.
> > > > >>>>>>>>>>>>>> Said that, I am fine with keeping the -1 return value,
> > I was
> > > > >>>>>>>> just
> > > > >>>>>>>>>>>>>> wondering when and if it will be used.
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> Regarding returning 0 for transactional state stores
> > when
> > > > the
> > > > >>>>>>>> batch
> > > > >>>>>>>>>> is
> > > > >>>>>>>>>>>>>> empty, I was just wondering because you explicitly
> > stated
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> "or {@code 0} if this StateStore does not support
> > > > >>>> transactions."
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> So it seemed to me returning 0 could only happen for
> > > > >>>>>>>>>> non-transactional
> > > > >>>>>>>>>>>>>> state stores.
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> 3.
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> a) What do you think if we move the isolation level to
> > IQ
> > > > (v1
> > > > >>>>>>>> and
> > > > >>>>>>>>>> v2)?
> > > > >>>>>>>>>>>>>> In the end this is the only component that really needs
> > to
> > > > >>>>>>>> specify
> > > > >>>>>>>>>> the
> > > > >>>>>>>>>>>>>> isolation level. It is similar to the Kafka consumer
> > that
> > > > can
> > > > >>>>>>>> choose
> > > > >>>>>>>>>>>>>> with what isolation level to read the input topic.
> > > > >>>>>>>>>>>>>> For IQv1 the isolation level should go into
> > > > >>>>>>>> StoreQueryParameters. For
> > > > >>>>>>>>>>>>>> IQv2, I would add it to the Query interface.
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> b) Point a) raises the question what should happen
> > during
> > > > >>>>>>>>>> at-least-once
> > > > >>>>>>>>>>>>>> processing when the state store does not use
> > transactions?
> > > > >> John
> > > > >>>>>>>> in
> > > > >>>>>>>>>> the
> > > > >>>>>>>>>>>>>> past proposed to also use transactions on state stores
> > for
> > > > >>>>>>>>>>>>>> at-least-once. I like that idea, because it avoids
> > > > aggregating
> > > > >>>>>>>> the
> > > > >>>>>>>>>> same
> > > > >>>>>>>>>>>>>> records over and over again in the case of a failure. We
> > > > had a
> > > > >>>>>>>> case
> > > > >>>>>>>>>> in
> > > > >>>>>>>>>>>>>> the past where a Streams applications in at-least-once
> > mode
> > > > >> was
> > > > >>>>>>>>>> failing
> > > > >>>>>>>>>>>>>> continuously for some reasons I do not remember before
> > > > >>>>>>>> committing the
> > > > >>>>>>>>>>>>>> offsets. After each failover, the app aggregated again
> > and
> > > > >>>>>>>> again the
> > > > >>>>>>>>>>>>>> same records. Of course the aggregate increased to very
> > > > wrong
> > > > >>>>>>>> values
> > > > >>>>>>>>>>>>>> just because of the failover. With transactions on the
> > state
> > > > >>>>>>>> stores
> > > > >>>>>>>>>> we
> > > > >>>>>>>>>>>>>> could have avoided this. The app would have output the
> > same
> > > > >>>>>>>> aggregate
> > > > >>>>>>>>>>>>>> multiple times (i.e., after each failover) but at least
> > the
> > > > >>>>>>>> value of
> > > > >>>>>>>>>>>> the
> > > > >>>>>>>>>>>>>> aggregate would not depend on the number of failovers.
> > > > >>>>>>>> Outputting the
> > > > >>>>>>>>>>>>>> same aggregate multiple times would be incorrect under
> > > > >>>>>>>> exactly-once
> > > > >>>>>>>>>> but
> > > > >>>>>>>>>>>>>> it is OK for at-least-once.
> > > > >>>>>>>>>>>>>> If it makes sense to add a config to turn on and off
> > > > >>>>>>>> transactions on
> > > > >>>>>>>>>>>>>> state stores under at-least-once or just use
> > transactions in
> > > > >>>>>>>> any case
> > > > >>>>>>>>>>>> is
> > > > >>>>>>>>>>>>>> a question we should also discuss in this KIP. It
> > depends a
> > > > >> bit
> > > > >>>>>>>> on
> > > > >>>>>>>>>> the
> > > > >>>>>>>>>>>>>> performance trade-off. Maybe to be safe, I would add a
> > > > config.
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> 4.
> > > > >>>>>>>>>>>>>> Your points are all valid. I tend to say to keep the
> > metrics
> > > > >>>>>>>> around
> > > > >>>>>>>>>>>>>> flush() until we remove flush() completely from the
> > > > interface.
> > > > >>>>>>>> Calls
> > > > >>>>>>>>>> to
> > > > >>>>>>>>>>>>>> flush() might still exist since existing processors
> > might
> > > > >> still
> > > > >>>>>>>> call
> > > > >>>>>>>>>>>>>> flush() explicitly as you mentioned in 1). For sure, we
> > need
> > > > >> to
> > > > >>>>>>>>>>>> document
> > > > >>>>>>>>>>>>>> how the metrics change due to the transactions in the
> > > > upgrade
> > > > >>>>>>>> notes.
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> 5.
> > > > >>>>>>>>>>>>>> I see. Then you should describe how the .position files
> > are
> > > > >>>>>>>> handled
> > > > >>>>>>>>>> in
> > > > >>>>>>>>>>>>>> a dedicated section of the KIP or incorporate the
> > > > description
> > > > >>>>>>>> in the
> > > > >>>>>>>>>>>>>> "Atomic Checkpointing" section instead of only
> > mentioning it
> > > > >> in
> > > > >>>>>>>> the
> > > > >>>>>>>>>>>>>> "Compatibility, Deprecation, and Migration Plan".
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> 6.
> > > > >>>>>>>>>>>>>> Describing upgrading and downgrading in the KIP is a
> > good
> > > > >> idea.
> > > > >>>>>>>>>>>>>> Regarding downgrading, I think you could also catch the
> > > > >>>>>>>> exception and
> > > > >>>>>>>>>>>> do
> > > > >>>>>>>>>>>>>> what is needed to downgrade, e.g., drop the column
> > family.
> > > > See
> > > > >>>>>>>> here
> > > > >>>>>>>>>> for
> > > > >>>>>>>>>>>>>> an example:
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>
> > > > >>>>
> > > > >>
> > > >
> > https://github.com/apache/kafka/blob/63fee01366e6ce98b9dfafd279a45d40b80e282d/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java#L75
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> It is a bit brittle, but it works.
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> Best,
> > > > >>>>>>>>>>>>>> Bruno
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> On 8/24/23 12:18 PM, Nick Telford wrote:
> > > > >>>>>>>>>>>>>>> Hi Bruno,
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> Thanks for taking the time to review the KIP. I'm back
> > from
> > > > >>>>>>>> leave
> > > > >>>>>>>>>> now
> > > > >>>>>>>>>>>> and
> > > > >>>>>>>>>>>>>>> intend to move this forwards as quickly as I can.
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> Addressing your points:
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> 1.
> > > > >>>>>>>>>>>>>>> Because flush() is part of the StateStore API, it's
> > exposed
> > > > >> to
> > > > >>>>>>>>>> custom
> > > > >>>>>>>>>>>>>>> Processors, which might be making calls to flush().
> > This
> > > > was
> > > > >>>>>>>>>> actually
> > > > >>>>>>>>>>>> the
> > > > >>>>>>>>>>>>>>> case in a few integration tests.
> > > > >>>>>>>>>>>>>>> To maintain as much compatibility as possible, I'd
> > prefer
> > > > not
> > > > >>>>>>>> to
> > > > >>>>>>>>>> make
> > > > >>>>>>>>>>>>>> this
> > > > >>>>>>>>>>>>>>> an UnsupportedOperationException, as it will cause
> > > > previously
> > > > >>>>>>>>>> working
> > > > >>>>>>>>>>>>>>> Processors to start throwing exceptions at runtime.
> > > > >>>>>>>>>>>>>>> I agree that it doesn't make sense for it to proxy
> > > > commit(),
> > > > >>>>>>>> though,
> > > > >>>>>>>>>>>> as
> > > > >>>>>>>>>>>>>>> that would cause it to violate the "StateStores commit
> > only
> > > > >>>>>>>> when the
> > > > >>>>>>>>>>>> Task
> > > > >>>>>>>>>>>>>>> commits" rule.
> > > > >>>>>>>>>>>>>>> Instead, I think we should make this a no-op. That way,
> > > > >>>>>>>> existing
> > > > >>>>>>>>>> user
> > > > >>>>>>>>>>>>>>> Processors will continue to work as-before, without
> > > > violation
> > > > >>>>>>>> of
> > > > >>>>>>>>>> store
> > > > >>>>>>>>>>>>>>> consistency that would be caused by premature
> > flush/commit
> > > > of
> > > > >>>>>>>>>>>> StateStore
> > > > >>>>>>>>>>>>>>> data to disk.
> > > > >>>>>>>>>>>>>>> What do you think?
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> 2.
> > > > >>>>>>>>>>>>>>> As stated in the JavaDoc, when a StateStore
> > implementation
> > > > is
> > > > >>>>>>>>>>>>>>> transactional, but is unable to estimate the
> > uncommitted
> > > > >>>> memory
> > > > >>>>>>>>>> usage,
> > > > >>>>>>>>>>>>>> the
> > > > >>>>>>>>>>>>>>> method will return -1.
> > > > >>>>>>>>>>>>>>> The intention here is to permit third-party
> > implementations
> > > > >>>>>>>> that may
> > > > >>>>>>>>>>>> not
> > > > >>>>>>>>>>>>>> be
> > > > >>>>>>>>>>>>>>> able to estimate memory usage.
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> Yes, it will be 0 when nothing has been written to the
> > > > store
> > > > >>>>>>>> yet. I
> > > > >>>>>>>>>>>>>> thought
> > > > >>>>>>>>>>>>>>> that was implied by "This method will return an
> > > > approximation
> > > > >>>>>>>> of the
> > > > >>>>>>>>>>>>>> memory
> > > > >>>>>>>>>>>>>>> would be freed by the next call to {@link
> > #commit(Map)}"
> > > > and
> > > > >>>>>>>>>> "@return
> > > > >>>>>>>>>>>> The
> > > > >>>>>>>>>>>>>>> approximate size of all records awaiting {@link
> > > > >>>> #commit(Map)}",
> > > > >>>>>>>>>>>> however,
> > > > >>>>>>>>>>>>>> I
> > > > >>>>>>>>>>>>>>> can add it explicitly to the JavaDoc if you think this
> > is
> > > > >>>>>>>> unclear?
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> 3.
> > > > >>>>>>>>>>>>>>> I realise this is probably the most contentious point
> > in my
> > > > >>>>>>>> design,
> > > > >>>>>>>>>>>> and
> > > > >>>>>>>>>>>>>> I'm
> > > > >>>>>>>>>>>>>>> open to changing it if I'm unable to convince you of
> > the
> > > > >>>>>>>> benefits.
> > > > >>>>>>>>>>>>>>> Nevertheless, here's my argument:
> > > > >>>>>>>>>>>>>>> The Interactive Query (IQ) API(s) are directly provided
> > > > >>>>>>>> StateStores
> > > > >>>>>>>>>> to
> > > > >>>>>>>>>>>>>>> query, and it may be important for users to
> > > > programmatically
> > > > >>>>>>>> know
> > > > >>>>>>>>>>>> which
> > > > >>>>>>>>>>>>>>> mode the StateStore is operating under. If we simply
> > > > provide
> > > > >>>> an
> > > > >>>>>>>>>>>>>>> "eosEnabled" boolean (as used throughout the internal
> > > > streams
> > > > >>>>>>>>>>>> engine), or
> > > > >>>>>>>>>>>>>>> similar, then users will need to understand the
> > operation
> > > > and
> > > > >>>>>>>>>>>>>> consequences
> > > > >>>>>>>>>>>>>>> of each available processing mode and how it pertains
> > to
> > > > >> their
> > > > >>>>>>>>>>>>>> StateStore.
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> Interactive Query users aren't the only people that
> > care
> > > > >> about
> > > > >>>>>>>> the
> > > > >>>>>>>>>>>>>>> processing.mode/IsolationLevel of a StateStore:
> > > > implementers
> > > > >>>> of
> > > > >>>>>>>>>> custom
> > > > >>>>>>>>>>>>>>> StateStores also need to understand the behaviour
> > expected
> > > > of
> > > > >>>>>>>> their
> > > > >>>>>>>>>>>>>>> implementation. KIP-892 introduces some assumptions
> > into
> > > > the
> > > > >>>>>>>> Streams
> > > > >>>>>>>>>>>>>> Engine
> > > > >>>>>>>>>>>>>>> about how StateStores operate under each processing
> > mode,
> > > > and
> > > > >>>>>>>> it's
> > > > >>>>>>>>>>>>>>> important that custom implementations adhere to those
> > > > >>>>>>>> assumptions in
> > > > >>>>>>>>>>>>>> order
> > > > >>>>>>>>>>>>>>> to maintain the consistency guarantees.
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> IsolationLevels provide a high-level contract on the
> > > > >> behaviour
> > > > >>>>>>>> of
> > > > >>>>>>>>>> the
> > > > >>>>>>>>>>>>>>> StateStore: a user knows that under READ_COMMITTED,
> > they
> > > > will
> > > > >>>>>>>> see
> > > > >>>>>>>>>>>> writes
> > > > >>>>>>>>>>>>>>> only after the Task has committed, and under
> > > > READ_UNCOMMITTED
> > > > >>>>>>>> they
> > > > >>>>>>>>>>>> will
> > > > >>>>>>>>>>>>>> see
> > > > >>>>>>>>>>>>>>> writes immediately. No understanding of the details of
> > each
> > > > >>>>>>>>>>>>>> processing.mode
> > > > >>>>>>>>>>>>>>> is required, either for IQ users or StateStore
> > > > implementers.
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> An argument can be made that these contractual
> > guarantees
> > > > can
> > > > >>>>>>>> simply
> > > > >>>>>>>>>>>> be
> > > > >>>>>>>>>>>>>>> documented for the processing.mode (i.e. that
> > exactly-once
> > > > >> and
> > > > >>>>>>>>>>>>>>> exactly-once-v2 behave like READ_COMMITTED and
> > > > at-least-once
> > > > >>>>>>>> behaves
> > > > >>>>>>>>>>>> like
> > > > >>>>>>>>>>>>>>> READ_UNCOMMITTED), but there are several small issues
> > with
> > > > >>>>>>>> this I'd
> > > > >>>>>>>>>>>>>> prefer
> > > > >>>>>>>>>>>>>>> to avoid:
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>         - Where would we document these contracts, in
> > a way
> > > > >> that
> > > > >>>>>>>> is
> > > > >>>>>>>>>>>> difficult
> > > > >>>>>>>>>>>>>>>         for users/implementers to miss/ignore?
> > > > >>>>>>>>>>>>>>>         - It's not clear to users that the processing
> > mode
> > > > is
> > > > >>>>>>>>>>>> communicating
> > > > >>>>>>>>>>>>>>>         an expectation of read isolation, unless they
> > read
> > > > the
> > > > >>>>>>>>>>>>>> documentation. Users
> > > > >>>>>>>>>>>>>>>         rarely consult documentation unless they feel
> > they
> > > > >> need
> > > > >>>>>>>> to, so
> > > > >>>>>>>>>>>> it's
> > > > >>>>>>>>>>>>>> likely
> > > > >>>>>>>>>>>>>>>         this detail would get missed by many users.
> > > > >>>>>>>>>>>>>>>         - It tightly couples processing modes to read
> > > > >> isolation.
> > > > >>>>>>>> Adding
> > > > >>>>>>>>>>>> new
> > > > >>>>>>>>>>>>>>>         processing modes, or changing the read
> > isolation of
> > > > >>>>>>>> existing
> > > > >>>>>>>>>>>>>> processing
> > > > >>>>>>>>>>>>>>>         modes would be difficult/impossible.
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> Ultimately, the cost of introducing IsolationLevels is
> > > > just a
> > > > >>>>>>>> single
> > > > >>>>>>>>>>>>>>> method, since we re-use the existing IsolationLevel
> > enum
> > > > from
> > > > >>>>>>>> Kafka.
> > > > >>>>>>>>>>>> This
> > > > >>>>>>>>>>>>>>> gives us a clear place to document the contractual
> > > > guarantees
> > > > >>>>>>>>>> expected
> > > > >>>>>>>>>>>>>>> of/provided by StateStores, that is accessible both by
> > the
> > > > >>>>>>>>>> StateStore
> > > > >>>>>>>>>>>>>>> itself, and by IQ users.
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> (Writing this I've just realised that the StateStore
> > and IQ
> > > > >>>>>>>> APIs
> > > > >>>>>>>>>>>> actually
> > > > >>>>>>>>>>>>>>> don't provide access to StateStoreContext that IQ users
> > > > would
> > > > >>>>>>>> have
> > > > >>>>>>>>>>>> direct
> > > > >>>>>>>>>>>>>>> access to... Perhaps StateStore should expose
> > > > >> isolationLevel()
> > > > >>>>>>>>>> itself
> > > > >>>>>>>>>>>>>> too?)
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> 4.
> > > > >>>>>>>>>>>>>>> Yeah, I'm not comfortable renaming the metrics in-place
> > > > >>>>>>>> either, as
> > > > >>>>>>>>>>>> it's a
> > > > >>>>>>>>>>>>>>> backwards incompatible change. My concern is that, if
> > we
> > > > >> leave
> > > > >>>>>>>> the
> > > > >>>>>>>>>>>>>> existing
> > > > >>>>>>>>>>>>>>> "flush" metrics in place, they will be confusing to
> > users.
> > > > >>>>>>>> Right
> > > > >>>>>>>>>> now,
> > > > >>>>>>>>>>>>>>> "flush" metrics record explicit flushes to disk, but
> > under
> > > > >>>>>>>> KIP-892,
> > > > >>>>>>>>>>>> even
> > > > >>>>>>>>>>>>>> a
> > > > >>>>>>>>>>>>>>> commit() will not explicitly flush data to disk -
> > RocksDB
> > > > >> will
> > > > >>>>>>>>>> decide
> > > > >>>>>>>>>>>> on
> > > > >>>>>>>>>>>>>>> when to flush memtables to disk itself.
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> If we keep the existing "flush" metrics, we'd have two
> > > > >>>> options,
> > > > >>>>>>>>>> which
> > > > >>>>>>>>>>>>>> both
> > > > >>>>>>>>>>>>>>> seem pretty bad to me:
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>         1. Have them record calls to commit(), which
> > would
> > > > be
> > > > >>>>>>>>>>>> misleading, as
> > > > >>>>>>>>>>>>>>>         data is no longer explicitly "flushed" to disk
> > by
> > > > this
> > > > >>>>>>>> call.
> > > > >>>>>>>>>>>>>>>         2. Have them record nothing at all, which is
> > > > >> equivalent
> > > > >>>> to
> > > > >>>>>>>>>>>> removing
> > > > >>>>>>>>>>>>>> the
> > > > >>>>>>>>>>>>>>>         metrics, except that users will see the metric
> > > > still
> > > > >>>>>>>> exists and
> > > > >>>>>>>>>>>> so
> > > > >>>>>>>>>>>>>> assume
> > > > >>>>>>>>>>>>>>>         that the metric is correct, and that there's a
> > > > problem
> > > > >>>>>>>> with
> > > > >>>>>>>>>> their
> > > > >>>>>>>>>>>>>> system
> > > > >>>>>>>>>>>>>>>         when there isn't.
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> I agree that removing them is also a bad solution, and
> > I'd
> > > > >>>>>>>> like some
> > > > >>>>>>>>>>>>>>> guidance on the best path forward here.
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> 5.
> > > > >>>>>>>>>>>>>>> Position files are updated on every write to a
> > StateStore.
> > > > >>>>>>>> Since our
> > > > >>>>>>>>>>>>>> writes
> > > > >>>>>>>>>>>>>>> are now buffered until commit(), we can't update the
> > > > Position
> > > > >>>>>>>> file
> > > > >>>>>>>>>>>> until
> > > > >>>>>>>>>>>>>>> commit() has been called, otherwise it would be
> > > > inconsistent
> > > > >>>>>>>> with
> > > > >>>>>>>>>> the
> > > > >>>>>>>>>>>>>> data
> > > > >>>>>>>>>>>>>>> in the event of a rollback. Consequently, we need to
> > manage
> > > > >>>>>>>> these
> > > > >>>>>>>>>>>> offsets
> > > > >>>>>>>>>>>>>>> the same way we manage the checkpoint offsets, and
> > ensure
> > > > >>>>>>>> they're
> > > > >>>>>>>>>> only
> > > > >>>>>>>>>>>>>>> written on commit().
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> 6.
> > > > >>>>>>>>>>>>>>> Agreed, although I'm not exactly sure yet what tests to
> > > > >> write.
> > > > >>>>>>>> How
> > > > >>>>>>>>>>>>>> explicit
> > > > >>>>>>>>>>>>>>> do we need to be here in the KIP?
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> As for upgrade/downgrade: upgrade is designed to be
> > > > seamless,
> > > > >>>>>>>> and we
> > > > >>>>>>>>>>>>>> should
> > > > >>>>>>>>>>>>>>> definitely add some tests around that. Downgrade, it
> > > > >>>>>>>> transpires,
> > > > >>>>>>>>>> isn't
> > > > >>>>>>>>>>>>>>> currently possible, as the extra column family for
> > offset
> > > > >>>>>>>> storage is
> > > > >>>>>>>>>>>>>>> incompatible with the pre-KIP-892 implementation: when
> > you
> > > > >>>>>>>> open a
> > > > >>>>>>>>>>>> RocksDB
> > > > >>>>>>>>>>>>>>> database, you must open all available column families
> > or
> > > > >>>>>>>> receive an
> > > > >>>>>>>>>>>>>> error.
> > > > >>>>>>>>>>>>>>> What currently happens on downgrade is that it
> > attempts to
> > > > >>>>>>>> open the
> > > > >>>>>>>>>>>>>> store,
> > > > >>>>>>>>>>>>>>> throws an error about the offsets column family not
> > being
> > > > >>>>>>>> opened,
> > > > >>>>>>>>>>>> which
> > > > >>>>>>>>>>>>>>> triggers a wipe and rebuild of the Task. Given that
> > > > >> downgrades
> > > > >>>>>>>>>> should
> > > > >>>>>>>>>>>> be
> > > > >>>>>>>>>>>>>>> uncommon, I think this is acceptable behaviour, as the
> > > > >>>>>>>> end-state is
> > > > >>>>>>>>>>>>>>> consistent, even if it results in an undesirable state
> > > > >>>> restore.
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> Should I document the upgrade/downgrade behaviour
> > > > explicitly
> > > > >>>>>>>> in the
> > > > >>>>>>>>>>>> KIP?
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> --
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> Regards,
> > > > >>>>>>>>>>>>>>> Nick
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> On Mon, 14 Aug 2023 at 22:31, Bruno Cadonna <
> > > > >>>>>>>> cado...@apache.org>
> > > > >>>>>>>>>>>> wrote:
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> Hi Nick!
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> Thanks for the updates!
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> 1.
> > > > >>>>>>>>>>>>>>>> Why does StateStore#flush() default to
> > > > >>>>>>>>>>>>>>>> StateStore#commit(Collections.emptyMap())?
> > > > >>>>>>>>>>>>>>>> Since calls to flush() will not exist anymore after
> > this
> > > > KIP
> > > > >>>>>>>> is
> > > > >>>>>>>>>>>>>>>> released, I would rather throw an unsupported
> > operation
> > > > >>>>>>>> exception
> > > > >>>>>>>>>> by
> > > > >>>>>>>>>>>>>>>> default.
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> 2.
> > > > >>>>>>>>>>>>>>>> When would a state store return -1 from
> > > > >>>>>>>>>>>>>>>> StateStore#approximateNumUncommittedBytes() while
> > being
> > > > >>>>>>>>>>>> transactional?
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> Wouldn't StateStore#approximateNumUncommittedBytes()
> > also
> > > > >>>>>>>> return 0
> > > > >>>>>>>>>> if
> > > > >>>>>>>>>>>>>>>> the state store is transactional but nothing has been
> > > > >> written
> > > > >>>>>>>> to
> > > > >>>>>>>>>> the
> > > > >>>>>>>>>>>>>>>> state store yet?
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> 3.
> > > > >>>>>>>>>>>>>>>> Sorry for bringing this up again. Does this KIP really
> > > > need
> > > > >>>> to
> > > > >>>>>>>>>>>> introduce
> > > > >>>>>>>>>>>>>>>> StateStoreContext#isolationLevel()? StateStoreContext
> > has
> > > > >>>>>>>> already
> > > > >>>>>>>>>>>>>>>> appConfigs() which basically exposes the same
> > information,
> > > > >>>>>>>> i.e., if
> > > > >>>>>>>>>>>> EOS
> > > > >>>>>>>>>>>>>>>> is enabled or not.
> > > > >>>>>>>>>>>>>>>> In one of your previous e-mails you wrote:
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> "My idea was to try to keep the StateStore interface
> > as
> > > > >>>>>>>> loosely
> > > > >>>>>>>>>>>> coupled
> > > > >>>>>>>>>>>>>>>> from the Streams engine as possible, to give
> > implementers
> > > > >>>> more
> > > > >>>>>>>>>>>> freedom,
> > > > >>>>>>>>>>>>>>>> and reduce the amount of internal knowledge required."
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> While I understand the intent, I doubt that it
> > decreases
> > > > the
> > > > >>>>>>>>>>>> coupling of
> > > > >>>>>>>>>>>>>>>> a StateStore interface and the Streams engine.
> > > > >> READ_COMMITTED
> > > > >>>>>>>> only
> > > > >>>>>>>>>>>>>>>> applies to IQ but not to reads by processors. Thus,
> > > > >>>>>>>> implementers
> > > > >>>>>>>>>>>> need to
> > > > >>>>>>>>>>>>>>>> understand how Streams accesses the state stores.
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> I would like to hear what others think about this.
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> 4.
> > > > >>>>>>>>>>>>>>>> Great exposing new metrics for transactional state
> > stores!
> > > > >>>>>>>>>> However, I
> > > > >>>>>>>>>>>>>>>> would prefer to add new metrics and deprecate (in the
> > > > docs)
> > > > >>>>>>>> the old
> > > > >>>>>>>>>>>>>>>> ones. You can find examples of deprecated metrics
> > here:
> > > > >>>>>>>>>>>>>>>>
> > > > https://kafka.apache.org/documentation/#selector_monitoring
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> 5.
> > > > >>>>>>>>>>>>>>>> Why does the KIP mention position files? I do not
> > think
> > > > they
> > > > >>>>>>>> are
> > > > >>>>>>>>>>>> related
> > > > >>>>>>>>>>>>>>>> to transactions or flushes.
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> 6.
> > > > >>>>>>>>>>>>>>>> I think we will also need to adapt/add integration
> > tests
> > > > >>>>>>>> besides
> > > > >>>>>>>>>> unit
> > > > >>>>>>>>>>>>>>>> tests. Additionally, we probably need integration or
> > > > system
> > > > >>>>>>>> tests
> > > > >>>>>>>>>> to
> > > > >>>>>>>>>>>>>>>> verify that upgrades and downgrades between
> > transactional
> > > > >> and
> > > > >>>>>>>>>>>>>>>> non-transactional state stores work as expected.
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> Best,
> > > > >>>>>>>>>>>>>>>> Bruno
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> On 7/21/23 10:34 PM, Nick Telford wrote:
> > > > >>>>>>>>>>>>>>>>> One more thing: I noted John's suggestion in the KIP,
> > > > under
> > > > >>>>>>>>>>>> "Rejected
> > > > >>>>>>>>>>>>>>>>> Alternatives". I still think it's an idea worth
> > pursuing,
> > > > >>>>>>>> but I
> > > > >>>>>>>>>>>> believe
> > > > >>>>>>>>>>>>>>>>> that it's out of the scope of this KIP, because it
> > > > solves a
> > > > >>>>>>>>>>>> different
> > > > >>>>>>>>>>>>>> set
> > > > >>>>>>>>>>>>>>>>> of problems to this KIP, and the scope of this one
> > has
> > > > >>>>>>>> already
> > > > >>>>>>>>>> grown
> > > > >>>>>>>>>>>>>>>> quite
> > > > >>>>>>>>>>>>>>>>> large!
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> On Fri, 21 Jul 2023 at 21:33, Nick Telford <
> > > > >>>>>>>>>> nick.telf...@gmail.com>
> > > > >>>>>>>>>>>>>>>> wrote:
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> Hi everyone,
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> I've updated the KIP (
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>
> > > > >>>>
> > > > >>
> > > >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores
> > > > >>>>>>>>>>>>>>>> )
> > > > >>>>>>>>>>>>>>>>>> with the latest changes; mostly bringing back
> > "Atomic
> > > > >>>>>>>>>>>> Checkpointing"
> > > > >>>>>>>>>>>>>>>> (for
> > > > >>>>>>>>>>>>>>>>>> what feels like the 10th time!). I think the one
> > thing
> > > > >>>>>>>> missing is
> > > > >>>>>>>>>>>> some
> > > > >>>>>>>>>>>>>>>>>> changes to metrics (notably the store "flush"
> > metrics
> > > > will
> > > > >>>>>>>> need
> > > > >>>>>>>>>> to
> > > > >>>>>>>>>>>> be
> > > > >>>>>>>>>>>>>>>>>> renamed to "commit").
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> The reason I brought back Atomic Checkpointing was
> > to
> > > > >>>>>>>> decouple
> > > > >>>>>>>>>>>> store
> > > > >>>>>>>>>>>>>>>> flush
> > > > >>>>>>>>>>>>>>>>>> from store commit. This is important, because with
> > > > >>>>>>>> Transactional
> > > > >>>>>>>>>>>>>>>>>> StateStores, we now need to call "flush" on *every*
> > Task
> > > > >>>>>>>> commit,
> > > > >>>>>>>>>>>> and
> > > > >>>>>>>>>>>>>> not
> > > > >>>>>>>>>>>>>>>>>> just when the StateStore is closing, otherwise our
> > > > >>>>>>>> transaction
> > > > >>>>>>>>>>>> buffer
> > > > >>>>>>>>>>>>>>>> will
> > > > >>>>>>>>>>>>>>>>>> never be written and persisted, instead growing
> > > > unbounded!
> > > > >>>> I
> > > > >>>>>>>>>>>>>>>> experimented
> > > > >>>>>>>>>>>>>>>>>> with some simple solutions, like forcing a store
> > flush
> > > > >>>>>>>> whenever
> > > > >>>>>>>>>> the
> > > > >>>>>>>>>>>>>>>>>> transaction buffer was likely to exceed its
> > configured
> > > > >>>>>>>> size, but
> > > > >>>>>>>>>>>> this
> > > > >>>>>>>>>>>>>>>> was
> > > > >>>>>>>>>>>>>>>>>> brittle: it prevented the transaction buffer from
> > being
> > > > >>>>>>>>>> configured
> > > > >>>>>>>>>>>> to
> > > > >>>>>>>>>>>>>> be
> > > > >>>>>>>>>>>>>>>>>> unbounded, and it still would have required explicit
> > > > >>>>>>>> flushes of
> > > > >>>>>>>>>>>>>> RocksDB,
> > > > >>>>>>>>>>>>>>>>>> yielding sub-optimal performance and memory
> > utilization.
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> I deemed Atomic Checkpointing to be the "right" way
> > to
> > > > >>>>>>>> resolve
> > > > >>>>>>>>>> this
> > > > >>>>>>>>>>>>>>>>>> problem. By ensuring that the changelog offsets that
> > > > >>>>>>>> correspond
> > > > >>>>>>>>>> to
> > > > >>>>>>>>>>>> the
> > > > >>>>>>>>>>>>>>>> most
> > > > >>>>>>>>>>>>>>>>>> recently written records are always atomically
> > written
> > > > to
> > > > >>>>>>>> the
> > > > >>>>>>>>>>>>>> StateStore
> > > > >>>>>>>>>>>>>>>>>> (by writing them to the same transaction buffer),
> > we can
> > > > >>>>>>>> avoid
> > > > >>>>>>>>>>>>>> forcibly
> > > > >>>>>>>>>>>>>>>>>> flushing the RocksDB memtables to disk, letting
> > RocksDB
> > > > >>>>>>>> flush
> > > > >>>>>>>>>> them
> > > > >>>>>>>>>>>>>> only
> > > > >>>>>>>>>>>>>>>>>> when necessary, without losing any of our
> > consistency
> > > > >>>>>>>> guarantees.
> > > > >>>>>>>>>>>> See
> > > > >>>>>>>>>>>>>>>> the
> > > > >>>>>>>>>>>>>>>>>> updated KIP for more info.
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> I have fully implemented these changes, although I'm
> > > > still
> > > > >>>>>>>> not
> > > > >>>>>>>>>>>>>> entirely
> > > > >>>>>>>>>>>>>>>>>> happy with the implementation for segmented
> > StateStores,
> > > > >> so
> > > > >>>>>>>> I
> > > > >>>>>>>>>> plan
> > > > >>>>>>>>>>>> to
> > > > >>>>>>>>>>>>>>>>>> refactor that. Despite that, all tests pass. If
> > you'd
> > > > like
> > > > >>>>>>>> to try
> > > > >>>>>>>>>>>> out
> > > > >>>>>>>>>>>>>> or
> > > > >>>>>>>>>>>>>>>>>> review this highly experimental and incomplete
> > branch,
> > > > >> it's
> > > > >>>>>>>>>>>> available
> > > > >>>>>>>>>>>>>>>> here:
> > > > >>>>>>>>>>>>>>>>>>
> > https://github.com/nicktelford/kafka/tree/KIP-892-3.5.0
> > > > .
> > > > >>>>>>>> Note:
> > > > >>>>>>>>>>>> it's
> > > > >>>>>>>>>>>>>>>> built
> > > > >>>>>>>>>>>>>>>>>> against Kafka 3.5.0 so that I had a stable base to
> > build
> > > > >>>>>>>> and test
> > > > >>>>>>>>>>>> it
> > > > >>>>>>>>>>>>>> on,
> > > > >>>>>>>>>>>>>>>>>> and to enable easy apples-to-apples comparisons in a
> > > > live
> > > > >>>>>>>>>>>>>> environment. I
> > > > >>>>>>>>>>>>>>>>>> plan to rebase it against trunk once it's nearer
> > > > >> completion
> > > > >>>>>>>> and
> > > > >>>>>>>>>> has
> > > > >>>>>>>>>>>>>> been
> > > > >>>>>>>>>>>>>>>>>> proven on our main application.
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> I would really appreciate help in reviewing and
> > testing:
> > > > >>>>>>>>>>>>>>>>>> - Segmented (Versioned, Session and Window) stores
> > > > >>>>>>>>>>>>>>>>>> - Global stores
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> As I do not currently use either of these, so my
> > primary
> > > > >>>>>>>> test
> > > > >>>>>>>>>>>>>>>> environment
> > > > >>>>>>>>>>>>>>>>>> doesn't test these areas.
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> I'm going on Parental Leave starting next week for
> > a few
> > > > >>>>>>>> weeks,
> > > > >>>>>>>>>> so
> > > > >>>>>>>>>>>>>> will
> > > > >>>>>>>>>>>>>>>>>> not have time to move this forward until late
> > August.
> > > > That
> > > > >>>>>>>> said,
> > > > >>>>>>>>>>>> your
> > > > >>>>>>>>>>>>>>>>>> feedback is welcome and appreciated, I just won't be
> > > > able
> > > > >>>> to
> > > > >>>>>>>>>>>> respond
> > > > >>>>>>>>>>>>>> as
> > > > >>>>>>>>>>>>>>>>>> quickly as usual.
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> Regards,
> > > > >>>>>>>>>>>>>>>>>> Nick
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> On Mon, 3 Jul 2023 at 16:23, Nick Telford <
> > > > >>>>>>>>>> nick.telf...@gmail.com>
> > > > >>>>>>>>>>>>>>>> wrote:
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>> Hi Bruno
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>> Yes, that's correct, although the impact on IQ is
> > not
> > > > >>>>>>>> something
> > > > >>>>>>>>>> I
> > > > >>>>>>>>>>>> had
> > > > >>>>>>>>>>>>>>>>>>> considered.
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>> What about atomically updating the state store
> > from the
> > > > >>>>>>>>>>>> transaction
> > > > >>>>>>>>>>>>>>>>>>>> buffer every commit interval and writing the
> > > > checkpoint
> > > > >>>>>>>> (thus,
> > > > >>>>>>>>>>>>>>>> flushing
> > > > >>>>>>>>>>>>>>>>>>>> the memtable) every configured amount of data
> > and/or
> > > > >>>>>>>> number of
> > > > >>>>>>>>>>>>>> commit
> > > > >>>>>>>>>>>>>>>>>>>> intervals?
> > > > >>>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>> I'm not quite sure I follow. Are you suggesting
> > that we
> > > > >>>>>>>> add an
> > > > >>>>>>>>>>>>>>>> additional
> > > > >>>>>>>>>>>>>>>>>>> config for the max number of commit intervals
> > between
> > > > >>>>>>>>>> checkpoints?
> > > > >>>>>>>>>>>>>> That
> > > > >>>>>>>>>>>>>>>>>>> way, we would checkpoint *either* when the
> > transaction
> > > > >>>>>>>> buffers
> > > > >>>>>>>>>> are
> > > > >>>>>>>>>>>>>>>> nearly
> > > > >>>>>>>>>>>>>>>>>>> full, *OR* whenever a certain number of commit
> > > > intervals
> > > > >>>>>>>> have
> > > > >>>>>>>>>>>>>> elapsed,
> > > > >>>>>>>>>>>>>>>>>>> whichever comes first?
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>> That certainly seems reasonable, although this
> > > > re-ignites
> > > > >>>>>>>> an
> > > > >>>>>>>>>>>> earlier
> > > > >>>>>>>>>>>>>>>>>>> debate about whether a config should be measured in
> > > > >>>>>>>> "number of
> > > > >>>>>>>>>>>> commit
> > > > >>>>>>>>>>>>>>>>>>> intervals", instead of just an absolute time.
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>> FWIW, I realised that this issue is the reason I
> > was
> > > > >>>>>>>> pursuing
> > > > >>>>>>>>>> the
> > > > >>>>>>>>>>>>>>>> Atomic
> > > > >>>>>>>>>>>>>>>>>>> Checkpoints, as it de-couples memtable flush from
> > > > >>>>>>>> checkpointing,
> > > > >>>>>>>>>>>>>> which
> > > > >>>>>>>>>>>>>>>>>>> enables us to just checkpoint on every commit
> > without
> > > > any
> > > > >>>>>>>>>>>> performance
> > > > >>>>>>>>>>>>>>>>>>> impact. Atomic Checkpointing is definitely the
> > "best"
> > > > >>>>>>>> solution,
> > > > >>>>>>>>>>>> but
> > > > >>>>>>>>>>>>>>>> I'm not
> > > > >>>>>>>>>>>>>>>>>>> sure if this is enough to bring it back into this
> > KIP.
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>> I'm currently working on moving all the
> > transactional
> > > > >>>> logic
> > > > >>>>>>>>>>>> directly
> > > > >>>>>>>>>>>>>>>> into
> > > > >>>>>>>>>>>>>>>>>>> RocksDBStore itself, which does away with the
> > > > >>>>>>>>>>>>>> StateStore#newTransaction
> > > > >>>>>>>>>>>>>>>>>>> method, and reduces the number of new classes
> > > > introduced,
> > > > >>>>>>>>>>>>>> significantly
> > > > >>>>>>>>>>>>>>>>>>> reducing the complexity. If it works, and the
> > > > complexity
> > > > >>>> is
> > > > >>>>>>>>>>>>>> drastically
> > > > >>>>>>>>>>>>>>>>>>> reduced, I may try bringing back Atomic Checkpoints
> > > > into
> > > > >>>>>>>> this
> > > > >>>>>>>>>> KIP.
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>> Regards,
> > > > >>>>>>>>>>>>>>>>>>> Nick
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>> On Mon, 3 Jul 2023 at 15:27, Bruno Cadonna <
> > > > >>>>>>>> cado...@apache.org>
> > > > >>>>>>>>>>>>>> wrote:
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>> Hi Nick,
> > > > >>>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>> Thanks for the insights! Very interesting!
> > > > >>>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>> As far as I understand, you want to atomically
> > update
> > > > >> the
> > > > >>>>>>>> state
> > > > >>>>>>>>>>>>>> store
> > > > >>>>>>>>>>>>>>>>>>>> from the transaction buffer, flush the memtable
> > of a
> > > > >>>> state
> > > > >>>>>>>>>> store
> > > > >>>>>>>>>>>> and
> > > > >>>>>>>>>>>>>>>>>>>> write the checkpoint not after the commit time
> > elapsed
> > > > >>>> but
> > > > >>>>>>>>>> after
> > > > >>>>>>>>>>>> the
> > > > >>>>>>>>>>>>>>>>>>>> transaction buffer reached a size that would lead
> > to
> > > > >>>>>>>> exceeding
> > > > >>>>>>>>>>>>>>>>>>>> statestore.transaction.buffer.max.bytes before the
> > > > next
> > > > >>>>>>>> commit
> > > > >>>>>>>>>>>>>>>> interval
> > > > >>>>>>>>>>>>>>>>>>>> ends.
> > > > >>>>>>>>>>>>>>>>>>>> That means, the Kafka transaction would commit
> > every
> > > > >>>>>>>> commit
> > > > >>>>>>>>>>>> interval
> > > > >>>>>>>>>>>>>>>> but
> > > > >>>>>>>>>>>>>>>>>>>> the state store will only be atomically updated
> > > > roughly
> > > > >>>>>>>> every
> > > > >>>>>>>>>>>>>>>>>>>> statestore.transaction.buffer.max.bytes of data.
> > Also
> > > > IQ
> > > > >>>>>>>> would
> > > > >>>>>>>>>>>> then
> > > > >>>>>>>>>>>>>>>> only
> > > > >>>>>>>>>>>>>>>>>>>> see new data roughly every
> > > > >>>>>>>>>>>> statestore.transaction.buffer.max.bytes.
> > > > >>>>>>>>>>>>>>>>>>>> After a failure the state store needs to restore
> > up to
> > > > >>>>>>>>>>>>>>>>>>>> statestore.transaction.buffer.max.bytes.
> > > > >>>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>> Is this correct?
> > > > >>>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>> What about atomically updating the state store
> > from
> > > > the
> > > > >>>>>>>>>>>> transaction
> > > > >>>>>>>>>>>>>>>>>>>> buffer every commit interval and writing the
> > > > checkpoint
> > > > >>>>>>>> (thus,
> > > > >>>>>>>>>>>>>>>> flushing
> > > > >>>>>>>>>>>>>>>>>>>> the memtable) every configured amount of data
> > and/or
> > > > >>>>>>>> number of
> > > > >>>>>>>>>>>>>> commit
> > > > >>>>>>>>>>>>>>>>>>>> intervals? In such a way, we would have the same
> > delay
> > > > >>>> for
> > > > >>>>>>>>>>>> records
> > > > >>>>>>>>>>>>>>>>>>>> appearing in output topics and IQ because both
> > would
> > > > >>>>>>>> appear
> > > > >>>>>>>>>> when
> > > > >>>>>>>>>>>> the
> > > > >>>>>>>>>>>>>>>>>>>> Kafka transaction is committed. However, after a
> > > > failure
> > > > >>>>>>>> the
> > > > >>>>>>>>>>>> state
> > > > >>>>>>>>>>>>>>>> store
> > > > >>>>>>>>>>>>>>>>>>>> still needs to restore up to
> > > > >>>>>>>>>>>> statestore.transaction.buffer.max.bytes
> > > > >>>>>>>>>>>>>>>> and
> > > > >>>>>>>>>>>>>>>>>>>> it might restore data that is already in the state
> > > > store
> > > > >>>>>>>>>> because
> > > > >>>>>>>>>>>> the
> > > > >>>>>>>>>>>>>>>>>>>> checkpoint lags behind the last stable offset
> > (i.e.
> > > > the
> > > > >>>>>>>> last
> > > > >>>>>>>>>>>>>> committed
> > > > >>>>>>>>>>>>>>>>>>>> offset) of the changelog topics. Restoring data
> > that
> > > > is
> > > > >>>>>>>> already
> > > > >>>>>>>>>>>> in
> > > > >>>>>>>>>>>>>> the
> > > > >>>>>>>>>>>>>>>>>>>> state store is idempotent, so eos should not
> > violated.
> > > > >>>>>>>>>>>>>>>>>>>> This solution needs at least one new config to
> > specify
> > > > >>>>>>>> when a
> > > > >>>>>>>>>>>>>>>> checkpoint
> > > > >>>>>>>>>>>>>>>>>>>> should be written.
> > > > >>>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>> A small correction to your previous e-mail that
> > does
> > > > not
> > > > >>>>>>>> change
> > > > >>>>>>>>>>>>>>>> anything
> > > > >>>>>>>>>>>>>>>>>>>> you said: Under alos the default commit interval
> > is 30
> > > > >>>>>>>> seconds,
> > > > >>>>>>>>>>>> not
> > > > >>>>>>>>>>>>>>>> five
> > > > >>>>>>>>>>>>>>>>>>>> seconds.
> > > > >>>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>> Best,
> > > > >>>>>>>>>>>>>>>>>>>> Bruno
> > > > >>>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>> On 01.07.23 12:37, Nick Telford wrote:
> > > > >>>>>>>>>>>>>>>>>>>>> Hi everyone,
> > > > >>>>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>>> I've begun performance testing my branch on our
> > > > staging
> > > > >>>>>>>>>>>>>> environment,
> > > > >>>>>>>>>>>>>>>>>>>>> putting it through its paces in our non-trivial
> > > > >>>>>>>> application.
> > > > >>>>>>>>>> I'm
> > > > >>>>>>>>>>>>>>>>>>>> already
> > > > >>>>>>>>>>>>>>>>>>>>> observing the same increased flush rate that we
> > saw
> > > > the
> > > > >>>>>>>> last
> > > > >>>>>>>>>>>> time
> > > > >>>>>>>>>>>>>> we
> > > > >>>>>>>>>>>>>>>>>>>>> attempted to use a version of this KIP, but this
> > > > time,
> > > > >> I
> > > > >>>>>>>>>> think I
> > > > >>>>>>>>>>>>>> know
> > > > >>>>>>>>>>>>>>>>>>>> why.
> > > > >>>>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>>> Pre-KIP-892, StreamTask#postCommit, which is
> > called
> > > > at
> > > > >>>>>>>> the end
> > > > >>>>>>>>>>>> of
> > > > >>>>>>>>>>>>>> the
> > > > >>>>>>>>>>>>>>>>>>>> Task
> > > > >>>>>>>>>>>>>>>>>>>>> commit process, has the following behaviour:
> > > > >>>>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>>>           - Under ALOS: checkpoint the state
> > stores.
> > > > >> This
> > > > >>>>>>>>>> includes
> > > > >>>>>>>>>>>>>>>>>>>>>           flushing memtables in RocksDB. This is
> > > > >>>> acceptable
> > > > >>>>>>>>>>>> because the
> > > > >>>>>>>>>>>>>>>>>>>> default
> > > > >>>>>>>>>>>>>>>>>>>>>           commit.interval.ms is 5 seconds, so
> > > > forcibly
> > > > >>>>>>>> flushing
> > > > >>>>>>>>>>>>>> memtables
> > > > >>>>>>>>>>>>>>>>>>>> every 5
> > > > >>>>>>>>>>>>>>>>>>>>>           seconds is acceptable for most
> > > > applications.
> > > > >>>>>>>>>>>>>>>>>>>>>           - Under EOS: checkpointing is not done,
> > > > >> *unless*
> > > > >>>>>>>> it's
> > > > >>>>>>>>>>>> being
> > > > >>>>>>>>>>>>>>>>>>>> forced, due
> > > > >>>>>>>>>>>>>>>>>>>>>           to e.g. the Task closing or being
> > revoked.
> > > > >> This
> > > > >>>>>>>> means
> > > > >>>>>>>>>>>> that
> > > > >>>>>>>>>>>>>> under
> > > > >>>>>>>>>>>>>>>>>>>> normal
> > > > >>>>>>>>>>>>>>>>>>>>>           processing conditions, the state stores
> > > > will
> > > > >> not
> > > > >>>>>>>> be
> > > > >>>>>>>>>>>>>>>> checkpointed,
> > > > >>>>>>>>>>>>>>>>>>>> and will
> > > > >>>>>>>>>>>>>>>>>>>>>           not have memtables flushed at all ,
> > unless
> > > > >>>> RocksDB
> > > > >>>>>>>>>>>> decides to
> > > > >>>>>>>>>>>>>>>>>>>> flush them on
> > > > >>>>>>>>>>>>>>>>>>>>>           its own. Checkpointing stores and
> > > > >> force-flushing
> > > > >>>>>>>> their
> > > > >>>>>>>>>>>>>> memtables
> > > > >>>>>>>>>>>>>>>>>>>> is only
> > > > >>>>>>>>>>>>>>>>>>>>>           done when a Task is being closed.
> > > > >>>>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>>> Under EOS, KIP-892 needs to checkpoint stores on
> > at
> > > > >>>> least
> > > > >>>>>>>>>> *some*
> > > > >>>>>>>>>>>>>>>> normal
> > > > >>>>>>>>>>>>>>>>>>>>> Task commits, in order to write the RocksDB
> > > > transaction
> > > > >>>>>>>>>> buffers
> > > > >>>>>>>>>>>> to
> > > > >>>>>>>>>>>>>>>> the
> > > > >>>>>>>>>>>>>>>>>>>>> state stores, and to ensure the offsets are
> > synced to
> > > > >>>>>>>> disk to
> > > > >>>>>>>>>>>>>> prevent
> > > > >>>>>>>>>>>>>>>>>>>>> restores from getting out of hand. Consequently,
> > my
> > > > >>>>>>>> current
> > > > >>>>>>>>>>>>>>>>>>>> implementation
> > > > >>>>>>>>>>>>>>>>>>>>> calls maybeCheckpoint on *every* Task commit,
> > which
> > > > is
> > > > >>>>>>>> far too
> > > > >>>>>>>>>>>>>>>>>>>> frequent.
> > > > >>>>>>>>>>>>>>>>>>>>> This causes checkpoints every 10,000 records,
> > which
> > > > is
> > > > >> a
> > > > >>>>>>>>>> change
> > > > >>>>>>>>>>>> in
> > > > >>>>>>>>>>>>>>>>>>>> flush
> > > > >>>>>>>>>>>>>>>>>>>>> behaviour, potentially causing performance
> > problems
> > > > for
> > > > >>>>>>>> some
> > > > >>>>>>>>>>>>>>>>>>>> applications.
> > > > >>>>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>>> I'm looking into possible solutions, and I'm
> > > > currently
> > > > >>>>>>>> leaning
> > > > >>>>>>>>>>>>>>>> towards
> > > > >>>>>>>>>>>>>>>>>>>>> using the statestore.transaction.buffer.max.bytes
> > > > >>>>>>>>>> configuration
> > > > >>>>>>>>>>>> to
> > > > >>>>>>>>>>>>>>>>>>>>> checkpoint Tasks once we are likely to exceed it.
> > > > This
> > > > >>>>>>>> would
> > > > >>>>>>>>>>>>>>>>>>>> complement the
> > > > >>>>>>>>>>>>>>>>>>>>> existing "early Task commit" functionality that
> > this
> > > > >>>>>>>>>>>> configuration
> > > > >>>>>>>>>>>>>>>>>>>>> provides, in the following way:
> > > > >>>>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>>>           - Currently, we use
> > > > >>>>>>>>>>>> statestore.transaction.buffer.max.bytes
> > > > >>>>>>>>>>>>>> to
> > > > >>>>>>>>>>>>>>>>>>>> force an
> > > > >>>>>>>>>>>>>>>>>>>>>           early Task commit if processing more
> > > > records
> > > > >>>> would
> > > > >>>>>>>>>> cause
> > > > >>>>>>>>>>>> our
> > > > >>>>>>>>>>>>>>>> state
> > > > >>>>>>>>>>>>>>>>>>>> store
> > > > >>>>>>>>>>>>>>>>>>>>>           transactions to exceed the memory
> > assigned
> > > > to
> > > > >>>>>>>> them.
> > > > >>>>>>>>>>>>>>>>>>>>>           - New functionality: when a Task *does*
> > > > >> commit,
> > > > >>>>>>>> we will
> > > > >>>>>>>>>>>> not
> > > > >>>>>>>>>>>>>>>>>>>> checkpoint
> > > > >>>>>>>>>>>>>>>>>>>>>           the stores (and hence flush the
> > transaction
> > > > >>>>>>>> buffers)
> > > > >>>>>>>>>>>> unless
> > > > >>>>>>>>>>>>>> we
> > > > >>>>>>>>>>>>>>>>>>>> expect to
> > > > >>>>>>>>>>>>>>>>>>>>>           cross the
> > > > >>>> statestore.transaction.buffer.max.bytes
> > > > >>>>>>>>>>>> threshold
> > > > >>>>>>>>>>>>>>>> before
> > > > >>>>>>>>>>>>>>>>>>>> the next
> > > > >>>>>>>>>>>>>>>>>>>>>           commit
> > > > >>>>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>>> I'm also open to suggestions.
> > > > >>>>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>>> Regards,
> > > > >>>>>>>>>>>>>>>>>>>>> Nick
> > > > >>>>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>>> On Thu, 22 Jun 2023 at 14:06, Nick Telford <
> > > > >>>>>>>>>>>> nick.telf...@gmail.com
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>> wrote:
> > > > >>>>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>>>> Hi Bruno!
> > > > >>>>>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>>>> 3.
> > > > >>>>>>>>>>>>>>>>>>>>>> By "less predictable for users", I meant in
> > terms of
> > > > >>>>>>>>>>>> understanding
> > > > >>>>>>>>>>>>>>>> the
> > > > >>>>>>>>>>>>>>>>>>>>>> performance profile under various
> > circumstances. The
> > > > >>>>>>>> more
> > > > >>>>>>>>>>>> complex
> > > > >>>>>>>>>>>>>>>> the
> > > > >>>>>>>>>>>>>>>>>>>>>> solution, the more difficult it would be for
> > users
> > > > to
> > > > >>>>>>>>>>>> understand
> > > > >>>>>>>>>>>>>> the
> > > > >>>>>>>>>>>>>>>>>>>>>> performance they see. For example, spilling
> > records
> > > > to
> > > > >>>>>>>> disk
> > > > >>>>>>>>>>>> when
> > > > >>>>>>>>>>>>>> the
> > > > >>>>>>>>>>>>>>>>>>>>>> transaction buffer reaches a threshold would, I
> > > > >> expect,
> > > > >>>>>>>>>> reduce
> > > > >>>>>>>>>>>>>> write
> > > > >>>>>>>>>>>>>>>>>>>>>> throughput. This reduction in write throughput
> > could
> > > > >> be
> > > > >>>>>>>>>>>>>> unexpected,
> > > > >>>>>>>>>>>>>>>>>>>> and
> > > > >>>>>>>>>>>>>>>>>>>>>> potentially difficult to diagnose/understand for
> > > > >> users.
> > > > >>>>>>>>>>>>>>>>>>>>>> At the moment, I think the "early commit"
> > concept is
> > > > >>>>>>>>>> relatively
> > > > >>>>>>>>>>>>>>>>>>>>>> straightforward; it's easy to document, and
> > > > >>>> conceptually
> > > > >>>>>>>>>> fairly
> > > > >>>>>>>>>>>>>>>>>>>> obvious to
> > > > >>>>>>>>>>>>>>>>>>>>>> users. We could probably add a metric to make it
> > > > >> easier
> > > > >>>>>>>> to
> > > > >>>>>>>>>>>>>>>> understand
> > > > >>>>>>>>>>>>>>>>>>>> when
> > > > >>>>>>>>>>>>>>>>>>>>>> it happens though.
> > > > >>>>>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>>>> 3. (the second one)
> > > > >>>>>>>>>>>>>>>>>>>>>> The IsolationLevel is *essentially* an indirect
> > way
> > > > of
> > > > >>>>>>>>>> telling
> > > > >>>>>>>>>>>>>>>>>>>> StateStores
> > > > >>>>>>>>>>>>>>>>>>>>>> whether they should be transactional.
> > READ_COMMITTED
> > > > >>>>>>>>>>>> essentially
> > > > >>>>>>>>>>>>>>>>>>>> requires
> > > > >>>>>>>>>>>>>>>>>>>>>> transactions, because it dictates that two
> > threads
> > > > >>>>>>>> calling
> > > > >>>>>>>>>>>>>>>>>>>>>> `newTransaction()` should not see writes from
> > the
> > > > >> other
> > > > >>>>>>>>>>>>>> transaction
> > > > >>>>>>>>>>>>>>>>>>>> until
> > > > >>>>>>>>>>>>>>>>>>>>>> they have been committed. With
> > READ_UNCOMMITTED, all
> > > > >>>>>>>> bets are
> > > > >>>>>>>>>>>> off,
> > > > >>>>>>>>>>>>>>>> and
> > > > >>>>>>>>>>>>>>>>>>>>>> stores can allow threads to observe written
> > records
> > > > at
> > > > >>>>>>>> any
> > > > >>>>>>>>>>>> time,
> > > > >>>>>>>>>>>>>>>>>>>> which is
> > > > >>>>>>>>>>>>>>>>>>>>>> essentially "no transactions". That said,
> > > > StateStores
> > > > >>>>>>>> are
> > > > >>>>>>>>>> free
> > > > >>>>>>>>>>>> to
> > > > >>>>>>>>>>>>>>>>>>>> implement
> > > > >>>>>>>>>>>>>>>>>>>>>> these guarantees however they can, which is a
> > bit
> > > > more
> > > > >>>>>>>>>> relaxed
> > > > >>>>>>>>>>>>>> than
> > > > >>>>>>>>>>>>>>>>>>>>>> dictating "you must use transactions". For
> > example,
> > > > >>>> with
> > > > >>>>>>>>>>>> RocksDB
> > > > >>>>>>>>>>>>>> we
> > > > >>>>>>>>>>>>>>>>>>>> would
> > > > >>>>>>>>>>>>>>>>>>>>>> implement these as READ_COMMITTED == WBWI-based
> > > > >>>>>>>>>> "transactions",
> > > > >>>>>>>>>>>>>>>>>>>>>> READ_UNCOMMITTED == direct writes to the
> > database.
> > > > But
> > > > >>>>>>>> with
> > > > >>>>>>>>>>>> other
> > > > >>>>>>>>>>>>>>>>>>>> storage
> > > > >>>>>>>>>>>>>>>>>>>>>> engines, it might be preferable to *always* use
> > > > >>>>>>>> transactions,
> > > > >>>>>>>>>>>> even
> > > > >>>>>>>>>>>>>>>>>>>> when
> > > > >>>>>>>>>>>>>>>>>>>>>> unnecessary; or there may be storage engines
> > that
> > > > >> don't
> > > > >>>>>>>>>> provide
> > > > >>>>>>>>>>>>>>>>>>>>>> transactions, but the isolation guarantees can
> > be
> > > > met
> > > > >>>>>>>> using a
> > > > >>>>>>>>>>>>>>>>>>>> different
> > > > >>>>>>>>>>>>>>>>>>>>>> technique.
> > > > >>>>>>>>>>>>>>>>>>>>>> My idea was to try to keep the StateStore
> > interface
> > > > as
> > > > >>>>>>>>>> loosely
> > > > >>>>>>>>>>>>>>>> coupled
> > > > >>>>>>>>>>>>>>>>>>>>>> from the Streams engine as possible, to give
> > > > >>>>>>>> implementers
> > > > >>>>>>>>>> more
> > > > >>>>>>>>>>>>>>>>>>>> freedom, and
> > > > >>>>>>>>>>>>>>>>>>>>>> reduce the amount of internal knowledge
> > required.
> > > > >>>>>>>>>>>>>>>>>>>>>> That said, I understand that "IsolationLevel"
> > might
> > > > >> not
> > > > >>>>>>>> be
> > > > >>>>>>>>>> the
> > > > >>>>>>>>>>>>>> right
> > > > >>>>>>>>>>>>>>>>>>>>>> abstraction, and we can always make it much more
> > > > >>>>>>>> explicit if
> > > > >>>>>>>>>>>>>>>>>>>> required, e.g.
> > > > >>>>>>>>>>>>>>>>>>>>>> boolean transactional()
> > > > >>>>>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>>>> 7-8.
> > > > >>>>>>>>>>>>>>>>>>>>>> I can make these changes either later today or
> > > > >>>> tomorrow.
> > > > >>>>>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>>>> Small update:
> > > > >>>>>>>>>>>>>>>>>>>>>> I've rebased my branch on trunk and fixed a
> > bunch of
> > > > >>>>>>>> issues
> > > > >>>>>>>>>>>> that
> > > > >>>>>>>>>>>>>>>>>>>> needed
> > > > >>>>>>>>>>>>>>>>>>>>>> addressing. Currently, all the tests pass,
> > which is
> > > > >>>>>>>>>> promising,
> > > > >>>>>>>>>>>> but
> > > > >>>>>>>>>>>>>>>> it
> > > > >>>>>>>>>>>>>>>>>>>> will
> > > > >>>>>>>>>>>>>>>>>>>>>> need to undergo some performance testing. I
> > haven't
> > > > >>>>>>>> (yet)
> > > > >>>>>>>>>>>> worked
> > > > >>>>>>>>>>>>>> on
> > > > >>>>>>>>>>>>>>>>>>>>>> removing the `newTransaction()` stuff, but I
> > would
> > > > >>>>>>>> expect
> > > > >>>>>>>>>> that,
> > > > >>>>>>>>>>>>>>>>>>>>>> behaviourally, it should make no difference. The
> > > > >> branch
> > > > >>>>>>>> is
> > > > >>>>>>>>>>>>>> available
> > > > >>>>>>>>>>>>>>>>>>>> at
> > > > >>>>>>>>>>>>>>>>>>>>>>
> > https://github.com/nicktelford/kafka/tree/KIP-892-c
> > > > >> if
> > > > >>>>>>>>>> anyone
> > > > >>>>>>>>>>>> is
> > > > >>>>>>>>>>>>>>>>>>>>>> interested in taking an early look.
> > > > >>>>>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>>>> Regards,
> > > > >>>>>>>>>>>>>>>>>>>>>> Nick
> > > > >>>>>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>>>> On Thu, 22 Jun 2023 at 11:59, Bruno Cadonna <
> > > > >>>>>>>>>>>> cado...@apache.org>
> > > > >>>>>>>>>>>>>>>>>>>> wrote:
> > > > >>>>>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>>>>> Hi Nick,
> > > > >>>>>>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>>>>> 1.
> > > > >>>>>>>>>>>>>>>>>>>>>>> Yeah, I agree with you. That was actually also
> > my
> > > > >>>>>>>> point. I
> > > > >>>>>>>>>>>>>>>> understood
> > > > >>>>>>>>>>>>>>>>>>>>>>> that John was proposing the ingestion path as
> > a way
> > > > >> to
> > > > >>>>>>>> avoid
> > > > >>>>>>>>>>>> the
> > > > >>>>>>>>>>>>>>>>>>>> early
> > > > >>>>>>>>>>>>>>>>>>>>>>> commits. Probably, I misinterpreted the intent.
> > > > >>>>>>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>>>>> 2.
> > > > >>>>>>>>>>>>>>>>>>>>>>> I agree with John here, that actually it is
> > public
> > > > >>>>>>>> API. My
> > > > >>>>>>>>>>>>>> question
> > > > >>>>>>>>>>>>>>>>>>>> is
> > > > >>>>>>>>>>>>>>>>>>>>>>> how this usage pattern affects normal
> > processing.
> > > > >>>>>>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>>>>> 3.
> > > > >>>>>>>>>>>>>>>>>>>>>>> My concern is that checking for the size of the
> > > > >>>>>>>> transaction
> > > > >>>>>>>>>>>>>> buffer
> > > > >>>>>>>>>>>>>>>>>>>> and
> > > > >>>>>>>>>>>>>>>>>>>>>>> maybe triggering an early commit affects the
> > whole
> > > > >>>>>>>>>> processing
> > > > >>>>>>>>>>>> of
> > > > >>>>>>>>>>>>>>>>>>>> Kafka
> > > > >>>>>>>>>>>>>>>>>>>>>>> Streams. The transactionality of a state store
> > is
> > > > not
> > > > >>>>>>>>>>>> confined to
> > > > >>>>>>>>>>>>>>>> the
> > > > >>>>>>>>>>>>>>>>>>>>>>> state store itself, but spills over and
> > changes the
> > > > >>>>>>>> behavior
> > > > >>>>>>>>>>>> of
> > > > >>>>>>>>>>>>>>>> other
> > > > >>>>>>>>>>>>>>>>>>>>>>> parts of the system. I agree with you that it
> > is a
> > > > >>>>>>>> decent
> > > > >>>>>>>>>>>>>>>>>>>> compromise. I
> > > > >>>>>>>>>>>>>>>>>>>>>>> just wanted to analyse the downsides and list
> > the
> > > > >>>>>>>> options to
> > > > >>>>>>>>>>>>>>>> overcome
> > > > >>>>>>>>>>>>>>>>>>>>>>> them. I also agree with you that all options
> > seem
> > > > >>>> quite
> > > > >>>>>>>>>> heavy
> > > > >>>>>>>>>>>>>>>>>>>> compared
> > > > >>>>>>>>>>>>>>>>>>>>>>> with your KIP. I do not understand what you
> > mean
> > > > with
> > > > >>>>>>>> "less
> > > > >>>>>>>>>>>>>>>>>>>> predictable
> > > > >>>>>>>>>>>>>>>>>>>>>>> for users", though.
> > > > >>>>>>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>>>>> I found the discussions about the alternatives
> > > > really
> > > > >>>>>>>>>>>>>> interesting.
> > > > >>>>>>>>>>>>>>>>>>>> But I
> > > > >>>>>>>>>>>>>>>>>>>>>>> also think that your plan sounds good and we
> > should
> > > > >>>>>>>> continue
> > > > >>>>>>>>>>>> with
> > > > >>>>>>>>>>>>>>>> it!
> > > > >>>>>>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>>>>> Some comments on your reply to my e-mail on
> > June
> > > > >> 20th:
> > > > >>>>>>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>>>>> 3.
> > > > >>>>>>>>>>>>>>>>>>>>>>> Ah, now, I understand the reasoning behind
> > putting
> > > > >>>>>>>> isolation
> > > > >>>>>>>>>>>>>> level
> > > > >>>>>>>>>>>>>>>> in
> > > > >>>>>>>>>>>>>>>>>>>>>>> the state store context. Thanks! Should that
> > also
> > > > be
> > > > >> a
> > > > >>>>>>>> way
> > > > >>>>>>>>>> to
> > > > >>>>>>>>>>>>>> give
> > > > >>>>>>>>>>>>>>>>>>>> the
> > > > >>>>>>>>>>>>>>>>>>>>>>> the state store the opportunity to decide
> > whether
> > > > to
> > > > >>>>>>>> turn on
> > > > >>>>>>>>>>>>>>>>>>>>>>> transactions or not?
> > > > >>>>>>>>>>>>>>>>>>>>>>> With my comment, I was more concerned about
> > how do
> > > > >> you
> > > > >>>>>>>> know
> > > > >>>>>>>>>>>> if a
> > > > >>>>>>>>>>>>>>>>>>>>>>> checkpoint file needs to be written under EOS,
> > if
> > > > you
> > > > >>>>>>>> do not
> > > > >>>>>>>>>>>>>> have a
> > > > >>>>>>>>>>>>>>>>>>>> way
> > > > >>>>>>>>>>>>>>>>>>>>>>> to know if the state store is transactional or
> > not.
> > > > >> If
> > > > >>>>>>>> a
> > > > >>>>>>>>>> state
> > > > >>>>>>>>>>>>>>>> store
> > > > >>>>>>>>>>>>>>>>>>>> is
> > > > >>>>>>>>>>>>>>>>>>>>>>> transactional, the checkpoint file can be
> > written
> > > > >>>>>>>> during
> > > > >>>>>>>>>>>> normal
> > > > >>>>>>>>>>>>>>>>>>>>>>> processing under EOS. If the state store is not
> > > > >>>>>>>>>> transactional,
> > > > >>>>>>>>>>>>>> the
> > > > >>>>>>>>>>>>>>>>>>>>>>> checkpoint file must not be written under EOS.
> > > > >>>>>>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>>>>> 7.
> > > > >>>>>>>>>>>>>>>>>>>>>>> My point was about not only considering the
> > bytes
> > > > in
> > > > >>>>>>>> memory
> > > > >>>>>>>>>> in
> > > > >>>>>>>>>>>>>>>> config
> > > > >>>>>>>>>>>>>>>>>>>>>>> statestore.uncommitted.max.bytes, but also
> > bytes
> > > > that
> > > > >>>>>>>> might
> > > > >>>>>>>>>> be
> > > > >>>>>>>>>>>>>>>>>>>> spilled
> > > > >>>>>>>>>>>>>>>>>>>>>>> on disk. Basically, I was wondering whether you
> > > > >> should
> > > > >>>>>>>>>> remove
> > > > >>>>>>>>>>>> the
> > > > >>>>>>>>>>>>>>>>>>>>>>> "memory" in "Maximum number of memory bytes to
> > be
> > > > >> used
> > > > >>>>>>>> to
> > > > >>>>>>>>>>>>>>>>>>>>>>> buffer uncommitted state-store records." My
> > > > thinking
> > > > >>>>>>>> was
> > > > >>>>>>>>>> that
> > > > >>>>>>>>>>>>>> even
> > > > >>>>>>>>>>>>>>>>>>>> if a
> > > > >>>>>>>>>>>>>>>>>>>>>>> state store spills uncommitted bytes to disk,
> > > > >> limiting
> > > > >>>>>>>> the
> > > > >>>>>>>>>>>>>> overall
> > > > >>>>>>>>>>>>>>>>>>>> bytes
> > > > >>>>>>>>>>>>>>>>>>>>>>> might make sense. Thinking about it again and
> > > > >>>>>>>> considering
> > > > >>>>>>>>>> the
> > > > >>>>>>>>>>>>>>>> recent
> > > > >>>>>>>>>>>>>>>>>>>>>>> discussions, it does not make too much sense
> > > > anymore.
> > > > >>>>>>>>>>>>>>>>>>>>>>> I like the name
> > > > >>>>>>>> statestore.transaction.buffer.max.bytes that
> > > > >>>>>>>>>>>> you
> > > > >>>>>>>>>>>>>>>>>>>> proposed.
> > > > >>>>>>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>>>>> 8.
> > > > >>>>>>>>>>>>>>>>>>>>>>> A high-level description (without
> > implementation
> > > > >>>>>>>> details) of
> > > > >>>>>>>>>>>> how
> > > > >>>>>>>>>>>>>>>>>>>> Kafka
> > > > >>>>>>>>>>>>>>>>>>>>>>> Streams will manage the commit of changelog
> > > > >>>>>>>> transactions,
> > > > >>>>>>>>>>>> state
> > > > >>>>>>>>>>>>>>>> store
> > > > >>>>>>>>>>>>>>>>>>>>>>> transactions and checkpointing would be great.
> > > > Would
> > > > >>>> be
> > > > >>>>>>>>>> great
> > > > >>>>>>>>>>>> if
> > > > >>>>>>>>>>>>>>>> you
> > > > >>>>>>>>>>>>>>>>>>>>>>> could also add some sentences about the
> > behavior in
> > > > >>>>>>>> case of
> > > > >>>>>>>>>> a
> > > > >>>>>>>>>>>>>>>>>>>> failure.
> > > > >>>>>>>>>>>>>>>>>>>>>>> For instance how does a transactional state
> > store
> > > > >>>>>>>> recover
> > > > >>>>>>>>>>>> after a
> > > > >>>>>>>>>>>>>>>>>>>>>>> failure or what happens with the transaction
> > > > buffer,
> > > > >>>>>>>> etc.
> > > > >>>>>>>>>>>> (that
> > > > >>>>>>>>>>>>>> is
> > > > >>>>>>>>>>>>>>>>>>>> what
> > > > >>>>>>>>>>>>>>>>>>>>>>> I meant by "fail-over" in point 9.)
> > > > >>>>>>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>>>>> Best,
> > > > >>>>>>>>>>>>>>>>>>>>>>> Bruno
> > > > >>>>>>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>>>>> On 21.06.23 18:50, Nick Telford wrote:
> > > > >>>>>>>>>>>>>>>>>>>>>>>> Hi Bruno,
> > > > >>>>>>>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>>>>>> 1.
> > > > >>>>>>>>>>>>>>>>>>>>>>>> Isn't this exactly the same issue that
> > > > >>>>>>>> WriteBatchWithIndex
> > > > >>>>>>>>>>>>>>>>>>>> transactions
> > > > >>>>>>>>>>>>>>>>>>>>>>>> have, whereby exceeding (or likely to exceed)
> > > > >>>>>>>> configured
> > > > >>>>>>>>>>>> memory
> > > > >>>>>>>>>>>>>>>>>>>> needs to
> > > > >>>>>>>>>>>>>>>>>>>>>>>> trigger an early commit?
> > > > >>>>>>>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>>>>>> 2.
> > > > >>>>>>>>>>>>>>>>>>>>>>>> This is one of my big concerns. Ultimately,
> > any
> > > > >>>>>>>> approach
> > > > >>>>>>>>>>>> based
> > > > >>>>>>>>>>>>>> on
> > > > >>>>>>>>>>>>>>>>>>>>>>> cracking
> > > > >>>>>>>>>>>>>>>>>>>>>>>> open RocksDB internals and using it in ways
> > it's
> > > > not
> > > > >>>>>>>> really
> > > > >>>>>>>>>>>>>>>> designed
> > > > >>>>>>>>>>>>>>>>>>>>>>> for is
> > > > >>>>>>>>>>>>>>>>>>>>>>>> likely to have some unforseen performance or
> > > > >>>>>>>> consistency
> > > > >>>>>>>>>>>> issues.
> > > > >>>>>>>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>>>>>> 3.
> > > > >>>>>>>>>>>>>>>>>>>>>>>> What's your motivation for removing these
> > early
> > > > >>>>>>>> commits?
> > > > >>>>>>>>>>>> While
> > > > >>>>>>>>>>>>>> not
> > > > >>>>>>>>>>>>>>>>>>>>>>> ideal, I
> > > > >>>>>>>>>>>>>>>>>>>>>>>> think they're a decent compromise to ensure
> > > > >>>>>>>> consistency
> > > > >>>>>>>>>>>> whilst
> > > > >>>>>>>>>>>>>>>>>>>>>>> maintaining
> > > > >>>>>>>>>>>>>>>>>>>>>>>> good and predictable performance.
> > > > >>>>>>>>>>>>>>>>>>>>>>>> All 3 of your suggested ideas seem *very*
> > > > >>>>>>>> complicated, and
> > > > >>>>>>>>>>>> might
> > > > >>>>>>>>>>>>>>>>>>>>>>> actually
> > > > >>>>>>>>>>>>>>>>>>>>>>>> make behaviour less predictable for users as a
> > > > >>>>>>>> consequence.
> > > > >>>>>>>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>>>>>> I'm a bit concerned that the scope of this
> > KIP is
> > > > >>>>>>>> growing a
> > > > >>>>>>>>>>>> bit
> > > > >>>>>>>>>>>>>>>> out
> > > > >>>>>>>>>>>>>>>>>>>> of
> > > > >>>>>>>>>>>>>>>>>>>>>>>> control. While it's good to discuss ideas for
> > > > future
> > > > >>>>>>>>>>>>>>>> improvements, I
> > > > >>>>>>>>>>>>>>>>>>>>>>> think

> > > > >>>>>>>>>>>>>>>>>>>>>>>> it's important to narrow the scope down to a
> > > > design
> > > > >>>>>>>> that
> > > > >>>>>>>>>>>>>> achieves
> > > > >>>>>>>>>>>>>>>>>>>> the
> > > > >>>>>>>>>>>>>>>>>>>>>>> most
> > > > >>>>>>>>>>>>>>>>>>>>>>>> pressing objectives (constant sized
> > restorations
> > > > >>>>>>>> during
> > > > >>>>>>>>>> dirty
> > > > >>>>>>>>>>>>>>>>>>>>>>>> close/unexpected errors). Any design that
> > this KIP
> > > > >>>>>>>> produces
> > > > >>>>>>>>>>>> can
> > > > >>>>>>>>>>>>>>>>>>>>>>> ultimately
> > > > >>>>>>>>>>>>>>>>>>>>>>>> be changed in the future, especially if the
> > bulk
> > > > of
> > > > >>>>>>>> it is
> > > > >>>>>>>>>>>>>> internal
> > > > >>>>>>>>>>>>>>>>>>>>>>>> behaviour.
> > > > >>>>>>>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>>>>>> I'm going to spend some time next week trying
> > to
> > > > >>>>>>>> re-work
> > > > >>>>>>>>>> the
> > > > >>>>>>>>>>>>>>>>>>>> original
> > > > >>>>>>>>>>>>>>>>>>>>>>>> WriteBatchWithIndex design to remove the
> > > > >>>>>>>> newTransaction()
> > > > >>>>>>>>>>>>>> method,
> > > > >>>>>>>>>>>>>>>>>>>> such
> > > > >>>>>>>>>>>>>>>>>>>>>>> that
> > > > >>>>>>>>>>>>>>>>>>>>>>>> it's just an implementation detail of
> > > > RocksDBStore.
> > > > >>>>>>>> That
> > > > >>>>>>>>>>>> way, if
> > > > >>>>>>>>>>>>>>>> we
> > > > >>>>>>>>>>>>>>>>>>>>>>> want to
> > > > >>>>>>>>>>>>>>>>>>>>>>>> replace WBWI with something in the future,
> > like
> > > > the
> > > > >>>>>>>> SST
> > > > >>>>>>>>>> file
> > > > >>>>>>>>>>>>>>>>>>>> management
> > > > >>>>>>>>>>>>>>>>>>>>>>>> outlined by John, then we can do so with
> > little/no
> > > > >>>> API
> > > > >>>>>>>>>>>> changes.
> > > > >>>>>>>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>>>>>> Regards,
> > > > >>>>>>>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>>>>>> Nick
> > > > >>>>>>>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>>
> > > > >>>>>>
> > > > >>>>>
> > > > >>>>
> > > > >>>
> > > > >>
> > > > >
> > > >
> >
> >

Reply via email to