Hi Nick,

Thanks for the updates!

I have a couple of questions/comments.

1.
Why do you propose a configuration that involves both max. bytes and max. records? I think we are mainly concerned about memory consumption, because we want to limit the off-heap memory used. I cannot think of a case where one would want to set the max. number of records.


2.
Why does

 default void commit(final Map<TopicPartition, Long> changelogOffsets) {
     flush();
 }

take a map of partitions to changelog offsets?
The mapping between state stores and partitions is a 1:1 relationship. Passing in a single changelog offset should suffice.
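
In other words, something like the following sketch is what I have in mind (the single-offset signature is only my suggestion here, not what the KIP currently specifies):

 default void commit(final long changelogOffset) {
     flush();
 }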


3.
Why do we need the Transaction interface? It should be possible to hide beginning and committing a transaction within the state store implementation, so that from outside the state store it does not matter whether the state store is transactional or not. What would be the advantage of using the Transaction interface?
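
To illustrate what I mean, here is a rough sketch of a store that keeps the transaction entirely internal (class and field names are made up; I am using WriteBatchWithIndex as the KIP does, but any buffering mechanism would do):

    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;
    import org.rocksdb.WriteBatchWithIndex;
    import org.rocksdb.WriteOptions;

    // Rough sketch only: writes are buffered internally and made durable in
    // commit(); callers never see a separate Transaction object.
    class TransactionalRocksDBStore {
        private final RocksDB db;
        private final WriteOptions writeOptions = new WriteOptions();
        private final WriteBatchWithIndex uncommitted = new WriteBatchWithIndex(true);

        TransactionalRocksDBStore(final RocksDB db) {
            this.db = db;
        }

        void put(final byte[] key, final byte[] value) throws RocksDBException {
            uncommitted.put(key, value); // buffered; not yet visible on disk
        }

        void commit() throws RocksDBException {
            db.write(writeOptions, uncommitted); // apply the buffered writes atomically
            uncommitted.clear();
        }
    }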


4.
Regarding checkpointing offsets, I think we should keep the checkpoint file in any case for the reason you mentioned about rebalancing. Even if that would not be an issue, I would propose to move the change to offset management to a new KIP and to not add more complexity than needed to this one. I would not be too concerned about the consistency violation you mention. As far as I understand, with transactional state stores Streams would write the checkpoint file during every commit even under EOS. In the failure case you describe, Streams would restore the state stores from the offsets found in the checkpoint file written during the penultimate commit instead of during the last commit. Basically, Streams would overwrite the records written to the state store between the last two commits with the same records read from the changelogs. While I understand that this is wasteful, it is -- at the same time -- acceptable and most importantly it does not break EOS.

Best,
Bruno


On 27.04.23 12:34, Nick Telford wrote:
Hi everyone,

I find myself (again) considering removing the offset management from
StateStores, and keeping the old checkpoint file system. The reason is that
the StreamPartitionAssignor directly reads checkpoint files in order to
determine which instance has the most up-to-date copy of the local state.
If we move offsets into the StateStore itself, then we will need to open,
initialize, read offsets and then close each StateStore (that is not
already assigned and open) for which we have *any* local state, on every
rebalance.

Generally, I don't think there are many "orphan" stores like this sitting
around on most instances, but even a few would introduce additional latency
to an already somewhat lengthy rebalance procedure.

I'm leaning towards Colt's (Slack) suggestion of just keeping things in the
checkpoint file(s) for now, and not worrying about the race. The downside
is that we wouldn't be able to remove the explicit RocksDB flush on-commit,
which likely hurts performance.

If anyone has any thoughts or ideas on this subject, I would appreciate it!

Regards,
Nick

On Wed, 19 Apr 2023 at 15:05, Nick Telford <nick.telf...@gmail.com> wrote:

Hi Colt,

The issue is that if there's a crash between 2 and 3, then you still end up with inconsistent data in RocksDB. The only way to guarantee that your checkpoint offsets and locally stored data are consistent with each other is to commit them atomically, which can be achieved by having the offsets stored in RocksDB.

The offsets column family is likely to be extremely small (one per changelog partition + one per Topology input partition for regular stores, one per input partition for global stores), so the overhead will be minimal.
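
To illustrate the kind of write this enables, here's a rough RocksJava sketch (the column family names and offset serialization are made up for the example, they're not from the KIP):

    import java.nio.charset.StandardCharsets;
    import org.rocksdb.ColumnFamilyHandle;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;
    import org.rocksdb.WriteBatch;
    import org.rocksdb.WriteOptions;

    // Sketch: the record and its checkpoint offset are applied in a single atomic
    // WriteBatch, so the on-disk data can never diverge from the on-disk offsets.
    void putWithOffset(final RocksDB db,
                       final ColumnFamilyHandle dataCf,
                       final ColumnFamilyHandle offsetsCf,
                       final byte[] key,
                       final byte[] value,
                       final String changelogPartition,
                       final long changelogOffset) throws RocksDBException {
        try (final WriteBatch batch = new WriteBatch();
             final WriteOptions options = new WriteOptions()) {
            batch.put(dataCf, key, value);
            batch.put(offsetsCf,
                      changelogPartition.getBytes(StandardCharsets.UTF_8),
                      Long.toString(changelogOffset).getBytes(StandardCharsets.UTF_8));
            db.write(options, batch); // both column families are committed together
        }
    }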

A major benefit of doing this is that we can remove the explicit calls to
db.flush(), which forcibly flushes memtables to disk on-commit. It turns
out, RocksDB memtable flushes are largely dictated by Kafka Streams
commits, *not* RocksDB configuration, which could be a major source of
confusion. Atomic checkpointing makes it safe to remove these explicit
flushes, because it no longer matters exactly when RocksDB flushes data to
disk; since the data and corresponding checkpoint offsets will always be
flushed together, the local store is always in a consistent state, and
on-restart, it can always safely resume restoration from the on-disk
offsets, restoring the small amount of data that hadn't been flushed when
the app exited/crashed.

Regards,
Nick

On Wed, 19 Apr 2023 at 14:35, Colt McNealy <c...@littlehorse.io> wrote:

Nick,

Thanks for your reply. Ack to A) and B).

For item C), I see what you're referring to. Your proposed solution will
work, so no need to change it. What I was suggesting was that it might be
possible to achieve this with only one column family. So long as:

    - No uncommitted records (i.e. not committed to the changelog) are
    *committed* to the state store, AND
    - The checkpoint offset (which refers to the changelog topic) is less
    than or equal to the last written changelog offset in RocksDB

I don't see the need to do the full restoration from scratch. My
understanding was that prior to 844/892, full restorations were required
because there could be uncommitted records written to RocksDB; however,
given your use of RocksDB transactions, that can be avoided with the
pattern of 1) commit Kafka transaction, 2) commit RocksDB transaction, 3)
update offset in checkpoint file.
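
In pseudo-code, the ordering I'm describing (names are illustrative only):

    // Illustrative only: the three-step commit ordering described above.
    producer.commitTransaction();              // 1) commit the Kafka transaction
    rocksDbTransaction.commit();               // 2) commit the RocksDB transaction
    checkpointFile.write(changelogOffset);     // 3) record the offset in the checkpoint file
    // A crash between 2) and 3) leaves a stale checkpoint offset, but since
    // changelog restoration is idempotent, restarting just replays a few records.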

Anyways, your proposed solution works equivalently and I don't believe
there is much overhead to an additional column family in RocksDB. Perhaps
it may even perform better than making separate writes to the checkpoint
file.

Colt McNealy
*Founder, LittleHorse.io*


On Wed, Apr 19, 2023 at 5:53 AM Nick Telford <nick.telf...@gmail.com>
wrote:

Hi Colt,

A. I've done my best to de-couple the StateStore stuff from the rest of the Streams engine. The fact that there will be only one ongoing (write) transaction at a time is not guaranteed by any API, and is just a consequence of the way Streams operates. To that end, I tried to ensure the documentation and guarantees provided by the new APIs are independent of this incidental behaviour. In practice, you're right, this essentially refers to "interactive queries", which are technically "read transactions", even if they don't actually use the transaction API to isolate themselves.

B. Yes, although not ideal. This is for backwards compatibility, because:
     1) Existing custom StateStore implementations will implement flush(), and not commit(), but the Streams engine now calls commit(), so those calls need to be forwarded to flush() for these legacy stores.
     2) Existing StateStore *users*, i.e. outside of the Streams engine itself, may depend on explicitly calling flush(), so for these cases, flush() needs to be redirected to call commit().
If anyone has a better way to guarantee compatibility without introducing this potential recursion loop, I'm open to changes!
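
For reference, the shape of the two defaults that creates the potential loop looks roughly like this (simplified; the real interface has many more members and the exact default bodies may differ from the KIP):

    import java.util.Collections;
    import java.util.Map;
    import org.apache.kafka.common.TopicPartition;

    // Simplified sketch: each default forwards to the other, so a store that
    // overrides neither method would recurse indefinitely.
    public interface StateStore {
        @Deprecated
        default void flush() {
            commit(Collections.emptyMap()); // legacy callers of flush() end up in commit()
        }

        default void commit(final Map<TopicPartition, Long> changelogOffsets) {
            flush(); // legacy stores that only implement flush() still get called
        }
    }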

C. This is described in the "Atomic Checkpointing" section. Offsets are stored in a separate RocksDB column family, which is guaranteed to be atomically flushed to disk with all other column families. The issue of checkpoints being written to disk after commit causing inconsistency if it crashes in between is the reason why, under EOS, checkpoint files are only written on clean shutdown. This is one of the major causes of "full restorations", so moving the offsets into a place where they can be guaranteed to be atomically written with the data they checkpoint allows us to write the checkpoint offsets *on every commit*, not just on clean shutdown.

Regards,
Nick

On Tue, 18 Apr 2023 at 15:39, Colt McNealy <c...@littlehorse.io> wrote:

Nick,

Thank you for continuing this work. I have a few minor clarifying
questions.

A) "Records written to any transaction are visible to all other transactions immediately." I am confused here—I thought there could only be one transaction going on at a time for a given state store given the threading model for processing records on a Task. Do you mean Interactive Queries by "other transactions"? (If so, then everything makes sense—I thought that since IQ were read-only then they didn't count as transactions).

B) Is it intentional that the default implementations of the flush() and commit() methods in the StateStore class refer to each other in some sort of unbounded recursion?

C) How will the getCommittedOffset() method work? At first I thought the way to do it would be using a special key in the RocksDB store to store the offset, and committing that with the transaction. But upon second thought, since restoration from the changelog is an idempotent procedure, I think it would be fine to 1) commit the RocksDB transaction and then 2) write the offset to disk in a checkpoint file. If there is a crash between 1) and 2), I think the only downside is now we replay a few more records (at a cost of <100ms). Am I missing something there?

Other than that, everything makes sense to me.

Cheers,
Colt McNealy
*Founder, LittleHorse.io*


On Tue, Apr 18, 2023 at 3:59 AM Nick Telford <nick.telf...@gmail.com>
wrote:

Hi everyone,

I've updated the KIP to reflect the latest version of the design:




https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores

There are several changes in there that reflect feedback from this thread, and there's a new section and a bunch of interface changes relating to Atomic Checkpointing, which is the final piece of the puzzle to making everything robust.

Let me know what you think!

Regards,
Nick

On Tue, 3 Jan 2023 at 11:33, Nick Telford <nick.telf...@gmail.com>
wrote:

Hi Lucas,

Thanks for looking over my KIP.

A) The bound is per-instance, not per-Task. This was a typo in the KIP that I've now corrected. It was originally per-Task, but I changed it to per-instance for exactly the reason you highlighted.
B) It's worth noting that transactionality is only enabled under EOS, and in the default mode of operation (ALOS), there should be no change in behavior at all. I think, under EOS, we can mitigate the impact on users by sufficiently low default values for the memory bound configuration. I understand your hesitation to include a significant change of behaviour, especially in a minor release, but I suspect that most users will prefer the memory impact (under EOS) to the existing behaviour of frequent state restorations! If this is a problem, the changes can wait until the next major release. I'll be running a patched version of Streams in production with these changes as soon as they're ready, so it won't disrupt me :-D
C) The main purpose of this sentence was just to note that some changes will need to be made to the way Segments are handled in order to ensure they also benefit from transactions. At the time I wrote it, I hadn't figured out the specific changes necessary, so it was deliberately vague. This is the one outstanding problem I'm currently working on, and I'll update this section with more detail once I have figured out the exact changes required.
D) newTransaction() provides the necessary isolation guarantees. While the RocksDB implementation of transactions doesn't technically *need* read-only users to call newTransaction(), other implementations (e.g. a hypothetical PostgresStore) may require it. Calling newTransaction() when no transaction is necessary is essentially free, as it will just return this.
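
i.e. in the non-transactional case the default can be as trivial as this sketch:

    // Sketch: a store that needs no isolation can satisfy newTransaction() by
    // returning itself, so read-only callers (e.g. IQ) pay essentially nothing.
    default StateStore newTransaction() {
        return this;
    }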

I didn't do any profiling of the KIP-844 PoC, but I think it should be fairly obvious where the performance problems stem from: writes under KIP-844 require 3 extra memory-copies: 1 to encode the value with the tombstone/record flag, 1 to decode it from the tombstone/record flag, and 1 to copy the record from the "temporary" store to the "main" store when the transaction commits. The different approach taken by KIP-892 should perform much better, as it avoids all these copies, and may actually perform slightly better than trunk, due to batched writes in RocksDB performing better than non-batched writes.[1]

Regards,
Nick

1: https://github.com/adamretter/rocksjava-write-methods-benchmark#results

On Mon, 2 Jan 2023 at 16:18, Lucas Brutschy <lbruts...@confluent.io.invalid> wrote:

Hi Nick,

I'm just starting to read up on the whole discussion about KIP-892 and KIP-844. Thanks a lot for your work on this, I do think `WriteBatchWithIndex` may be the way to go here. I do have some questions about the latest draft.

  A) If I understand correctly, you propose to put a bound on the (native) memory consumed by each task. However, I wonder if this is sufficient if we have temporary imbalances in the cluster. For example, depending on the timing of rebalances during a cluster restart, it could happen that a single streams node is assigned a lot more tasks than expected. With your proposed change, this would mean that the memory required by this one node could be a multiple of what is required during normal operation. I wonder if it wouldn't be safer to put a global bound on the memory use, across all tasks.
  B) Generally, the memory concerns still give me the feeling that this should not be enabled by default for all users in a minor release.
  C) In section "Transaction Management": the sentence "A similar analogue will be created to automatically manage `Segment` transactions.". Maybe this is just me lacking some background, but I do not understand this, it would be great if you could clarify what you mean here.
  D) Could you please clarify why IQ has to call newTransaction(), when it's read-only.

And one last thing not strictly related to your KIP: if there is an easy way for you to find out why the KIP-844 PoC is 20x slower (e.g. by providing a flame graph), that would be quite interesting.

Cheers,
Lucas

On Thu, Dec 22, 2022 at 8:30 PM Nick Telford <nick.telf...@gmail.com> wrote:

Hi everyone,

I've updated the KIP with a more detailed design, which reflects the implementation I've been working on:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores

This new design should address the outstanding points already made in the thread.

Please let me know if there are areas that are unclear or need more clarification.

I have a (nearly) working implementation. I'm confident that the remaining work (making Segments behave) will not impact the documented design.

Regards,

Nick

On Tue, 6 Dec 2022 at 19:24, Colt McNealy <c...@littlehorse.io> wrote:

Nick,

Thank you for the reply; that makes sense. I was hoping that, since reading uncommitted records from IQ in EOS isn't part of the documented API, maybe you *wouldn't* have to wait for the next major release to make that change; but given that it would be considered a major change, I like your approach the best.

Wishing you a speedy recovery and happy coding!

Thanks,
Colt McNealy
*Founder, LittleHorse.io*


On Tue, Dec 6, 2022 at 10:30 AM Nick Telford <nick.telf...@gmail.com> wrote:

Hi Colt,

10: Yes, I agree it's not ideal. I originally intended to try to keep the behaviour unchanged as much as possible, otherwise we'd have to wait for a major version release to land these changes.
20: Good point, ALOS doesn't need the same level of guarantee, and the typically longer commit intervals would be problematic when reading only "committed" records.

I've been away for 5 days recovering from minor surgery, but I spent a considerable amount of that time working through ideas for possible solutions in my head. I think your suggestion of keeping ALOS as-is, but buffering writes for EOS, is the right path forwards, although I have a solution that both expands on this, and provides for some more formal guarantees.

Essentially, adding support to KeyValueStores for "Transactions", with clearly defined IsolationLevels. Using "Read Committed" when under EOS, and "Read Uncommitted" under ALOS.

The nice thing about this approach is that it gives us much more clearly defined isolation behaviour that can be properly documented to ensure users know what to expect.
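
Roughly what I'm picturing for the isolation levels (all names here are provisional and may well change in the KIP):

    // Provisional sketch: the isolation level is derived from the processing mode.
    enum IsolationLevel {
        READ_COMMITTED,   // EOS: readers only see committed writes
        READ_UNCOMMITTED  // ALOS: readers may see writes buffered for the next commit
    }

    static IsolationLevel isolationLevelFor(final String processingGuarantee) {
        return processingGuarantee.startsWith("exactly_once")
                ? IsolationLevel.READ_COMMITTED
                : IsolationLevel.READ_UNCOMMITTED;
    }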

I'm still working out the kinks in the design, and will update the KIP when I have something. The main struggle is trying to implement this without making any major changes to the existing interfaces or breaking existing implementations, because currently everything expects to operate directly on a StateStore, and not a Transaction of that store. I think I'm getting close, although sadly I won't be able to progress much until next week due to some work commitments.

Regards,
Nick

On Thu, 1 Dec 2022 at 00:01, Colt McNealy <c...@littlehorse.io> wrote:

Nick,

Thank you for the explanation, and also for the updated KIP. I am quite eager for this improvement to be released as it would greatly reduce the operational difficulties of EOS streams apps.

Two questions:

10)
"When reading records, we will use the WriteBatchWithIndex#getFromBatchAndDB and WriteBatchWithIndex#newIteratorWithBase utilities in order to ensure that uncommitted writes are available to query."
Why do extra work to enable the reading of uncommitted writes during IQ? Code complexity aside, reading uncommitted writes is, in my opinion, a minor flaw in EOS IQ; it would be very nice to have the guarantee that, with EOS, IQ only reads committed records. In order to avoid dirty reads, one currently must query a standby replica (but this still doesn't fully guarantee monotonic reads).
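
For context, my reading of the quoted mechanism is roughly the following RocksJava sketch (not taken from the KIP's actual code):

    import org.rocksdb.ReadOptions;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;
    import org.rocksdb.RocksIterator;
    import org.rocksdb.WriteBatchWithIndex;

    // Sketch of how the quoted utilities surface uncommitted writes to queries.
    void readThroughBatch(final RocksDB db,
                          final WriteBatchWithIndex uncommitted,
                          final byte[] key) throws RocksDBException {
        try (final ReadOptions readOptions = new ReadOptions()) {
            // Point lookup: checks the uncommitted batch first, then the DB.
            final byte[] value = uncommitted.getFromBatchAndDB(db, readOptions, key);

            // Range scan: merges the uncommitted batch with the on-disk view.
            try (final RocksIterator base = db.newIterator(readOptions);
                 final RocksIterator merged = uncommitted.newIteratorWithBase(base)) {
                for (merged.seekToFirst(); merged.isValid(); merged.next()) {
                    // merged.key() / merged.value() include uncommitted entries
                }
            }
        }
    }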

20) Is it also necessary to enable this optimization on ALOS stores? The motivation of KIP-844 was mainly to reduce the need to restore state from scratch on unclean EOS shutdowns; with ALOS it was acceptable to accept that there may have been uncommitted writes on disk. On a side note, if you enable this type of store on ALOS processors, the community would definitely want to enable queries on dirty reads; otherwise users would have to wait 30 seconds (default) to see an update.

Thank you for doing this fantastic work!
Colt McNealy
*Founder, LittleHorse.io*


On Wed, Nov 30, 2022 at 10:44 AM Nick Telford <nick.telf...@gmail.com> wrote:

Hi everyone,

I've drastically reduced the scope of this KIP to no longer include the StateStore management of checkpointing. This can be added as a KIP later on to further optimize the consistency and performance of state stores.

I've also added a section discussing some of the concerns around concurrency, especially in the presence of Iterators. I'm thinking of wrapping WriteBatchWithIndex with a reference-counting copy-on-write implementation (that only makes a copy if there's an active iterator), but I'm open to suggestions.

Regards,
Nick

On Mon, 28 Nov 2022 at 16:36, Nick Telford <nick.telf...@gmail.com> wrote:

Hi Colt,

I didn't do any profiling, but the 844 implementation:

    - Writes uncommitted records to a temporary RocksDB instance
       - Since tombstones need to be flagged, all record values are prefixed with a value/tombstone marker. This necessitates a memory copy.
    - On-commit, iterates all records in this temporary instance and writes them to the main RocksDB store.
    - While iterating, the value/tombstone marker needs to be parsed and the real value extracted. This necessitates another memory copy.

My guess is that the cost of iterating the temporary RocksDB store is the major factor, with the 2 extra memory copies per-Record contributing a significant amount too.
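
To make the copy overhead concrete, the marker handling implies roughly the following (illustrative only, not the actual KIP-844 code):

    // Illustrative only: each direction allocates and copies a new array.
    static byte[] encode(final byte[] value) {
        final byte[] encoded = new byte[value.length + 1];
        encoded[0] = 1;                                        // 1 = value, 0 = tombstone
        System.arraycopy(value, 0, encoded, 1, value.length);  // extra copy #1
        return encoded;
    }

    static byte[] decode(final byte[] encoded) {
        if (encoded[0] == 0) {
            return null;                                       // tombstone
        }
        final byte[] value = new byte[encoded.length - 1];
        System.arraycopy(encoded, 1, value, 0, value.length);  // extra copy #2
        return value;
    }

    // A further copy is the write of each decoded record from the temporary
    // store into the main store when the transaction commits.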

Regards,
Nick

On Mon, 28 Nov 2022 at 16:12, Colt McNealy <c...@littlehorse.io> wrote:

Hi all,

Out of curiosity, why does the performance of the store degrade so significantly with the 844 implementation? I wouldn't be too surprised by a 50-60% drop (caused by each record being written twice), but 96% is extreme.

The only thing I can think of which could create such a bottleneck would be that perhaps the 844 implementation deserializes and then re-serializes the store values when copying from the uncommitted to committed store, but I wasn't able to figure that out when I scanned the PR.

Colt McNealy
*Founder, LittleHorse.io*


On Mon, Nov 28, 2022 at 7:56 AM Nick Telford <nick.telf...@gmail.com> wrote:

Hi everyone,

I've updated the KIP to resolve all the points that have been raised so far, with one exception: the ALOS default commit interval of 5 minutes is likely to cause WriteBatchWithIndex memory to grow too large.

There's a couple of different things I can think of to solve this:

    - We already have a memory/record limit in the KIP to prevent OOM errors. Should we choose a default value for these? My concern here is that anything we choose might seem rather arbitrary. We could change its behaviour such that under ALOS, it only triggers the commit of the StateStore, but under EOS, it triggers a commit of the Kafka transaction.
    - We could introduce a separate `checkpoint.interval.ms` to allow ALOS to commit the StateStores more frequently than the general commit.interval.ms? My concern here is that the semantics of this config would depend on the processing.mode; under ALOS it would allow more frequently committing stores, whereas under EOS it couldn't. (See the sketch below for how these two options might look to a user.)
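
Purely for illustration, how the two options above might surface to users (the config names are the ones floated in this thread / the KIP draft and are definitely not final):

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    // Illustrative only: not real configuration keys yet.
    static Properties exampleConfig() {
        final Properties props = new Properties();
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 300_000);           // ALOS default: 5 minutes
        props.put("max.uncommitted.state.bytes.per.task", 64 * 1024 * 1024L);  // option 1: bound uncommitted memory
        props.put("checkpoint.interval.ms", 10_000L);                          // option 2: commit stores more often
        return props;
    }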

Any better ideas?

On Wed, 23 Nov 2022 at 16:25, Nick Telford <nick.telf...@gmail.com> wrote:

Hi Alex,

Thanks for the feedback.

I've updated the discussion of OOM issues by describing how we'll handle it. Here's the new text:

"To mitigate this, we will automatically force a Task commit if the total uncommitted records returned by StateStore#approximateNumUncommittedEntries() exceeds a threshold, configured by max.uncommitted.state.entries.per.task; or the total memory used for buffering uncommitted records returned by StateStore#approximateNumUncommittedBytes() exceeds the threshold configured by max.uncommitted.state.bytes.per.task. This will roughly bound the memory required per-Task for buffering uncommitted records, irrespective of the commit.interval.ms, and will effectively bound the number of records that will need to be restored in the event of a failure."



"These limits will be checked in StreamTask#process and a premature commit will be requested via Task#requestCommit()."



"Note that these new methods provide default implementations that ensure existing custom stores and non-transactional stores (e.g. InMemoryKeyValueStore) do not force any early commits."
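
In other words, the check might end up looking roughly like this (heavily simplified sketch; it uses the new StateStore methods quoted above, and the real wiring in StreamTask isn't final):

    import java.util.Collection;

    // Simplified sketch of the proposed check; not actual StreamTask code.
    // StateStore here means the proposed interface with the new approximate* methods.
    boolean uncommittedLimitExceeded(final Collection<StateStore> stores,
                                     final long maxEntriesPerTask,
                                     final long maxBytesPerTask) {
        long entries = 0;
        long bytes = 0;
        for (final StateStore store : stores) {
            entries += store.approximateNumUncommittedEntries();
            bytes += store.approximateNumUncommittedBytes();
        }
        return entries > maxEntriesPerTask || bytes > maxBytesPerTask;
    }

    // In StreamTask#process (sketch): if (uncommittedLimitExceeded(...)) requestCommit();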


I've chosen to have the StateStore expose approximations of its buffer size/count instead of opaquely requesting a commit in order to delegate the decision making to the Task itself. This enables Tasks to look at *all* of their StateStores, and determine whether an early commit is necessary. Notably, it enables per-Task thresholds, instead of per-Store, which prevents Tasks with many StateStores from using much more memory than Tasks with one StateStore. This makes sense, since commits are done by-Task, not by-Store.

Prizes* for anyone who can come up with a better name for the new config properties!

Thanks for pointing out the potential performance issues of WBWI. From the benchmarks that user posted[1], it looks like WBWI still performs considerably better than individual puts, which is the existing design, so I'd actually expect a performance boost from WBWI, just not as great as we'd get from a plain WriteBatch. This does suggest that a good optimization would be to use a regular WriteBatch for restoration (in RocksDBStore#restoreBatch), since we know that those records will never be queried before they're committed.
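
i.e. something along these lines in the restore path (sketch only; the real RocksDBStore#restoreBatch looks different):

    import java.util.List;
    import org.apache.kafka.streams.KeyValue;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;
    import org.rocksdb.WriteBatch;
    import org.rocksdb.WriteOptions;

    // Sketch: restoration can use a plain (un-indexed) WriteBatch, because
    // restored records are never queried before they are committed.
    void restoreBatch(final RocksDB db,
                      final List<KeyValue<byte[], byte[]>> records) throws RocksDBException {
        try (final WriteBatch batch = new WriteBatch();
             final WriteOptions options = new WriteOptions()) {
            for (final KeyValue<byte[], byte[]> record : records) {
                batch.put(record.key, record.value);
            }
            db.write(options, batch);
        }
    }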

1: https://github.com/adamretter/rocksjava-write-methods-benchmark#results

* Just kidding, no prizes, sadly.

On Wed, 23 Nov 2022 at 12:28, Alexander Sorokoumov <asorokou...@confluent.io.invalid> wrote:

Hey Nick,

Thank you for the KIP! With such a significant performance degradation in the secondary store approach, we should definitely consider WriteBatchWithIndex. I also like encapsulating checkpointing inside the default state store implementation to improve performance.

+1 to John's comment to keep the current checkpointing as a fallback mechanism. We want to keep existing users' workflows intact if we can. A non-intrusive way would be to add a separate StateStore method, say, StateStore#managesCheckpointing(), that controls whether the state store implementation owns checkpointing.
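
Roughly (just a sketch of the idea, not a concrete interface proposal):

    // Sketch: stores opt in to owning checkpointing; for everyone else the
    // runtime keeps writing the .checkpoint file exactly as it does today.
    default boolean managesCheckpointing() {
        return false;
    }

    // Runtime side (sketch):
    // if (!store.managesCheckpointing()) {
    //     writeCheckpointFile(store, changelogOffsets);  // hypothetical helper for today's logic
    // }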

I think that a solution to the transactional writes should address the OOMEs. One possible way to address that is to wire StateStore's commit request by adding, say, StateStore#commitNeeded that is checked in StreamTask#commitNeeded via the corresponding ProcessorStateManager. With that change, RocksDBStore will have to track the current transaction size and request a commit when the size goes over a (configurable) threshold.

AFAIU WriteBatchWithIndex might perform significantly slower than non-txn puts as the batch size grows [1]. We should have a configuration to fall back to the current behavior (and/or disable txn stores for ALOS) unless the benchmarks show negligible overhead for longer commits / large-enough batch sizes.

If you prefer to keep the KIP smaller, I would rather cut out state-store-managed checkpointing rather than proper OOME handling and being able to switch to non-txn behavior. The checkpointing is not necessary to solve the recovery-under-EOS problem. On the other hand, once WriteBatchWithIndex is in, it will be much easier to add state-store-managed checkpointing.

If you share the current implementation, I am happy to help you address the OOME and configuration parts as well as review and test the patch.

Best,
Alex


1. https://github.com/facebook/rocksdb/issues/608

On Tue, Nov 22, 2022 at 6:31 PM Nick Telford <nick.telf...@gmail.com> wrote:

Hi John,

Thanks for the review and feedback!

1. Custom Stores: I've been mulling over this problem myself. As it stands, custom stores would essentially lose checkpointing with no indication that they're expected to make changes, besides a line in the release notes. I agree that the best solution would be to provide a default that checkpoints to a file. The one thing I would change is that the checkpointing is to a store-local file, instead of a per-Task file. This way the StateStore still technically owns its own checkpointing (via a default implementation), and the StateManager/Task execution engine doesn't need to know anything about checkpointing, which greatly simplifies some of the logic.

2. OOME errors: The main reasons why I didn't explore a solution to this are a) to keep this KIP as simple as possible, and b) because I'm not exactly sure how to signal that a Task should commit prematurely. I'm confident it's possible, and I think it's worth adding a section on handling this. Besides my proposal to force an early commit once memory usage reaches a threshold, is there any other approach that you might suggest for tackling this problem?

3. ALOS: I can add in an explicit paragraph, but my assumption is that, since transactional behaviour comes at little/no cost, it should be available by default on all stores, irrespective of the processing mode. While ALOS doesn't use transactions, the Task itself still "commits", so the behaviour should be correct under ALOS too. I'm not convinced that it's worth having both transactional/non-transactional stores available, as it would considerably increase the complexity of the codebase, for very little benefit.

4. Method deprecation: Are you referring to StateStore#getPosition()? As I understand it, Position contains the position of the *source* topics, whereas the commit offsets would be the *changelog* offsets. So it's still necessary to retain the Position data, as well as the changelog offsets. What I meant in the KIP is that Position offsets are currently stored in a file, and since we can atomically store metadata along with the record batch we commit to RocksDB, we can move our Position offsets into this metadata too, and gain the same transactional guarantees that we will for changelog offsets, ensuring that the Position offsets are consistent with the records that are read from the database.

Regards,
Nick

On Tue, 22 Nov 2022 at 16:25, John Roesler <vvcep...@apache.org> wrote:

Thanks for publishing this alternative, Nick!

The benchmark you mentioned in the KIP-844 discussion seems like a compelling reason to revisit the built-in transactionality mechanism. I also appreciate your analysis, showing that for most use cases, the write batch approach should be just fine.

There are a couple of points that would hold me back from approving this KIP right now:

1. Loss of coverage for custom stores.
The fact that you can plug in a (relatively) simple implementation of the XStateStore interfaces and automagically get a distributed database out of it is a significant benefit of Kafka Streams. I'd hate to lose it, so it would be better to spend some time and come up with a way to preserve that property. For example, can we provide a default implementation of `commit(..)` that re-implements the existing checkpoint-file approach? Or perhaps add an `isTransactional()` flag to the state store interface so that the runtime can decide whether to continue to manage checkpoint files vs delegating transactionality to the stores?
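
For example, something like the following (very rough; I'm hand-waving the checkpoint-file plumbing, and writeCheckpointFile is a hypothetical helper standing in for today's logic):

    // Option one (sketch): a default commit(..) that preserves today's behaviour
    // for stores that don't override it.
    default void commit(final Map<TopicPartition, Long> changelogOffsets) {
        flush();                                // today's durability mechanism
        writeCheckpointFile(changelogOffsets);  // hypothetical helper: today's .checkpoint logic
    }

    // Option two (sketch): a flag the runtime can consult.
    default boolean isTransactional() {
        return false;  // runtime keeps managing checkpoint files for this store
    }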

2. Guarding against OOME
I appreciate your analysis, but I don't think it's sufficient to say that we will solve the memory problem later if it becomes necessary. The experience leading to that situation would be quite bad: Imagine, you upgrade to AK 3.next, your tests pass, so you deploy to production. That night, you get paged because your app is now crashing with OOMEs. As with all OOMEs, you'll have a really hard time finding the root cause, and once you do, you won't have a clear path to resolve the issue. You could only tune down the commit interval and cache buffer size until you stop getting crashes.

FYI, I know of multiple cases where people run EOS with much larger commit intervals to get better batching than the default, so I don't think this pathological case would be as rare as you suspect.

Given that we already have the rudiments of an idea of what we could do to prevent this downside, we should take the time to design a solution. We owe it to our users to ensure that awesome new features don't come with bitter pills unless we can't avoid it.

3. ALOS mode.
On the other hand, I didn't see an indication of how stores will be handled under ALOS (aka non-EOS) mode. Theoretically, the transactionality of the store and the processing mode are orthogonal. A transactional store would serve ALOS just as well as a non-transactional one (if not better). Under ALOS, though, the default commit interval is five minutes, so the memory issue is far more pressing.

As I see it, we have several options to resolve this point. We could demonstrate that transactional stores work just fine for ALOS and we can therefore just swap over unconditionally. We could also disable the transactional mechanism under ALOS so that stores operate just the same as they do today when run in ALOS mode. Finally, we could do the same as in KIP-844 and make transactional stores opt-in (it'd be better to avoid the extra opt-in mechanism, but it's a good get-out-of-jail-free card).

4. (minor point) Deprecation of methods

You mentioned that the new `commit` method replaces flush, updateChangelogOffsets, and checkpoint. It seems to me that the point about atomicity and Position also suggests that it replaces the Position callbacks. However, the proposal only deprecates `flush`. Should we be deprecating other methods as well?

Thanks again for the KIP! It's really nice that you and Alex will get the chance to collaborate on both directions so that we can get the best outcome for Streams and its users.

-John


On 2022/11/21 15:02:15 Nick Telford wrote:
Hi everyone,

As I mentioned in the discussion thread for KIP-844, I've been working on an alternative approach to achieving better transactional semantics for Kafka Streams StateStores.

I've published this separately as KIP-892: Transactional Semantics for StateStores
<https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores>,
so that it can be discussed/reviewed separately from KIP-844.

Alex: I'm especially interested in what you think!

I have a nearly complete implementation of the changes outlined in this KIP, please let me know if you'd like me to push them for review in advance of a vote.

Regards,

Nick
