Hi Nick,

6.
Of course, you are right! My bad!
Wiping out the state in the downgrading case is fine.


3a.
Focus on the public facing changes for the KIP. We will manage to get the internals right. Regarding state stores that do not support READ_COMMITTED, they should throw an error stating that they do not support READ_COMMITTED. No need to adapt all state stores immediately.

3b.
I am in favor of using transactions also for ALOS.


Best,
Bruno

On 9/13/23 11:57 AM, Nick Telford wrote:
Hi Bruno,

Thanks for getting back to me!

2.
The fact that implementations can always track estimated memory usage in
the wrapper is a good point. I can remove -1 as an option, and I'll clarify
the JavaDoc that 0 is not just for non-transactional stores, which is
currently misleading.

6.
The problem with catching the exception in the downgrade process is that
would require new code in the Kafka version being downgraded to. Since
users could conceivably downgrade to almost *any* older version of Kafka
Streams, I'm not sure how we could add that code?
The only way I can think of doing it would be to provide a dedicated
downgrade tool, that goes through every local store and removes the
offsets column families. But that seems like an unnecessary amount of extra
code to maintain just to handle a somewhat niche situation, when the
alternative (automatically wipe and restore stores) should be acceptable.

1, 4, 5: Agreed. I'll make the changes you've requested.

3a.
I agree that IsolationLevel makes more sense at query-time, and I actually
initially attempted to place the IsolationLevel at query-time, but I ran
into some problems:
- The key issue is that, under ALOS we're not staging writes in
transactions, so can't perform writes at the READ_COMMITTED isolation
level. However, this may be addressed if we decide to *always* use
transactions as discussed under 3b.
- IQv1 and IQv2 have quite different implementations. I remember having
some difficulty understanding the IQv1 internals, which made it difficult
to determine what needed to be changed. However, I *think* this can be
addressed for both implementations by wrapping the RocksDBStore in an
IsolationLevel-dependent wrapper, that overrides read methods (get, etc.)
to either read directly from the database or from the ongoing transaction.
But IQv1 might still be difficult.
- If IsolationLevel becomes a query constraint, then all other StateStores
will need to respect it, including the in-memory stores. This would require
us to adapt in-memory stores to stage their writes so they can be isolated
from READ_COMMITTTED queries. It would also become an important
consideration for third-party stores on upgrade, as without changes, they
would not support READ_COMMITTED queries correctly.

Ultimately, I may need some help making the necessary change to IQv1 to
support this, but I don't think it's fundamentally impossible, if we want
to pursue this route.

3b.
The main reason I chose to keep ALOS un-transactional was to minimize
behavioural change for most users (I believe most Streams users use the
default configuration, which is ALOS). That said, it's clear that if ALOS
also used transactional stores, the only change in behaviour would be that
it would become *more correct*, which could be considered a "bug fix" by
users, rather than a change they need to handle.

I believe that performance using transactions (aka. RocksDB WriteBatches)
should actually be *better* than the un-batched write-path that is
currently used[1]. The only "performance" consideration will be the
increased memory usage that transactions require. Given the mitigations for
this memory that we have in place, I would expect that this is not a
problem for most users.

If we're happy to do so, we can make ALOS also use transactions.

Regards,
Nick

Link 1:
https://github.com/adamretter/rocksjava-write-methods-benchmark#results

On Wed, 13 Sept 2023 at 09:41, Bruno Cadonna <cado...@apache.org> wrote:

Hi Nick,

Thanks for the updates and sorry for the delay on my side!


1.
Making the default implementation for flush() a no-op sounds good to me.


2.
I think what was bugging me here is that a third-party state store needs
to implement the state store interface. That means they need to
implement a wrapper around the actual state store as we do for RocksDB
with RocksDBStore. So, a third-party state store can always estimate the
uncommitted bytes, if it wants, because the wrapper can record the added
bytes.
One case I can think of where returning -1 makes sense is when Streams
does not need to estimate the size of the write batch and trigger
extraordinary commits, because the third-party state store takes care of
memory. But in that case the method could also just return 0. Even that
case would be better solved with a method that returns whether the state
store manages itself the memory used for uncommitted bytes or not.
Said that, I am fine with keeping the -1 return value, I was just
wondering when and if it will be used.

Regarding returning 0 for transactional state stores when the batch is
empty, I was just wondering because you explicitly stated

"or {@code 0} if this StateStore does not support transactions."

So it seemed to me returning 0 could only happen for non-transactional
state stores.


3.

a) What do you think if we move the isolation level to IQ (v1 and v2)?
In the end this is the only component that really needs to specify the
isolation level. It is similar to the Kafka consumer that can choose
with what isolation level to read the input topic.
For IQv1 the isolation level should go into StoreQueryParameters. For
IQv2, I would add it to the Query interface.

b) Point a) raises the question what should happen during at-least-once
processing when the state store does not use transactions? John in the
past proposed to also use transactions on state stores for
at-least-once. I like that idea, because it avoids aggregating the same
records over and over again in the case of a failure. We had a case in
the past where a Streams applications in at-least-once mode was failing
continuously for some reasons I do not remember before committing the
offsets. After each failover, the app aggregated again and again the
same records. Of course the aggregate increased to very wrong values
just because of the failover. With transactions on the state stores we
could have avoided this. The app would have output the same aggregate
multiple times (i.e., after each failover) but at least the value of the
aggregate would not depend on the number of failovers. Outputting the
same aggregate multiple times would be incorrect under exactly-once but
it is OK for at-least-once.
If it makes sense to add a config to turn on and off transactions on
state stores under at-least-once or just use transactions in any case is
a question we should also discuss in this KIP. It depends a bit on the
performance trade-off. Maybe to be safe, I would add a config.


4.
Your points are all valid. I tend to say to keep the metrics around
flush() until we remove flush() completely from the interface. Calls to
flush() might still exist since existing processors might still call
flush() explicitly as you mentioned in 1). For sure, we need to document
how the metrics change due to the transactions in the upgrade notes.


5.
I see. Then you should describe how the .position files are handled  in
a dedicated section of the KIP or incorporate the description in the
"Atomic Checkpointing" section instead of only mentioning it in the
"Compatibility, Deprecation, and Migration Plan".


6.
Describing upgrading and downgrading in the KIP is a good idea.
Regarding downgrading, I think you could also catch the exception and do
what is needed to downgrade, e.g., drop the column family. See here for
an example:


https://github.com/apache/kafka/blob/63fee01366e6ce98b9dfafd279a45d40b80e282d/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java#L75

It is a bit brittle, but it works.


Best,
Bruno


On 8/24/23 12:18 PM, Nick Telford wrote:
Hi Bruno,

Thanks for taking the time to review the KIP. I'm back from leave now and
intend to move this forwards as quickly as I can.

Addressing your points:

1.
Because flush() is part of the StateStore API, it's exposed to custom
Processors, which might be making calls to flush(). This was actually the
case in a few integration tests.
To maintain as much compatibility as possible, I'd prefer not to make
this
an UnsupportedOperationException, as it will cause previously working
Processors to start throwing exceptions at runtime.
I agree that it doesn't make sense for it to proxy commit(), though, as
that would cause it to violate the "StateStores commit only when the Task
commits" rule.
Instead, I think we should make this a no-op. That way, existing user
Processors will continue to work as-before, without violation of store
consistency that would be caused by premature flush/commit of StateStore
data to disk.
What do you think?

2.
As stated in the JavaDoc, when a StateStore implementation is
transactional, but is unable to estimate the uncommitted memory usage,
the
method will return -1.
The intention here is to permit third-party implementations that may not
be
able to estimate memory usage.

Yes, it will be 0 when nothing has been written to the store yet. I
thought
that was implied by "This method will return an approximation of the
memory
would be freed by the next call to {@link #commit(Map)}" and "@return The
approximate size of all records awaiting {@link #commit(Map)}", however,
I
can add it explicitly to the JavaDoc if you think this is unclear?

3.
I realise this is probably the most contentious point in my design, and
I'm
open to changing it if I'm unable to convince you of the benefits.
Nevertheless, here's my argument:
The Interactive Query (IQ) API(s) are directly provided StateStores to
query, and it may be important for users to programmatically know which
mode the StateStore is operating under. If we simply provide an
"eosEnabled" boolean (as used throughout the internal streams engine), or
similar, then users will need to understand the operation and
consequences
of each available processing mode and how it pertains to their
StateStore.

Interactive Query users aren't the only people that care about the
processing.mode/IsolationLevel of a StateStore: implementers of custom
StateStores also need to understand the behaviour expected of their
implementation. KIP-892 introduces some assumptions into the Streams
Engine
about how StateStores operate under each processing mode, and it's
important that custom implementations adhere to those assumptions in
order
to maintain the consistency guarantees.

IsolationLevels provide a high-level contract on the behaviour of the
StateStore: a user knows that under READ_COMMITTED, they will see writes
only after the Task has committed, and under READ_UNCOMMITTED they will
see
writes immediately. No understanding of the details of each
processing.mode
is required, either for IQ users or StateStore implementers.

An argument can be made that these contractual guarantees can simply be
documented for the processing.mode (i.e. that exactly-once and
exactly-once-v2 behave like READ_COMMITTED and at-least-once behaves like
READ_UNCOMMITTED), but there are several small issues with this I'd
prefer
to avoid:

     - Where would we document these contracts, in a way that is difficult
     for users/implementers to miss/ignore?
     - It's not clear to users that the processing mode is communicating
     an expectation of read isolation, unless they read the
documentation. Users
     rarely consult documentation unless they feel they need to, so it's
likely
     this detail would get missed by many users.
     - It tightly couples processing modes to read isolation. Adding new
     processing modes, or changing the read isolation of existing
processing
     modes would be difficult/impossible.

Ultimately, the cost of introducing IsolationLevels is just a single
method, since we re-use the existing IsolationLevel enum from Kafka. This
gives us a clear place to document the contractual guarantees expected
of/provided by StateStores, that is accessible both by the StateStore
itself, and by IQ users.

(Writing this I've just realised that the StateStore and IQ APIs actually
don't provide access to StateStoreContext that IQ users would have direct
access to... Perhaps StateStore should expose isolationLevel() itself
too?)

4.
Yeah, I'm not comfortable renaming the metrics in-place either, as it's a
backwards incompatible change. My concern is that, if we leave the
existing
"flush" metrics in place, they will be confusing to users. Right now,
"flush" metrics record explicit flushes to disk, but under KIP-892, even
a
commit() will not explicitly flush data to disk - RocksDB will decide on
when to flush memtables to disk itself.

If we keep the existing "flush" metrics, we'd have two options, which
both
seem pretty bad to me:

     1. Have them record calls to commit(), which would be misleading, as
     data is no longer explicitly "flushed" to disk by this call.
     2. Have them record nothing at all, which is equivalent to removing
the
     metrics, except that users will see the metric still exists and so
assume
     that the metric is correct, and that there's a problem with their
system
     when there isn't.

I agree that removing them is also a bad solution, and I'd like some
guidance on the best path forward here.

5.
Position files are updated on every write to a StateStore. Since our
writes
are now buffered until commit(), we can't update the Position file until
commit() has been called, otherwise it would be inconsistent with the
data
in the event of a rollback. Consequently, we need to manage these offsets
the same way we manage the checkpoint offsets, and ensure they're only
written on commit().

6.
Agreed, although I'm not exactly sure yet what tests to write. How
explicit
do we need to be here in the KIP?

As for upgrade/downgrade: upgrade is designed to be seamless, and we
should
definitely add some tests around that. Downgrade, it transpires, isn't
currently possible, as the extra column family for offset storage is
incompatible with the pre-KIP-892 implementation: when you open a RocksDB
database, you must open all available column families or receive an
error.
What currently happens on downgrade is that it attempts to open the
store,
throws an error about the offsets column family not being opened, which
triggers a wipe and rebuild of the Task. Given that downgrades should be
uncommon, I think this is acceptable behaviour, as the end-state is
consistent, even if it results in an undesirable state restore.

Should I document the upgrade/downgrade behaviour explicitly in the KIP?

--

Regards,
Nick


On Mon, 14 Aug 2023 at 22:31, Bruno Cadonna <cado...@apache.org> wrote:

Hi Nick!

Thanks for the updates!

1.
Why does StateStore#flush() default to
StateStore#commit(Collections.emptyMap())?
Since calls to flush() will not exist anymore after this KIP is
released, I would rather throw an unsupported operation exception by
default.


2.
When would a state store return -1 from
StateStore#approximateNumUncommittedBytes() while being transactional?

Wouldn't StateStore#approximateNumUncommittedBytes() also return 0 if
the state store is transactional but nothing has been written to the
state store yet?


3.
Sorry for bringing this up again. Does this KIP really need to introduce
StateStoreContext#isolationLevel()? StateStoreContext has already
appConfigs() which basically exposes the same information, i.e., if EOS
is enabled or not.
In one of your previous e-mails you wrote:

"My idea was to try to keep the StateStore interface as loosely coupled
from the Streams engine as possible, to give implementers more freedom,
and reduce the amount of internal knowledge required."

While I understand the intent, I doubt that it decreases the coupling of
a StateStore interface and the Streams engine. READ_COMMITTED only
applies to IQ but not to reads by processors. Thus, implementers need to
understand how Streams accesses the state stores.

I would like to hear what others think about this.


4.
Great exposing new metrics for transactional state stores! However, I
would prefer to add new metrics and deprecate (in the docs) the old
ones. You can find examples of deprecated metrics here:
https://kafka.apache.org/documentation/#selector_monitoring


5.
Why does the KIP mention position files? I do not think they are related
to transactions or flushes.


6.
I think we will also need to adapt/add integration tests besides unit
tests. Additionally, we probably need integration or system tests to
verify that upgrades and downgrades between transactional and
non-transactional state stores work as expected.


Best,
Bruno





On 7/21/23 10:34 PM, Nick Telford wrote:
One more thing: I noted John's suggestion in the KIP, under "Rejected
Alternatives". I still think it's an idea worth pursuing, but I believe
that it's out of the scope of this KIP, because it solves a different
set
of problems to this KIP, and the scope of this one has already grown
quite
large!

On Fri, 21 Jul 2023 at 21:33, Nick Telford <nick.telf...@gmail.com>
wrote:

Hi everyone,

I've updated the KIP (


https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores
)
with the latest changes; mostly bringing back "Atomic Checkpointing"
(for
what feels like the 10th time!). I think the one thing missing is some
changes to metrics (notably the store "flush" metrics will need to be
renamed to "commit").

The reason I brought back Atomic Checkpointing was to decouple store
flush
from store commit. This is important, because with Transactional
StateStores, we now need to call "flush" on *every* Task commit, and
not
just when the StateStore is closing, otherwise our transaction buffer
will
never be written and persisted, instead growing unbounded! I
experimented
with some simple solutions, like forcing a store flush whenever the
transaction buffer was likely to exceed its configured size, but this
was
brittle: it prevented the transaction buffer from being configured to
be
unbounded, and it still would have required explicit flushes of
RocksDB,
yielding sub-optimal performance and memory utilization.

I deemed Atomic Checkpointing to be the "right" way to resolve this
problem. By ensuring that the changelog offsets that correspond to the
most
recently written records are always atomically written to the
StateStore
(by writing them to the same transaction buffer), we can avoid
forcibly
flushing the RocksDB memtables to disk, letting RocksDB flush them
only
when necessary, without losing any of our consistency guarantees. See
the
updated KIP for more info.

I have fully implemented these changes, although I'm still not
entirely
happy with the implementation for segmented StateStores, so I plan to
refactor that. Despite that, all tests pass. If you'd like to try out
or
review this highly experimental and incomplete branch, it's available
here:
https://github.com/nicktelford/kafka/tree/KIP-892-3.5.0. Note: it's
built
against Kafka 3.5.0 so that I had a stable base to build and test it
on,
and to enable easy apples-to-apples comparisons in a live
environment. I
plan to rebase it against trunk once it's nearer completion and has
been
proven on our main application.

I would really appreciate help in reviewing and testing:
- Segmented (Versioned, Session and Window) stores
- Global stores

As I do not currently use either of these, so my primary test
environment
doesn't test these areas.

I'm going on Parental Leave starting next week for a few weeks, so
will
not have time to move this forward until late August. That said, your
feedback is welcome and appreciated, I just won't be able to respond
as
quickly as usual.

Regards,
Nick

On Mon, 3 Jul 2023 at 16:23, Nick Telford <nick.telf...@gmail.com>
wrote:

Hi Bruno

Yes, that's correct, although the impact on IQ is not something I had
considered.

What about atomically updating the state store from the transaction
buffer every commit interval and writing the checkpoint (thus,
flushing
the memtable) every configured amount of data and/or number of
commit
intervals?


I'm not quite sure I follow. Are you suggesting that we add an
additional
config for the max number of commit intervals between checkpoints?
That
way, we would checkpoint *either* when the transaction buffers are
nearly
full, *OR* whenever a certain number of commit intervals have
elapsed,
whichever comes first?

That certainly seems reasonable, although this re-ignites an earlier
debate about whether a config should be measured in "number of commit
intervals", instead of just an absolute time.

FWIW, I realised that this issue is the reason I was pursuing the
Atomic
Checkpoints, as it de-couples memtable flush from checkpointing,
which
enables us to just checkpoint on every commit without any performance
impact. Atomic Checkpointing is definitely the "best" solution, but
I'm not
sure if this is enough to bring it back into this KIP.

I'm currently working on moving all the transactional logic directly
into
RocksDBStore itself, which does away with the
StateStore#newTransaction
method, and reduces the number of new classes introduced,
significantly
reducing the complexity. If it works, and the complexity is
drastically
reduced, I may try bringing back Atomic Checkpoints into this KIP.

Regards,
Nick

On Mon, 3 Jul 2023 at 15:27, Bruno Cadonna <cado...@apache.org>
wrote:

Hi Nick,

Thanks for the insights! Very interesting!

As far as I understand, you want to atomically update the state
store
from the transaction buffer, flush the memtable of a state store and
write the checkpoint not after the commit time elapsed but after the
transaction buffer reached a size that would lead to exceeding
statestore.transaction.buffer.max.bytes before the next commit
interval
ends.
That means, the Kafka transaction would commit every commit interval
but
the state store will only be atomically updated roughly every
statestore.transaction.buffer.max.bytes of data. Also IQ would then
only
see new data roughly every statestore.transaction.buffer.max.bytes.
After a failure the state store needs to restore up to
statestore.transaction.buffer.max.bytes.

Is this correct?

What about atomically updating the state store from the transaction
buffer every commit interval and writing the checkpoint (thus,
flushing
the memtable) every configured amount of data and/or number of
commit
intervals? In such a way, we would have the same delay for records
appearing in output topics and IQ because both would appear when the
Kafka transaction is committed. However, after a failure the state
store
still needs to restore up to statestore.transaction.buffer.max.bytes
and
it might restore data that is already in the state store because the
checkpoint lags behind the last stable offset (i.e. the last
committed
offset) of the changelog topics. Restoring data that is already in
the
state store is idempotent, so eos should not violated.
This solution needs at least one new config to specify when a
checkpoint
should be written.



A small correction to your previous e-mail that does not change
anything
you said: Under alos the default commit interval is 30 seconds, not
five
seconds.


Best,
Bruno


On 01.07.23 12:37, Nick Telford wrote:
Hi everyone,

I've begun performance testing my branch on our staging
environment,
putting it through its paces in our non-trivial application. I'm
already
observing the same increased flush rate that we saw the last time
we
attempted to use a version of this KIP, but this time, I think I
know
why.

Pre-KIP-892, StreamTask#postCommit, which is called at the end of
the
Task
commit process, has the following behaviour:

       - Under ALOS: checkpoint the state stores. This includes
       flushing memtables in RocksDB. This is acceptable because the
default
       commit.interval.ms is 5 seconds, so forcibly flushing
memtables
every 5
       seconds is acceptable for most applications.
       - Under EOS: checkpointing is not done, *unless* it's being
forced, due
       to e.g. the Task closing or being revoked. This means that
under
normal
       processing conditions, the state stores will not be
checkpointed,
and will
       not have memtables flushed at all , unless RocksDB decides to
flush them on
       its own. Checkpointing stores and force-flushing their
memtables
is only
       done when a Task is being closed.

Under EOS, KIP-892 needs to checkpoint stores on at least *some*
normal
Task commits, in order to write the RocksDB transaction buffers to
the
state stores, and to ensure the offsets are synced to disk to
prevent
restores from getting out of hand. Consequently, my current
implementation
calls maybeCheckpoint on *every* Task commit, which is far too
frequent.
This causes checkpoints every 10,000 records, which is a change in
flush
behaviour, potentially causing performance problems for some
applications.

I'm looking into possible solutions, and I'm currently leaning
towards
using the statestore.transaction.buffer.max.bytes configuration to
checkpoint Tasks once we are likely to exceed it. This would
complement the
existing "early Task commit" functionality that this configuration
provides, in the following way:

       - Currently, we use statestore.transaction.buffer.max.bytes
to
force an
       early Task commit if processing more records would cause our
state
store
       transactions to exceed the memory assigned to them.
       - New functionality: when a Task *does* commit, we will not
checkpoint
       the stores (and hence flush the transaction buffers) unless
we
expect to
       cross the statestore.transaction.buffer.max.bytes threshold
before
the next
       commit

I'm also open to suggestions.

Regards,
Nick

On Thu, 22 Jun 2023 at 14:06, Nick Telford <nick.telf...@gmail.com

wrote:

Hi Bruno!

3.
By "less predictable for users", I meant in terms of understanding
the
performance profile under various circumstances. The more complex
the
solution, the more difficult it would be for users to understand
the
performance they see. For example, spilling records to disk when
the
transaction buffer reaches a threshold would, I expect, reduce
write
throughput. This reduction in write throughput could be
unexpected,
and
potentially difficult to diagnose/understand for users.
At the moment, I think the "early commit" concept is relatively
straightforward; it's easy to document, and conceptually fairly
obvious to
users. We could probably add a metric to make it easier to
understand
when
it happens though.

3. (the second one)
The IsolationLevel is *essentially* an indirect way of telling
StateStores
whether they should be transactional. READ_COMMITTED essentially
requires
transactions, because it dictates that two threads calling
`newTransaction()` should not see writes from the other
transaction
until
they have been committed. With READ_UNCOMMITTED, all bets are off,
and
stores can allow threads to observe written records at any time,
which is
essentially "no transactions". That said, StateStores are free to
implement
these guarantees however they can, which is a bit more relaxed
than
dictating "you must use transactions". For example, with RocksDB
we
would
implement these as READ_COMMITTED == WBWI-based "transactions",
READ_UNCOMMITTED == direct writes to the database. But with other
storage
engines, it might be preferable to *always* use transactions, even
when
unnecessary; or there may be storage engines that don't provide
transactions, but the isolation guarantees can be met using a
different
technique.
My idea was to try to keep the StateStore interface as loosely
coupled
from the Streams engine as possible, to give implementers more
freedom, and
reduce the amount of internal knowledge required.
That said, I understand that "IsolationLevel" might not be the
right
abstraction, and we can always make it much more explicit if
required, e.g.
boolean transactional()

7-8.
I can make these changes either later today or tomorrow.

Small update:
I've rebased my branch on trunk and fixed a bunch of issues that
needed
addressing. Currently, all the tests pass, which is promising, but
it
will
need to undergo some performance testing. I haven't (yet) worked
on
removing the `newTransaction()` stuff, but I would expect that,
behaviourally, it should make no difference. The branch is
available
at
https://github.com/nicktelford/kafka/tree/KIP-892-c if anyone is
interested in taking an early look.

Regards,
Nick

On Thu, 22 Jun 2023 at 11:59, Bruno Cadonna <cado...@apache.org>
wrote:

Hi Nick,

1.
Yeah, I agree with you. That was actually also my point. I
understood
that John was proposing the ingestion path as a way to avoid the
early
commits. Probably, I misinterpreted the intent.

2.
I agree with John here, that actually it is public API. My
question
is
how this usage pattern affects normal processing.

3.
My concern is that checking for the size of the transaction
buffer
and
maybe triggering an early commit affects the whole processing of
Kafka
Streams. The transactionality of a state store is not confined to
the
state store itself, but spills over and changes the behavior of
other
parts of the system. I agree with you that it is a decent
compromise. I
just wanted to analyse the downsides and list the options to
overcome
them. I also agree with you that all options seem quite heavy
compared
with your KIP. I do not understand what you mean with "less
predictable
for users", though.


I found the discussions about the alternatives really
interesting.
But I
also think that your plan sounds good and we should continue with
it!


Some comments on your reply to my e-mail on June 20th:

3.
Ah, now, I understand the reasoning behind putting isolation
level
in
the state store context. Thanks! Should that also be a way to
give
the
the state store the opportunity to decide whether to turn on
transactions or not?
With my comment, I was more concerned about how do you know if a
checkpoint file needs to be written under EOS, if you do not
have a
way
to know if the state store is transactional or not. If a state
store
is
transactional, the checkpoint file can be written during normal
processing under EOS. If the state store is not transactional,
the
checkpoint file must not be written under EOS.

7.
My point was about not only considering the bytes in memory in
config
statestore.uncommitted.max.bytes, but also bytes that might be
spilled
on disk. Basically, I was wondering whether you should remove the
"memory" in "Maximum number of memory bytes to be used to
buffer uncommitted state-store records." My thinking was that
even
if a
state store spills uncommitted bytes to disk, limiting the
overall
bytes
might make sense. Thinking about it again and considering the
recent
discussions, it does not make too much sense anymore.
I like the name statestore.transaction.buffer.max.bytes that you
proposed.

8.
A high-level description (without implementation details) of how
Kafka
Streams will manage the commit of changelog transactions, state
store
transactions and checkpointing would be great. Would be great if
you
could also add some sentences about the behavior in case of a
failure.
For instance how does a transactional state store recover after a
failure or what happens with the transaction buffer, etc. (that
is
what
I meant by "fail-over" in point 9.)

Best,
Bruno

On 21.06.23 18:50, Nick Telford wrote:
Hi Bruno,

1.
Isn't this exactly the same issue that WriteBatchWithIndex
transactions
have, whereby exceeding (or likely to exceed) configured memory
needs to
trigger an early commit?

2.
This is one of my big concerns. Ultimately, any approach based
on
cracking
open RocksDB internals and using it in ways it's not really
designed
for is
likely to have some unforseen performance or consistency issues.

3.
What's your motivation for removing these early commits? While
not
ideal, I
think they're a decent compromise to ensure consistency whilst
maintaining
good and predictable performance.
All 3 of your suggested ideas seem *very* complicated, and might
actually
make behaviour less predictable for users as a consequence.

I'm a bit concerned that the scope of this KIP is growing a bit
out
of
control. While it's good to discuss ideas for future
improvements, I
think
it's important to narrow the scope down to a design that
achieves
the
most
pressing objectives (constant sized restorations during dirty
close/unexpected errors). Any design that this KIP produces can
ultimately
be changed in the future, especially if the bulk of it is
internal
behaviour.

I'm going to spend some time next week trying to re-work the
original
WriteBatchWithIndex design to remove the newTransaction()
method,
such
that
it's just an implementation detail of RocksDBStore. That way, if
we
want to
replace WBWI with something in the future, like the SST file
management
outlined by John, then we can do so with little/no API changes.

Regards,

Nick












Reply via email to