To close the loop on this thread: KIP-892 was accepted and is currently being implemented. Thus, I'll go ahead and mark this KIP as discarded.

Thanks a lot, Alex, for spending so much time on this very important feature! Without your groundwork, we would not have KIP-892, and your contributions are noticed!

-Matthias


On 11/21/22 5:12 AM, Nick Telford wrote:
Hi Alex,

Thanks for getting back to me. I actually have most of a working
implementation already. I'm going to write it up as a new KIP, so that it
can be reviewed independently of KIP-844.

Hopefully, working together we can have it ready sooner.

I'll keep you posted on my progress.

Regards,
Nick

On Mon, 21 Nov 2022 at 11:25, Alexander Sorokoumov
<asorokou...@confluent.io.invalid> wrote:

Hey Nick,

Thank you for the prototype testing and benchmarking, and sorry for the
late reply!

I agree that it is worth revisiting the WriteBatchWithIndex approach. I
will implement a fork of the current prototype that uses that mechanism to
ensure transactionality and let you know when it is ready for
review/testing in this ML thread.

As for time estimates, I might not have enough time to finish the prototype
in December, so it will probably be ready for review in January.

Best,
Alex

On Fri, Nov 11, 2022 at 4:24 PM Nick Telford <nick.telf...@gmail.com>
wrote:

Hi everyone,

Sorry to dredge this up again. I've had a chance to start doing some
testing with the WIP Pull Request, and it appears as though the secondary
store solution performs rather poorly.

In our testing, we had a non-transactional state store that would restore
(from scratch) at a rate of nearly 1,000,000 records/second. When we
switched it to a transactional store, it restored at a rate of less than
40,000 records/second.

I suspect the key issues here are having to copy the data out of the
temporary store and into the main store on-commit, and to a lesser extent,
the extra memory copies during writes.

I think it's worth revisiting the WriteBatchWithIndex solution, as it's
clear from the RocksDB post[1] on the subject that it's the recommended way
to achieve transactionality.
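To make the WriteBatchWithIndex idea concrete, here is a pure-Java analogue of the pattern. It deliberately does not use RocksJava: a TreeMap stands in for the RocksDB instance and a HashMap for the indexed batch, and IndexedBatchStore is a hypothetical name. In RocksDB the batch is applied as a single atomic write; the map copy below is only an illustration and is not crash-safe.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical analogue of the WriteBatchWithIndex pattern (not the RocksJava API):
// uncommitted writes live in an indexed in-memory batch, reads consult the batch
// first, and commit applies the whole batch to the committed data.
final class IndexedBatchStore {
    private final Map<String, String> committed = new TreeMap<>();     // stands in for RocksDB
    private final Map<String, Optional<String>> batch = new HashMap<>(); // empty Optional = pending delete

    void put(String key, String value) {
        batch.put(key, Optional.of(value));
    }

    void delete(String key) {
        batch.put(key, Optional.empty());
    }

    /** Read-your-writes: a pending write or delete shadows the committed value. */
    String get(String key) {
        Optional<String> pending = batch.get(key);
        if (pending != null) {
            return pending.orElse(null);
        }
        return committed.get(key);
    }

    /** What a read-committed reader would see. */
    String getCommitted(String key) {
        return committed.get(key);
    }

    /** Apply every pending write, then start with an empty batch. */
    void commit() {
        batch.forEach((key, value) -> {
            if (value.isPresent()) {
                committed.put(key, value.get());
            } else {
                committed.remove(key);
            }
        });
        batch.clear();
    }

    /** Roll back: drop uncommitted writes; committed data is untouched. */
    void rollback() {
        batch.clear();
    }

    int pendingWrites() {
        return batch.size();
    }
}
```

Unlike the secondary-store approach, nothing is copied between two on-disk stores at commit time, but every uncommitted write has to stay in memory until the next commit.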

The only issue you identified with this solution was that uncommitted
writes are required to entirely fit in-memory, and RocksDB recommends they
don't exceed 3-4MiB. If we do some back-of-the-envelope calculations, I
think we'll find that this will be a non-issue for all but the most extreme
cases, and for those, I think I have a fairly simple solution.

Firstly, when EOS is enabled, the default commit.interval.ms is set to
100ms, so uncommitted writes only need to be buffered in-memory for fairly
short intervals. If we assume a worst case of 1024 byte records (and for
most cases, they should be much smaller), then 4MiB would hold ~4096
records, which with 100ms commit intervals is a throughput of
approximately 40,960 records/second. This seems quite reasonable.
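The arithmetic above can be checked with a tiny helper. This is only an illustration: the inputs (4 MiB budget, 1024-byte records, 100 ms commit interval) come from the paragraph above, and BufferMath is a made-up name.

```java
// Back-of-the-envelope check of the buffer estimate. Inputs are taken from the text;
// the class and method names are illustrative only.
final class BufferMath {
    /** How many records of the given size fit into the uncommitted-write buffer. */
    static long recordsPerCommit(long bufferBytes, long recordBytes) {
        return bufferBytes / recordBytes;
    }

    /** Sustained write rate that fills the buffer exactly once per commit interval. */
    static long recordsPerSecond(long recordsPerCommit, long commitIntervalMs) {
        return recordsPerCommit * 1000 / commitIntervalMs;
    }
}
```

With these inputs, recordsPerCommit(4L * 1024 * 1024, 1024) gives 4096 and recordsPerSecond(4096, 100) gives 40960, matching the estimate under the assumption that every record is at the worst-case size.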

For use cases that wouldn't reasonably fit in-memory, my suggestion is that
we have a mechanism that tracks the number/size of uncommitted records in
stores, and prematurely commits the Task when this size exceeds a
configured threshold.
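A minimal sketch of the proposed tracking mechanism, assuming the threshold is expressed in bytes. UncommittedBytesTracker and maxUncommittedBytes are hypothetical names, not existing Streams configs or classes.

```java
// Hypothetical sketch: count the bytes of uncommitted writes and ask for an early
// Task commit once a configured threshold is reached.
final class UncommittedBytesTracker {
    private final long maxUncommittedBytes; // hypothetical configured threshold
    private long uncommittedBytes;

    UncommittedBytesTracker(long maxUncommittedBytes) {
        this.maxUncommittedBytes = maxUncommittedBytes;
    }

    /**
     * Record one write. Returns true if the Task should commit now instead of
     * waiting for the next commit.interval.ms tick.
     */
    boolean onWrite(int keyBytes, int valueBytes) {
        uncommittedBytes += (long) keyBytes + valueBytes;
        return uncommittedBytes >= maxUncommittedBytes;
    }

    /** Called after the Task commits: buffered writes are now committed. */
    void onCommit() {
        uncommittedBytes = 0;
    }

    long uncommittedBytes() {
        return uncommittedBytes;
    }
}
```

Counting bytes rather than records keeps the bound meaningful when record sizes vary widely.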

Thanks for your time, and let me know what you think!
--
Nick

1: https://rocksdb.org/blog/2015/02/27/write-batch-with-index.html

On Thu, 6 Oct 2022 at 19:31, Alexander Sorokoumov
<asorokou...@confluent.io.invalid> wrote:

Hey Nick,

It is going to be option c. Existing state is considered to be committed
and there will be an additional RocksDB for uncommitted writes.

I am out of office until October 24. I will update the KIP and make sure
that we have an upgrade test for that after coming back from vacation.

Best,
Alex

On Thu, Oct 6, 2022 at 5:06 PM Nick Telford <nick.telf...@gmail.com>
wrote:

Hi everyone,

I realise this has already been voted on and accepted, but it occurred to
me today that the KIP doesn't define the migration/upgrade path for
existing non-transactional StateStores that *become* transactional, i.e.
by adding the transactional boolean to the StateStore constructor.

What would be the result, when such a change is made to a Topology,
without explicitly wiping the application state?
a) An error.
b) Local state is wiped.
c) Existing RocksDB database is used as committed writes and new RocksDB
database is created for uncommitted writes.
d) Something else?

Regards,

Nick

On Thu, 1 Sept 2022 at 12:16, Alexander Sorokoumov
<asorokou...@confluent.io.invalid> wrote:

Hey Guozhang,

Sounds good. I annotated all added StateStore methods (commit, recover,
transactional) with @Evolving.

Best,
Alex



On Wed, Aug 31, 2022 at 7:32 PM Guozhang Wang <wangg...@gmail.com>
wrote:

Hello Alex,

Thanks for the detailed replies. I think that makes sense, and in the long
run we would need some public indicators from StateStore to determine if
checkpoints can really be used to indicate clean snapshots.

As for the @Evolving label, I think we can still keep it but for a
different reason: as we add more state management functionalities in the
near future, we may need to revisit the public APIs again, and keeping them
as @Evolving would allow us to modify them if necessary via an easier path
than deprecate -> delete after several minor releases.

Besides that, I have no further comments about the KIP.


Guozhang

On Fri, Aug 26, 2022 at 1:51 AM Alexander Sorokoumov
<asorokou...@confluent.io.invalid> wrote:

Hey Guozhang,


I think that we will have to keep StateStore#transactional() because
post-commit checkpointing of non-txn state stores will break the guarantees
we want in ProcessorStateManager#initializeStoreOffsetsFromCheckpoint for
correct recovery. Let's consider the checkpoint-recovery behavior under EOS
that we want to support:

1. Non-txn state stores should checkpoint on graceful shutdown and restore
from that checkpoint.

2. Non-txn state stores should delete local data during recovery after a
crash failure.

3. Txn state stores should checkpoint on commit and on graceful shutdown.
These stores should roll back uncommitted changes instead of deleting all
local data.


#1 and #2 are already supported; this proposal adds #3. Essentially, we
have two parties at play here: the post-commit checkpointing in
StreamTask#postCommit and recovery in
ProcessorStateManager#initializeStoreOffsetsFromCheckpoint. Together, these
methods must allow all three workflows and prevent invalid behavior, e.g.,
non-txn stores should not checkpoint post-commit to avoid keeping
uncommitted data on recovery.


In the current state of the prototype, we checkpoint only txn state stores
post-commit under EOS using StateStore#transactional(). If we remove
StateStore#transactional() and always checkpoint post-commit,
ProcessorStateManager#initializeStoreOffsetsFromCheckpoint will have to
determine whether to delete local data. A non-txn implementation of
StateStore#recover can't detect if it has uncommitted writes, so its
default implementation must always return either true or false to signal
whether it is restored into a valid committed-only state. If
StateStore#recover always returns true, we preserve uncommitted writes and
violate correctness. Otherwise,
ProcessorStateManager#initializeStoreOffsetsFromCheckpoint would always
delete local data even after a graceful shutdown.


With StateStore#transactional, we avoid checkpointing non-txn state stores
and prevent that problem during recovery.
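A minimal sketch of the three workflows above as two decision functions, assuming EOS is enabled. CheckpointPolicy, Event and RecoveryAction are hypothetical names for illustration only; they are not Kafka Streams classes.

```java
// Hypothetical sketch of the EOS checkpoint/recovery rules (#1-#3 above).
final class CheckpointPolicy {
    enum Event { COMMIT, GRACEFUL_SHUTDOWN }

    enum RecoveryAction { RESTORE_FROM_CHECKPOINT, WIPE_AND_RESTORE, ROLL_BACK_AND_RESTORE }

    /** May Streams write a checkpoint for this store after the given event (EOS only)? */
    static boolean shouldCheckpoint(boolean transactional, Event event) {
        if (event == Event.GRACEFUL_SHUTDOWN) {
            return true; // #1 and #3: both kinds of store checkpoint on clean shutdown
        }
        // Post-commit: only txn stores (#3). A non-txn store may already hold
        // writes that a later crash would leave uncommitted in the changelog.
        return transactional;
    }

    /** What recovery does under EOS, depending on whether a checkpoint was found. */
    static RecoveryAction onRecovery(boolean transactional, boolean checkpointFound) {
        if (transactional) {
            return RecoveryAction.ROLL_BACK_AND_RESTORE; // #3: discard uncommitted writes only
        }
        return checkpointFound
                ? RecoveryAction.RESTORE_FROM_CHECKPOINT // #1: clean shutdown left a checkpoint
                : RecoveryAction.WIPE_AND_RESTORE;       // #2: crash, local data is untrusted
    }
}
```

Without the transactional flag, shouldCheckpoint would have no way to tell the two post-commit cases apart, which is the problem described above.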


Best,

Alex

On Fri, Aug 19, 2022 at 1:05 AM Guozhang Wang <wangg...@gmail.com>
wrote:

Hello Alex,

Thanks for the replies!

As long as we allow custom user implementations of that interface, we
should probably either keep that flag to distinguish between transactional
and non-transactional implementations or change the contract behind the
interface. What do you think?

Regarding this question, I thought that in the long run, we may always
write checkpoints regardless of txn vs. non-txn stores, in which case we
would not need that `StateStore#transactional()`. But for now, for
backward compatibility edge cases, we still need to distinguish whether or
not to write checkpoints. Maybe I was misreading its purpose? If so,
please let me know.


On Mon, Aug 15, 2022 at 7:56 AM Alexander Sorokoumov
<asorokou...@confluent.io.invalid> wrote:

Hey Guozhang,

Thank you for elaborating! I like your idea to introduce a StreamsConfig
specifically for the default store APIs. You mentioned Materialized, but I
think changes in StreamJoined follow the same logic.

I updated the KIP and the prototype according to your suggestions:
* Add a new StoreType and a StreamsConfig for transactional RocksDB.
* Decide whether Materialized/StreamJoined are transactional based on the
configured StoreType.
* Move RocksDBTransactionalMechanism to
org.apache.kafka.streams.state.internals to remove it from the proposal
scope.
* Add a flag in new Stores methods to configure a state store as
transactional. Transactional state stores use the default transactional
mechanism.
* The changes above allowed me to remove all changes to the StoreSupplier
interface.

I am not sure about marking StateStore#transactional() as evolving. As
long as we allow custom user implementations of that interface, we should
probably either keep that flag to distinguish between transactional and
non-transactional implementations or change the contract behind the
interface. What do you think?

Best,
Alex

On Thu, Aug 11, 2022 at 1:00 AM Guozhang Wang <wangg...@gmail.com>
wrote:

Hello Alex,

Thanks for the replies. Regarding the global config vs. per-store spec, I
agree with John's early comments to some degree, but I think we may well
distinguish a couple of scenarios here. In sum, we are discussing the
following levels of per-store spec:

* Materialized#transactional()
* StoreSupplier#transactional()
* StateStore#transactional()
* Stores.persistentTransactionalKeyValueStore()...

And my thoughts are the following:

* In the current proposal, users could specify transactional as either
"Materialized.as("storeName").withTransactionsEnabled()" or
"Materialized.as(Stores.persistentTransactionalKeyValueStore(..))", which
seems unnecessary to me. In general, the more options the library
provides, the messier it is for users to learn the new APIs.

* When using built-in stores, users would usually go with
Materialized.as("storeName"). In such cases, I feel it's not very
meaningful to specify "some of the built-in stores to be transactional,
while others be non-transactional": as long as one of your stores is
non-transactional, you'd still pay for large restoration cost upon unclean
failure. People may, indeed, want to specify different transactional
mechanisms to be used across stores; but for whether or not the stores
should be transactional, I feel it's really an "all or none" answer, and
our built-in form (RocksDB) should support transactionality for all store
types.

* When using customized stores, users would usually go with
Materialized.as(StoreSupplier). And it's possible that users would choose
some to be transactional while others non-transactional (e.g. if their
customized store only supports transactions for some store types, but not
others).

* At a per-store level, the library does not really care, or need to
know, whether that store is transactional or not at runtime, except that,
for compatibility reasons, today we want to make sure the written
checkpoint files do not include those non-transactional stores. But this
check would eventually go away, as one day we would always checkpoint
files.

---------------------------

With all of that in mind, my gut feeling is that:

* Materialized#transactional(): we would not need this knob, since for
built-in stores I think just a global config should be sufficient (see
below), while for customized stores users would need to specify that via
the StoreSupplier anyway and not through this API. Hence, I think for
either case we do not need to expose such a knob on the Materialized
level.

* Stores.persistentTransactionalKeyValueStore(): I think we could refactor
that function without introducing new constructors in the Stores factory,
but just add new overloads to the existing function name, e.g.

```
persistentKeyValueStore(final String name, final boolean transactional)
```

Plus, we can augment the storeImplType as introduced in
https://cwiki.apache.org/confluence/display/KAFKA/KIP-591%3A+Add+Kafka+Streams+config+to+set+default+state+store
as syntactic sugar for users, e.g.

```
public enum StoreImplType {
     ROCKS_DB,
     TXN_ROCKS_DB,
     IN_MEMORY
   }
```

```



stream.groupByKey().count(Materialized.withStoreType(StoreImplType.TXN_
ROCKS_DB));
```

The above provides this global config at the store impl
type
level.

* RocksDBTransactionalMechanism: I agree with Bruno that we had better not
expose this knob to users, but rather keep it purely as an impl detail
abstracted from the "TXN_ROCKS_DB" type. Over time we may, e.g., use
in-memory stores as the secondary stores with optional spill-to-disk when
we hit the memory limit, but all of those future optimizations should be
kept away from the users.

* StoreSupplier#transactional() / StateStore#transactional(): the first
flag is only passed into the StateStore layer to indicate whether we
should write checkpoints; we could mark it as @Evolving so that we can one
day remove it without a long deprecation period.


Guozhang

On Wed, Aug 10, 2022 at 8:04 AM Alexander Sorokoumov
<asorokou...@confluent.io.invalid> wrote:

Hey Guozhang, Bruno,

Thank you for your feedback. I am going to respond to
both
of
you
in
a
single email. I hope it is okay.

@Guozhang,

We could, instead, have a global config to specify if the built-in stores
should be transactional or not.


This was the original approach I took in this proposal. Earlier in this
thread, John, Sagar, and Bruno listed a number of issues with it. I tend
to agree with them that it is probably a better user experience to
control transactionality via Materialized objects.

We could simplify our implementation for `commit`

Agreed! I updated the prototype and removed references to the commit
marker and rolling forward from the proposal.


@Bruno,

So, I would remove the details about the 2-state-store implementation
from the KIP or provide it as an example of a possible implementation at
the end of the KIP.

I moved the section about the 2-state-store implementation to the bottom
of the proposal and always mention it as a reference implementation.
Please let me know if this is okay.

Could you please describe the usage of commit() and recover() in the
commit workflow in the KIP as we did in this thread but independently
from the state store implementation?

I described how commit/recover change the workflow in the Overview
section.

Best,
Alex

On Wed, Aug 10, 2022 at 10:07 AM Bruno Cadonna <cado...@apache.org>
wrote:

Hi Alex,

Thanks a lot for explaining!

Now some aspects are clearer to me.

While I now understand how the state store can roll forward, I have the
feeling that rolling forward is specific to the 2-state-store
implementation with RocksDB of your PoC. Other state store implementations
might use a different strategy to react to crashes. For example, they
might apply an atomic write and effectively roll back if they crash before
committing the state store transaction. I think the KIP should not
contain such implementation details but provide an interface to
accommodate rolling forward and rolling backward.

So, I would remove the details about the 2-state-store implementation
from the KIP or provide it as an example of a possible implementation at
the end of the KIP.

Since a state store implementation can roll forward or roll back, I think
it is fine to return the changelog offset from recover(). With the
returned changelog offset, Streams knows from where to start state store
restoration.

Could you please describe the usage of commit() and recover() in the
commit workflow in the KIP as we did in this thread but independently
from the state store implementation? That would make things clearer.
Additionally, descriptions of failure scenarios would also be helpful.

Best,
Bruno


On 04.08.22 16:39, Alexander Sorokoumov wrote:
Hey Bruno,

Thank you for the suggestions and the clarifying questions. I believe that
they cover the core of this proposal, so it is crucial for us to be on the
same page.

1. Don't you want to deprecate StateStore#flush().


Good call! I updated both the proposal and the prototype.

2. I would shorten Materialized#withTransactionalityEnabled() to
Materialized#withTransactionsEnabled().


Turns out, these methods are no longer necessary. I removed them from the
proposal and the prototype.


3. Could you also describe a bit more in detail where the offsets passed
into commit() and recover() come from?


The offset passed into StateStore#commit is the last offset committed to
the changelog topic. The offset passed into StateStore#recover is the last
checkpointed offset for the given StateStore. Let's look at steps 3 and 4
in the commit workflow. After the TaskExecutor/TaskManager commits, it
calls StreamTask#postCommit[1] that in turn:
a. updates the changelog offsets via
ProcessorStateManager#updateChangelogOffsets[2]. The offsets here come
from the RecordCollector[3], which tracks the latest offsets the producer
sent without exception[4, 5].
b. flushes/commits the state store in AbstractTask#maybeCheckpoint[6].
This method essentially calls ProcessorStateManager methods -
flush/commit[7] and checkpoint[8]. ProcessorStateManager#commit goes over
all state stores that belong to that task and commits them with the
offset obtained in step `a`. ProcessorStateManager#checkpoint writes down
those offsets for all state stores, except for non-transactional ones in
the case of EOS.
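The post-commit flow in steps a and b can be modeled roughly as follows. This is a simplified sketch, not the prototype's code: CommittableStore and PostCommitSketch are hypothetical stand-ins for the store, RecordCollector and ProcessorStateManager roles described above, and offsets are keyed by store name for brevity.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical store abstraction for this sketch only.
interface CommittableStore {
    String name();
    boolean transactional();
    void commit(long changelogOffset); // make writes up to changelogOffset durable
}

final class PostCommitSketch {
    // Step a: latest offsets the producer acknowledged (the RecordCollector role).
    private final Map<String, Long> changelogOffsets = new HashMap<>();
    // What would be written to the checkpoint file.
    final Map<String, Long> checkpoint = new LinkedHashMap<>();

    void updateChangelogOffsets(Map<String, Long> ackedOffsets) {
        changelogOffsets.putAll(ackedOffsets);
    }

    // Step b: commit every store with its offset, then record checkpoint entries,
    // skipping non-transactional stores when EOS is enabled.
    void maybeCheckpoint(Iterable<CommittableStore> stores, boolean eosEnabled) {
        for (CommittableStore store : stores) {
            Long offset = changelogOffsets.get(store.name());
            if (offset == null) {
                continue; // nothing written to this store's changelog yet
            }
            store.commit(offset);
            if (!eosEnabled || store.transactional()) {
                checkpoint.put(store.name(), offset);
            }
        }
    }
}
```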

During initialization, StreamTask calls
StateManagerUtil#registerStateStores[8] that in turn calls
ProcessorStateManager#initializeStoreOffsetsFromCheckpoint[9]. At the
moment, this method assigns checkpointed offsets to the corresponding
state stores[10]. The prototype also calls StateStore#recover with the
checkpointed offset and assigns the offset returned by recover()[11].
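A rough sketch of that initialization step with the prototype's extra recover() call. RecoverableStore and InitSketch are hypothetical names; the real ProcessorStateManager works with changelog partitions rather than store names, and stores without a checkpoint entry are simply left out here.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical store abstraction for this sketch only.
interface RecoverableStore {
    /** Make the store consistent at checkpointedOffset or later; return the offset it now reflects. */
    long recover(long checkpointedOffset);
}

final class InitSketch {
    /** Pick, for each checkpointed store, the changelog offset restoration starts from. */
    static Map<String, Long> restoreStartOffsets(Map<String, RecoverableStore> stores,
                                                 Map<String, Long> checkpoint) {
        Map<String, Long> start = new HashMap<>();
        for (Map.Entry<String, RecoverableStore> entry : stores.entrySet()) {
            Long checkpointed = checkpoint.get(entry.getKey());
            if (checkpointed != null) {
                // Let the store roll back or forward first, then trust the offset it reports.
                start.put(entry.getKey(), entry.getValue().recover(checkpointed));
            }
        }
        return start;
    }
}
```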

4. I do not quite understand how a state store can roll forward. You
mention in the thread the following:


The 2-state-stores commit looks like this [12]:

   1. Flush the temporary state store.
   2. Create a commit marker with a changelog offset corresponding to the
   state we are committing.
   3. Go over all keys in the temporary store and write them down to the
   main one.
   4. Wipe the temporary store.
   5. Delete the commit marker.
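The five commit steps can be sketched in memory as follows. HashMaps stand in for the two RocksDB instances and an OptionalLong for the commit marker; TwoStoreCommitSketch is a hypothetical name, deletes are not modeled, and nothing here is durable the way the on-disk version has to be.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical in-memory model of the 2-state-store commit (not the prototype's code).
final class TwoStoreCommitSketch {
    final Map<String, String> main = new HashMap<>();      // committed data
    final Map<String, String> temporary = new HashMap<>(); // uncommitted writes
    OptionalLong commitMarker = OptionalLong.empty();      // offset of an in-flight commit

    void put(String key, String value) {
        temporary.put(key, value); // writes never go to the main store directly
    }

    String get(String key) {
        String pending = temporary.get(key); // read-your-writes: uncommitted value wins
        return pending != null ? pending : main.get(key);
    }

    void commit(long changelogOffset) {
        // Step 1: flush the temporary store (nothing to do for an in-memory map).
        // Step 2: persist a commit marker holding the offset being committed.
        commitMarker = OptionalLong.of(changelogOffset);
        // Step 3: copy every key from the temporary store into the main store.
        main.putAll(temporary);
        // Step 4: wipe the temporary store.
        temporary.clear();
        // Step 5: delete the commit marker.
        commitMarker = OptionalLong.empty();
    }
}
```

Step 3 is the copy that makes commits expensive for large write volumes, since every uncommitted key is written a second time.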


Let's consider crash failure scenarios:

   - Crash failure happens between steps 1 and 2. The main state store is
   in a consistent state that corresponds to the previously checkpointed
   offset. StateStore#recover throws away the temporary store and proceeds
   from the last checkpointed offset.
   - Crash failure happens between steps 2 and 3. We do not know what keys
   from the temporary store were already written to the main store, so we
   can't roll back. There are two options: either wipe the main store or
   roll forward. Since the point of this proposal is to avoid situations
   where we throw away the state, and we do not care which consistent
   state the store rolls to, we roll forward by continuing from step 3.
   - Crash failure happens between steps 3 and 4. We can't distinguish
   between this and the previous scenario, so we write all the keys from
   the temporary store. This is okay because the operation is idempotent.
   - Crash failure happens between steps 4 and 5. Again, we can't
   distinguish between this and previous scenarios, but the temporary
   store is already empty. Even though we write all keys from the
   temporary store, this operation is, in fact, a no-op.
   - Crash failure happens between step 5 and checkpoint. This is the case
   you referred to in question 5. The commit is finished, but it is not
   reflected at the checkpoint. recover() returns the offset of the
   previous commit here, which is incorrect, but it is okay because we
   will replay the changelog from the previously committed offset. As
   changelog replay is idempotent, the state store recovers into a
   consistent state.
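The recovery rules from these scenarios can be sketched against the same kind of in-memory model (hypothetical names; HashMaps instead of RocksDB, an OptionalLong for the commit marker). The only signal recovery needs is whether a commit marker is present.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical in-memory model of recovery for the 2-state-store approach.
final class TwoStoreRecoverySketch {
    final Map<String, String> main = new HashMap<>();      // committed data
    final Map<String, String> temporary = new HashMap<>(); // uncommitted writes
    OptionalLong commitMarker = OptionalLong.empty();      // offset of an in-flight commit

    /** Returns the changelog offset the store is consistent with after recovery. */
    long recover(long checkpointedOffset) {
        if (commitMarker.isPresent()) {
            // Crash between steps 2 and 5: roll forward by redoing steps 3-5.
            // Re-copying keys that were already copied is idempotent.
            long committedOffset = commitMarker.getAsLong();
            main.putAll(temporary);
            temporary.clear();
            commitMarker = OptionalLong.empty();
            return committedOffset;
        }
        // No marker: the crash happened before step 2 (drop uncommitted writes) or
        // after step 5 (temporary store already empty). Report the checkpointed
        // offset; a changelog replay from there is idempotent.
        temporary.clear();
        return checkpointedOffset;
    }
}
```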

The last crash failure scenario is a natural transition to

how should Streams know what to write into the checkpoint file
after the crash?


As mentioned above, the Streams app writes the checkpoint file after the
Kafka transaction and then the StateStore commit. Same as without the
proposal, it should write the committed offset, as it is the same for both
the Kafka changelog and the state store.


This issue arises because we store the offset outside of the state
store. Maybe we need an additional method on the state store interface
that returns the offset at which the state store is.


In my opinion, we should include in the interface only the guarantees
that are necessary to preserve EOS without wiping the local state. This
way, we allow more room for possible implementations. Thanks to the
idempotency of the changelog replay, it is "good enough" if
StateStore#recover returns an offset that is lower than the one the store
actually reflects. The only limitation here is that the state store should
never commit writes that are not yet committed in the Kafka changelog.
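The idempotency argument can be illustrated with a toy key-value changelog: replaying from an offset earlier than the one the store actually reflects converges to the same state as a full replay, because each key ends up with the value of its last changelog record. ReplaySketch is a hypothetical name, and null values stand for tombstones.

```java
import java.util.List;
import java.util.Map;

// Hypothetical toy changelog replay used to illustrate idempotency.
final class ReplaySketch {
    static final class ChangelogRecord {
        final String key;
        final String value; // null means delete (tombstone)

        ChangelogRecord(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    /** Apply records from fromOffset (inclusive) to the end of the changelog onto state. */
    static void replay(Map<String, String> state, List<ChangelogRecord> changelog, int fromOffset) {
        for (int offset = fromOffset; offset < changelog.size(); offset++) {
            ChangelogRecord record = changelog.get(offset);
            if (record.value == null) {
                state.remove(record.key);
            } else {
                state.put(record.key, record.value);
            }
        }
    }
}
```

For example, a store that already reflects offsets 0 to 2 but reports offset 1 replays records 1 to 3 again and ends up identical to a store restored from scratch.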

Please let me know what you think about this. First of all, I am
relatively new to the codebase, so I might be wrong in my understanding of
how it works. Second, while writing this, it occurred to me that the
StateStore#recover interface method is not as straightforward as it could
be. Maybe we can change it like that:

/**
 * Recover a transactional state store.
 * <p>
 * If a transactional state store shut down with a crash failure, this
 * method ensures that the state store is in a consistent state that
 * corresponds to {@code changelogOffset} or later.
 *
 * @param changelogOffset the checkpointed changelog offset.
 * @return {@code true} if recovery succeeded, {@code false} otherwise.
 */
boolean recover(final Long changelogOffset);

Note: all links below except for [10] lead to the prototype's code.
1. https://github.com/apache/kafka/blob/549e54be95a8e1bae1e97df2c21d48c042ff356e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L468
2. https://github.com/apache/kafka/blob/549e54be95a8e1bae1e97df2c21d48c042ff356e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L580
3. https://github.com/apache/kafka/blob/549e54be95a8e1bae1e97df2c21d48c042ff356e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L868
4. https://github.com/apache/kafka/blob/549e54be95a8e1bae1e97df2c21d48c042ff356e/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java#L94-L96
5. https://github.com/apache/kafka/blob/549e54be95a8e1bae1e97df2c21d48c042ff356e/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java#L213-L216
6. https://github.com/apache/kafka/blob/549e54be95a8e1bae1e97df2c21d48c042ff356e/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java#L94-L97
7. https://github.com/apache/kafka/blob/549e54be95a8e1bae1e97df2c21d48c042ff356e/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java#L469
8. https://github.com/apache/kafka/blob/549e54be95a8e1bae1e97df2c21d48c042ff356e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L226
9. https://github.com/apache/kafka/blob/549e54be95a8e1bae1e97df2c21d48c042ff356e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java#L103
10. https://github.com/apache/kafka/blob/0c4da23098f8b8ae9542acd7fbaa1e5c16384a39/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java#L251-L252
11. https://github.com/apache/kafka/blob/549e54be95a8e1bae1e97df2c21d48c042ff356e/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java#L250-L265
12. https://github.com/apache/kafka/blob/549e54be95a8e1bae1e97df2c21d48c042ff356e/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractTransactionalStore.java#L84-L88

Best,
Alex

On Fri, Jul 29, 2022 at 3:42 PM Bruno Cadonna <cado...@apache.org>
wrote:

Hi Alex,

Thanks for the updates!

1. Don't you want to deprecate StateStore#flush()? As far as I
understand, commit() is the new flush(), right? If you do not deprecate
it, you don't get rid of the error room you describe in your KIP by
having a flush() and a commit().


2. I would shorten Materialized#withTransactionalityEnabled() to
Materialized#withTransactionsEnabled().


3. Could you also describe a bit more in detail where the offsets passed
into commit() and recover() come from?


For my next two points, I need the commit workflow that you were so kind
to post into this thread:

1. write stuff to the state store
2. producer.sendOffsetsToTransaction(token);
producer.commitTransaction();
3. flush (<- that would be a call to commit(), right?)
4. checkpoint


4. I do not quite understand how a state store can roll forward. You
mention in the thread the following:

"If the crash failure happens during #3, the state store can roll forward
and finish the flush/commit."

How does the state store know where it stopped the flushing when it
crashed?

This seems like an optimization to me. I think, in general, the state
store should roll back to the last successfully committed state and
restore from there until the end of the changelog topic partition. The
last committed state is the offsets in the checkpoint file.


5. In the same e-mail from point 4, you also state:

"If the crash failure happens between #3 and #4, the state store should
do nothing during recovery and just proceed with the checkpoint."

How should Streams know that the failure was between #3 and #4 during
recovery? It just sees a valid state store and a valid checkpoint file.
Streams does not know that the state of the checkpoint file does not
match with the committed state of the state store.
Also, how should Streams know what to write into the checkpoint file
after the crash?
This issue arises because we store the offset outside of the state
store. Maybe we need an additional method on the state store interface
that returns the offset at which the state store is.


Best,
Bruno




On 27.07.22 11:51, Alexander Sorokoumov wrote:
Hey Nick,

Thank you for the kind words and the feedback! I'll definitely add an
option to configure the transactional mechanism in the Stores factory
method via an argument as John previously suggested, and might add the
in-memory option via RocksDB Indexed Batches if I figure out why their
creation via rocksdb jni fails with `UnsatisfiedLinkError`.

Best,
Alex

On Wed, Jul 27, 2022 at 11:46 AM Alexander Sorokoumov <
asorokou...@confluent.io> wrote:

Hey Guozhang,

1) About the param passed into the `recover()` function: it seems to me
that the semantics of "recover(offset)" is: recover this state to a
transaction boundary which is at least the passed-in offset. And the only
possibility that the returned offset is different than the passed-in
offset is that if the previous failure happens after we've done all the
commit procedures except writing the new checkpoint, in which case the
returned offset would be larger than the passed-in offset. Otherwise it
should always be equal to the passed-in offset, is that right?


Right now, the only case when `recover` returns an offset different from
the passed one is when the failure happens *during* commit.


If the failure happens after commit but before the checkpoint, `recover`
might return either a passed or newer committed offset, depending on the
implementation. The `recover` implementation in the prototype returns a
passed offset because it deletes the commit marker that holds that offset
after the commit is done. In that case, the store will replay the last
commit from the changelog. I think it is fine as the changelog replay is
idempotent.

2) It seems the only use for the "transactional()" function is to
determine if we can update the checkpoint file while in EOS.


Right now, there are 2 other uses for `transactional()`:
1. To determine what to do during initialization if the checkpoint is
gone (see [1]). If the state store is transactional, we don't have to wipe
the existing data. Thinking about it now, we do not really need this check
whether the store is `transactional` because if it is not, we'd not have
written the checkpoint in the first place. I am going to remove that
check.
2. To determine if the persistent kv store in KStreamImplJoin should be
transactional (see [2], [3]).

I am not sure if we can get rid of the checks in point 2. If so, I'd be
happy to encapsulate `transactional()` logic in `commit/recover`.

Best,
Alex

1. https://github.com/apache/kafka/pull/12393/files#diff-971d9ef7ea8aefffff687fc7ee131bd166ced94445f4ab55aa83007541dccfdaL256-R281
2. https://github.com/apache/kafka/pull/12393/files#diff-9ce43046fdef1233ab762e728abd1d3d44d7c270b28dcf6b63aa31a93a30af07R266-R278
3. https://github.com/apache/kafka/pull/12393/files#diff-9ce43046fdef1233ab762e728abd1d3d44d7c270b28dcf6b63aa31a93a30af07R348-R354

On Tue, Jul 26, 2022 at 6:39 PM Nick Telford <nick.telf...@gmail.com>
wrote:

Hi Alex,

Excellent proposal, I'm very keen to see this land!

Would it be useful to permit configuring the type of store used for
uncommitted offsets on a store-by-store basis? This way, users could
choose whether to use, e.g. an in-memory store or RocksDB, potentially
reducing the overheads associated with RocksDB for smaller stores, but
without the memory pressure issues?

I suspect that in most cases, the number of uncommitted records will be
very small, because the default commit interval is 100ms.

Regards,

Nick

On Tue, 26 Jul 2022 at 01:36, Guozhang Wang <wangg...@gmail.com>
wrote:

Hello Alex,

Thanks for the updated KIP. I looked over it and browsed the WIP, and just
have a couple of meta thoughts:

1) About the param passed into the `recover()` function: it seems to me
that the semantics of "recover(offset)" is: recover this state to a
transaction boundary which is at least the passed-in offset. And the only
possibility that the returned offset is different than the passed-in
offset is that if the previous failure happens after we've done all the
commit procedures except writing the new checkpoint, in which case the
returned offset would be larger than the passed-in offset. Otherwise it
should always be equal to the passed-in offset, is that right?

2) It seems the only use for the "transactional()" function is to
determine if we can update the checkpoint file while in EOS. But the
purpose of the checkpoint file's offsets is just to tell "the local
state's current snapshot's progress is at least the indicated offsets"
anyway, and with this KIP maybe we would just do:

a) when in ALOS, upon failover: we set the starting offset as
checkpointed-offset, then restore() from changelog till the end-offset.
This way we may restore some records twice.
b) when in EOS, upon failover: we first call recover(checkpointed-offset),
then set the starting offset as the returned offset (which may be larger
than checkpointed-offset), then restore until the end-offset.

So why not also:
c) we let the `commit()` function also return an offset, which indicates
"checkpointable offsets".
d) for existing non-transactional stores, we just have a default
implementation of "commit()" which is simply a flush, and returns a
sentinel value like -1. Then later, if we get checkpointable offsets -1,
we do not write the checkpoint. Upon clean shutdown, we can just
checkpoint regardless of the returned value from "commit".
e) for existing non-transactional stores, we just have a default
implementation of "recover()" which is to wipe out the local store and
return offset 0 if the passed-in offset is -1; otherwise, if it is not -1,
it indicates a clean shutdown in the last run, and this function can just
be a no-op.

In that case, we would not need the
"transactional()"
function
anymore,
since for non-transactional stores their
behaviors
are
still
wrapped
in
the
`commit / recover` function pairs.

I have not completed a thorough pass on your WIP PR, so maybe I will come up with some more feedback later, but just let me know if my understanding above is correct or not?


Guozhang




On Thu, Jul 14, 2022 at 7:01 AM Alexander Sorokoumov <asorokou...@confluent.io.invalid> wrote:

Hi,

I updated the KIP with the following changes:
* Replaced in-memory batches with the secondary-store approach as the default implementation to address the feedback about memory pressure as suggested by Sagar and Bruno.
* Introduced StateStore#commit and StateStore#recover methods as an extension of the rollback idea. @Guozhang, please see the comment below on why I took a slightly different approach than you suggested.
* Removed mentions of changes to IQv1 and IQv2. Transactional state stores enable reading committed in IQ, but it is really an independent feature that deserves its own KIP. Conflating them unnecessarily increases the scope for discussion, implementation, and testing in a single unit of work.

I also published a prototype that implements the changes described in the proposal: https://github.com/apache/kafka/pull/12393

Regarding explicit rollback, I think it is a powerful idea that allows other StateStore implementations to take a different path to transactional behavior rather than keeping 2 state stores. Instead of introducing a new commit token, I suggest using a changelog offset, which already corresponds 1:1 to the materialized state. This works nicely because Kafka Streams first commits an AK transaction and only then checkpoints the state store, so we can use the changelog offset to commit the state store transaction.

I called the method StateStore#recover rather than StateStore#rollback because a state store might either roll back or roll forward depending on the specific point of the crash failure. Consider the write algorithm in Kafka Streams:
1. write stuff to the state store
2. producer.sendOffsetsToTransaction(token); producer.commitTransaction();
3. flush
4. checkpoint

Let's consider 3 cases:
1. If the crash failure happens between #2 and #3, the state store rolls back and replays the uncommitted transaction from the changelog.
2. If the crash failure happens during #3, the state store can roll forward and finish the flush/commit.
3. If the crash failure happens between #3 and #4, the state store should do nothing during recovery and just proceed with the checkpoint.
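The three cases can be read as a mapping from the point of failure to a recovery action. This is why a single `recover` method, rather than `rollback`, covers all of them. Below is a minimal Java sketch. The enum and class names are illustrative only, and the sketch leaves out how a real store would detect the crash point (for example from markers it persists).

```java
/** Hypothetical: where the write algorithm (steps 1 to 4 above) was interrupted. */
enum CrashPoint {
    BETWEEN_COMMIT_AND_FLUSH,     // after #2, before #3
    DURING_FLUSH,                 // during #3
    BETWEEN_FLUSH_AND_CHECKPOINT  // after #3, before #4
}

/** Hypothetical recovery actions a store's recover() could take. */
enum RecoveryAction {
    ROLL_BACK_AND_REPLAY, // discard local changes, replay the transaction from the changelog
    ROLL_FORWARD,         // finish the interrupted flush/commit
    NO_OP                 // already consistent; just write the checkpoint
}

final class RecoveryPlanner {
    static RecoveryAction plan(CrashPoint point) {
        switch (point) {
            case BETWEEN_COMMIT_AND_FLUSH:
                return RecoveryAction.ROLL_BACK_AND_REPLAY;
            case DURING_FLUSH:
                return RecoveryAction.ROLL_FORWARD;
            case BETWEEN_FLUSH_AND_CHECKPOINT:
                return RecoveryAction.NO_OP;
            default:
                throw new IllegalArgumentException(String.valueOf(point));
        }
    }
}
```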

Looking forward to your feedback,
Alexander

On Wed, Jun 8, 2022 at 12:16 AM Alexander Sorokoumov <asorokou...@confluent.io> wrote:

Hi,

As a status update, I made the following changes to the KIP:
* replaced configuration via the top-level config with configuration via Stores factory and StoreSuppliers,
* added IQv2 and elaborated how readCommitted will work when the store is not transactional,
* removed claims about ALOS.

I am going to be OOO for the next couple of weeks and will resume working on the proposal and responding to the discussion in this thread starting June 27. My next top priorities are:
1. Prototype the rollback approach as suggested by Guozhang.
2. Replace in-memory batches with the secondary-store approach as the default implementation to address the feedback about memory pressure as suggested by Sagar and Bruno.
3. Adjust Stores methods to make transactional implementations pluggable.
4. Publish the POC for the first review.

Best regards,
Alex

On Wed, Jun 1, 2022 at 2:52 PM Guozhang Wang <wangg...@gmail.com> wrote:

Alex,

Thanks for your replies! That is very helpful.

Just to broaden our discussion a bit here, I think there are some other approaches in parallel to the idea of "enforce to only persist upon explicit flush", and I'd like to throw one out here -- not really advocating it, but just for us to compare the pros and cons:

1) We let the StateStore's `flush` function return a token instead of returning `void`.
2) We add another `rollback(token)` interface to StateStore which would effectively roll back the state, as indicated by the token, to the snapshot taken when the corresponding `flush` was called.
3) We encode the token and commit it as part of `producer#sendOffsetsToTransaction`.

Users could optionally implement the new functions, or they can just not return the token at all and not implement the second function. Again, the APIs are just for the sake of illustration; I don't feel they are the most natural :)

Then the procedure would be:

1. the previous checkpointed offset is 100
...
3. flush store, make sure all writes are persisted; get the returned token that indicates the snapshot of 200.
4. producer.sendOffsetsToTransaction(token); producer.commitTransaction();
5. Update the checkpoint file (say, the new value is 200).

Then if there's a failure, say between 3/4, we would get the token from the last committed txn, and first we would do the restoration (which may get the state to somewhere between 100 and 200), then call `store.rollback(token)` to roll back to the snapshot of offset 100.
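Here is a toy Java sketch of the token idea in steps 3 to 5 and the failure case above. It assumes the token is simply the changelog offset at flush time, and that the store keeps a full in-memory copy of its image per flush. That copy is only for illustration, since a real store would need something much cheaper. `SnapshottingStore`, `flush(long)` and `rollback(long)` are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** Toy store for the flush-returns-token / rollback(token) idea; names are hypothetical. */
final class SnapshottingStore {
    private final Map<String, String> live = new HashMap<>();
    // token (here: changelog offset at flush time) -> copy of the store image at that flush
    private final TreeMap<Long, Map<String, String>> snapshots = new TreeMap<>();

    void put(String key, String value) { live.put(key, value); }

    String get(String key) { return live.get(key); }

    /** Persists the current image and returns a token identifying it. */
    long flush(long changelogOffset) {
        snapshots.put(changelogOffset, new HashMap<>(live));
        return changelogOffset;
    }

    /** Restores the image captured by the flush that returned this token. */
    void rollback(long token) {
        Map<String, String> image = snapshots.get(token);
        if (image == null) {
            throw new IllegalArgumentException("unknown token " + token);
        }
        live.clear();
        live.putAll(image);
        // Snapshots newer than the token belong to transactions that never committed.
        snapshots.tailMap(token, false).clear();
    }
}
```

In the failure case above, the last committed transaction carries the token from the flush at 100. Rolling back to it discards everything written after that flush.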

The pro is that we would then not need to enforce that state stores do not persist any data during the txn: stores that may not be able to implement the `rollback` function can still reduce their impl to "not persisting any data" via this API, while stores that can indeed support rollback may have a more efficient implementation. The cons, though, off the top of my head are: 1) more complicated logic differentiating between EOS with and without store rollback support, and ALOS; 2) encoding the token as part of the commit offset is not ideal if it is big; 3) the recovery logic including the state store is also a bit more complicated.


Guozhang





On Wed, Jun 1, 2022 at 1:29 PM Alexander Sorokoumov <asorokou...@confluent.io.invalid> wrote:

Hi Guozhang,

But I'm still trying to clarify how it guarantees EOS, and it seems that we would achieve it by enforcing to not persist any data written within this transaction until step 4. Is that correct?


This is correct. Both alternatives, in-memory WriteBatchWithIndex and transactionality via the secondary store, guarantee EOS by not persisting data in the "main" state store until it is committed in the changelog topic.

Oh what I meant is not what KStream code does, but that StateStore impl classes themselves could potentially flush data to become persisted asynchronously


Thank you for elaborating! You are correct, the underlying state store should not persist data until the streams app calls StateStore#flush. There are 2 options for how a StateStore implementation can guarantee that: either keep uncommitted writes in memory, or be able to roll back the changes that were not committed during recovery. RocksDB's WriteBatchWithIndex is an implementation of the first option. A considered alternative, Transactions via Secondary State Store for Uncommitted Changes, is the way to implement the second option.
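Here is a minimal Java sketch of the first option. Uncommitted writes live in an in-memory overlay that also serves reads (read-your-writes), and they reach the main store only on commit. This is roughly the role WriteBatchWithIndex plays over RocksDB. Plain `HashMap`s stand in for RocksDB, so unlike an atomic RocksDB write batch, `commit()` here is not crash-atomic. `BufferedTransactionalStore` is a hypothetical name. The overlay grows with the number of uncommitted keys, which is the memory-pressure concern raised in the thread.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Option 1 sketch: uncommitted writes stay in memory until commit; maps stand in for RocksDB. */
final class BufferedTransactionalStore {
    private final Map<String, String> committed = new HashMap<>();
    // Optional.empty() marks a pending delete (tombstone).
    private final Map<String, Optional<String>> uncommitted = new HashMap<>();

    void put(String key, String value) { uncommitted.put(key, Optional.of(value)); }

    void delete(String key) { uncommitted.put(key, Optional.empty()); }

    /** Read-your-writes: consult the overlay first, then the committed image. */
    String get(String key) {
        Optional<String> pending = uncommitted.get(key);
        if (pending != null) {
            return pending.orElse(null);
        }
        return committed.get(key);
    }

    /** Read-committed view that ignores the overlay. */
    String getCommitted(String key) { return committed.get(key); }

    /** Applies the overlay once the changelog transaction has committed. */
    void commit() {
        for (Map.Entry<String, Optional<String>> e : uncommitted.entrySet()) {
            if (e.getValue().isPresent()) {
                committed.put(e.getKey(), e.getValue().get());
            } else {
                committed.remove(e.getKey());
            }
        }
        uncommitted.clear();
    }

    /** On abort or crash, uncommitted writes simply disappear. */
    void abort() { uncommitted.clear(); }
}
```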

As everyone correctly pointed out, keeping uncommitted data in memory introduces a very real risk of OOM that we will need to handle. The more I think about it, the more I lean towards going with Transactions via Secondary Store as the way to implement transactionality, as it does not have that issue.

Best,
Alex


On Wed, Jun 1, 2022 at 12:59 PM Guozhang Wang <wangg...@gmail.com> wrote:

Hello Alex,

we flush the cache, but not the underlying state store.

You're right. The ordering I mentioned above is actually:

...
3. producer.sendOffsetsToTransaction(); producer.commitTransaction();
4. flush store, make sure all writes are persisted.
5. Update the checkpoint file to 200.
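The corrected ordering of steps 3 to 5 can be sketched in Java with hypothetical stand-in interfaces. These are not Kafka's `Producer` or `StateStore` APIs. The sketch only shows that the local store flush (step 4) happens after the changelog transaction is committed (step 3).

```java
/** Hypothetical stand-ins for the transactional producer, local store and checkpoint file. */
interface TxnProducer { void sendOffsetsAndCommit(); }

interface LocalStore { void flush(); }

interface CheckpointFile { void write(long offset); }

final class EosCommit {
    /** Steps 3 to 5 of the corrected ordering (steps 1 and 2 are elided in the message). */
    static void commit(TxnProducer producer, LocalStore store, CheckpointFile checkpoint, long changelogOffset) {
        producer.sendOffsetsAndCommit();   // 3. changelog writes up to changelogOffset become committed
        store.flush();                     // 4. only now are the local writes persisted
        checkpoint.write(changelogOffset); // 5. e.g. 200
    }
}
```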

But I'm still trying to clarify how it guarantees EOS, and it seems that we would achieve it by enforcing to not persist any data written within this transaction until step 4. Is that correct?

Can you please point me to the place in the codebase where we trigger async flush before the commit?

Oh what I meant is not what KStream code does, but that StateStore impl classes themselves could potentially flush data to become persisted asynchronously, e.g. RocksDB does that naturally out of the control of KStream code. I think it is related to my previous question: if we think that by guaranteeing EOS at the state store level we would effectively ask the impl classes that "you should not persist any data until `flush` is called explicitly", is the StateStore interface the right level to enforce such mechanisms, or should we just do that on top of the StateStores? E.g. during the transaction we just keep all the writes in the cache (of course we need to consider how to work around memory pressure as previously mentioned), and then upon committing, we just write the cached records as a whole into the store and then call flush.


Guozhang







On Tue, May 31, 2022 at 4:08 PM Alexander Sorokoumov <asorokou...@confluent.io.invalid> wrote:

Hey,

Thank you for the wealth of great suggestions and questions! I am going to address the feedback in batches and update the proposal async, as it is probably going to be easier for everyone. I will also write a separate message after making updates to the KIP.

@John,

Did you consider instead just adding the option to the RocksDB*StoreSupplier classes and the factories in Stores?

Thank you for suggesting that. I think that this idea is better than what I came up with, and I will update the KIP to configure transactionality via the suppliers and Stores.

what is the advantage over just doing the same thing with the RecordCache and not introducing the WriteBatch at all?

Can you point me to RecordCache? I can't find it in the project. The advantage would be that WriteBatch guarantees write atomicity. As far as I understand the way RecordCache works, it might leave the system in an inconsistent state after a crash failure during a write.

You mentioned that a transactional store can help reduce duplication in the case of ALOS

I will remove claims about ALOS from the proposal. Thank you for elaborating!

As a reminder, we have a new IQv2 mechanism now. Should we propose any changes to IQv1 to support this transactional mechanism, versus just proposing it for IQv2? Certainly, it seems strange only to propose a change for IQv1 and not v2.


I will update the proposal with complementary API changes for IQv2.

What should IQ do if I request to readCommitted on a non-transactional store?

We can assume that non-transactional stores commit on write, so IQ works in the same way with non-transactional stores regardless of the value of readCommitted.


@Guozhang,

* If we crash between line 3 and 4, then at that time the local persistent store image is representing as of offset 200, but upon recovery all changelog records from 100 to log-end-offset would be considered as aborted and not be replayed and we would restart processing from position 100. Restart processing will violate EOS. I'm not sure how e.g. RocksDB's WriteBatchWithIndex would make sure that the step 4 and step 5 could be done atomically here.


Could you please point me to the place in the codebase where a task flushes the store before committing the transaction? Looking at TaskExecutor (https://github.com/apache/kafka/blob/4c9eeef5b2dff9a4f0977fbc5ac7eaaf930d0d0e/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java#L144-L167), StreamTask#prepareCommit (https://github.com/apache/kafka/blob/4c9eeef5b2dff9a4f0977fbc5ac7eaaf930d0d0e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L398), and CachedStateStore (https://github.com/apache/kafka/blob/4c9eeef5b2dff9a4f0977fbc5ac7eaaf930d0d0e/streams/src/main/java/org/apache/kafka/streams/state/internals/CachedStateStore.java#L29-L34), we flush the cache, but not the underlying state store. Explicit StateStore#flush happens in AbstractTask#maybeWriteCheckpoint (https://github.com/apache/kafka/blob/4c9eeef5b2dff9a4f0977fbc5ac7eaaf930d0d0e/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java#L91-L99). Is there something I am missing here?

Today all cached data that have not been flushed are not committed for sure, but even flushed data to the persistent underlying store may also be uncommitted since flushing can be triggered asynchronously before the commit.

Can you please point me to the place in the codebase where we trigger async flush before the commit? This would certainly be a reason to introduce a dedicated StateStore#commit method.

Thanks again for the feedback. I am going to update the KIP and then respond to the next batch of questions and suggestions.

Best,
Alex

On Mon, May 30, 2022 at 5:13 PM Suhas Satish <ssat...@confluent.io.invalid> wrote:

Thanks for the KIP proposal, Alex.
1. Configuration default

You mention that applications using the Streams DSL with the built-in RocksDB state store will get transactional state stores by default when EOS is enabled, but the default implementation for apps using PAPI will fall back to non-transactional behavior. Shouldn't we have the same default behavior for both types of apps, DSL and PAPI?

On Mon, May 30, 2022 at 2:11 AM Bruno Cadonna <cado...@apache.org> wrote:

Thanks for the PR, Alex!

I am also glad to see this coming.


1. Configuration

I would also prefer to restrict the configuration of transactionality to the state store. Ideally, calling the method transactional() on the state store would be enough. An option on the store builder would make it possible to turn transactionality on and off (as John proposed).


2. Memory usage in RocksDB

This seems to be a major issue. We do not have any guarantee that uncommitted writes fit into memory, and I guess we never will. What happens when the uncommitted writes do not fit into memory? Does RocksDB throw an exception? Can we handle such an exception without crashing?

Does the RocksDB behavior even need to be included in this KIP? In the end it is an implementation detail.

What we should consider, though, is a memory limit in some form, and what we do when the memory limit is exceeded.
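Here is one possible shape for such a limit, sketched in Java under stated assumptions. The buffer estimates its size from UTF-8 key and value lengths. Once a configurable byte limit is reached, it tells the caller to commit early instead of continuing to buffer. This is in the spirit of John's suggestion that a "full" WriteBatch should trigger a flush. The class, the limit, and the size estimate are all hypothetical and not part of the KIP.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical bounded buffer of uncommitted writes that asks for an early commit when full. */
final class BoundedUncommittedBuffer {
    private final long maxBytes;
    private final Map<String, String> writes = new LinkedHashMap<>();
    private long estimatedBytes = 0;

    BoundedUncommittedBuffer(long maxBytes) {
        this.maxBytes = maxBytes;
    }

    /** Buffers a write; returns true if the caller should commit before buffering more. */
    boolean put(String key, String value) {
        String previous = writes.put(key, value);
        if (previous != null) {
            estimatedBytes -= sizeOf(key, previous); // an overwrite replaces the old entry
        }
        estimatedBytes += sizeOf(key, value);
        return estimatedBytes >= maxBytes;
    }

    /** Hands the buffered writes to the committer and resets the buffer. */
    Map<String, String> drain() {
        Map<String, String> out = new LinkedHashMap<>(writes);
        writes.clear();
        estimatedBytes = 0;
        return out;
    }

    long estimatedBytes() {
        return estimatedBytes;
    }

    private static long sizeOf(String key, String value) {
        return key.getBytes(StandardCharsets.UTF_8).length
                + value.getBytes(StandardCharsets.UTF_8).length;
    }
}
```

Forcing an early commit trades commit frequency for bounded memory. The alternative of spilling to disk is what the secondary-store approach does.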


3. PoC

I agree with Guozhang that a PoC is a good idea to better understand the devils in the details.


Best,
Bruno

On 25.05.22 01:52, Guozhang Wang wrote:
Hello Alex,

Thanks for writing the proposal! Glad to see it coming. I think this is the kind of KIP where too many devils would be buried in the details, so it's better to start working on a POC, either in parallel or before we resume our discussion, rather than blocking any implementation until we are satisfied with the proposal.

Just as a concrete example, I personally am still not 100% clear how the proposal would work to achieve EOS with the state stores. For example, the commit procedure today looks like this:

0: there's an existing checkpoint file indicating the changelog offset of the local state store image is 100. Now a commit is triggered:
1. flush cache (since it contains partially processed records), make sure all records are written to the producer.
2. flush producer, making sure all changelog records have now been acked. // here we would get the new changelog position, say 200
3. flush store, make sure all writes are persisted.
4. producer.sendOffsetsToTransaction(); producer.commitTransaction(); // we would make the writes in changelog up to offset 200 committed
5. Update the checkpoint file to 200.

The question is about atomicity between those lines, for example:

* If we crash between line 4 and line 5, the local checkpoint file would stay as 100, and upon recovery we would replay the changelog from 100 to 200. This is not ideal but does not violate EOS, since the changelogs are all overwrites anyway.
* If we crash between line 3 and 4, then at that time the local persistent store image is representing as of offset 200, but upon recovery all changelog records from 100 to log-end-offset would be considered as aborted and not be replayed and we would restart processing from position 100. Restart processing will violate EOS. I'm not sure how e.g. RocksDB's WriteBatchWithIndex would make sure that the step 4 and step 5 could be done atomically here.
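Both crash cases can be checked with a toy Java model of the five steps above. It follows the example: the checkpoint starts at 100 and the new changelog position is 200. It also assumes nothing reaches the local store image before the explicit flush in step 3. `CommitCrashModel` is a hypothetical name, and the model only tracks offsets.

```java
/** Toy model of what each artifact reflects after the first N commit steps have completed. */
final class CommitCrashModel {
    static final long OLD = 100L;
    static final long NEW = 200L;

    final long storeImage;         // offset the local store image reflects
    final long committedChangelog; // offset up to which the changelog is committed
    final long checkpoint;         // offset recorded in the checkpoint file

    private CommitCrashModel(long storeImage, long committedChangelog, long checkpoint) {
        this.storeImage = storeImage;
        this.committedChangelog = committedChangelog;
        this.checkpoint = checkpoint;
    }

    /** State after a crash that happens once steps 1..lastCompletedStep have finished. */
    static CommitCrashModel afterStep(int lastCompletedStep) {
        return new CommitCrashModel(
                lastCompletedStep >= 3 ? NEW : OLD,  // step 3: flush store
                lastCompletedStep >= 4 ? NEW : OLD,  // step 4: commit transaction
                lastCompletedStep >= 5 ? NEW : OLD); // step 5: update checkpoint
    }

    /** The store holds writes the changelog never committed, which restoration cannot undo. */
    boolean violatesEos() {
        return storeImage > committedChangelog;
    }

    /** Width of the changelog offset range replayed on restart (checkpoint to committed end). */
    long replayedOffsetRange() {
        return Math.max(0L, committedChangelog - checkpoint);
    }
}
```

A crash after step 4 only costs a replay of the 100 to 200 range. A crash after step 3 leaves the store ahead of the committed changelog, which is the EOS violation described in the second bullet.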

Originally, what I was thinking when creating the JIRA ticket is that we need to let the state store provide a transactional API like "token commit()" used in step 4) above, which returns a token that, e.g. in our example above, indicates offset 200, and that token would be written as part of the records in the Kafka transaction in step 5). And upon recovery the state store would have another API like "rollback(token)", where the token is read from the latest committed txn and used to roll back the store to that committed image. I think your proposal is different, and it seems like you're proposing we swap steps 3) and 4) above, but the atomicity issue still remains, since now you may have the store image at 100 but the changelog committed at 200. I'd like to learn more about the details of how it resolves such issues.

Anyway, that's just an example to make the point that there are lots of implementation details which would drive the public API design, and we should probably first do a POC and come back to discuss the KIP. Let me know what you think?


Guozhang









On Tue, May 24, 2022 at 10:35 AM Sagar <sagarmeansoc...@gmail.com> wrote:

Hi Alexander,

Thanks for the KIP! This seems like a great proposal. I have the same opinion as John on the Configuration part, though. I think the 2-level config and its behaviour based on the setting/unsetting of the flag seem confusing to me as well. Since the KIP seems specifically centred around RocksDB, it might be better to add it at the Supplier level as John suggested.

On similar lines, this config name => *statestore.transactional.mechanism* may also need rethinking, as the value assigned to it (rocksdb_indexbatch) implicitly seems to assume that RocksDB is the only state store that Kafka Streams supports, while that's not the case.

Also, regarding the potential memory pressure that can be introduced by WriteBatchWithIndex, do you think it might make more sense to include some numbers/benchmarks on how much the memory consumption might increase?

Lastly, the read_uncommitted flag's behaviour on IQ may need more elaboration.

These points aside, as I said, this is a great proposal!

Thanks!
Sagar.

On Tue, May 24, 2022 at 10:35 PM John Roesler <vvcep...@apache.org> wrote:

Thanks for the KIP, Alex!

I'm really happy to see your proposal. This improvement fills a long-standing gap.

I have a few questions:

1. Configuration
The KIP only mentions RocksDB, but of course, Streams also ships with an InMemory store, and users also plug in their own custom state stores. It is also common to use multiple types of state stores in the same application for different purposes.

Against this backdrop, the choice to configure transactionality as a top-level config, as well as to configure the store transaction mechanism as a top-level config, seems a bit off.

Did you consider instead just adding the option to the RocksDB*StoreSupplier classes and the factories in Stores? It seems like the desire to enable the feature by default, but with a feature-flag to disable it, was a factor here. However, as you pointed out, there are some major considerations that users should be aware of, so opt-in doesn't seem like a bad choice, either. You could add an Enum argument to those factories like `RocksDBTransactionalMechanism.{NONE,
Some points in favor of this
approach:
* Avoid "stores that don't support
transactions
ignore
the
config"
complexity
* Users can choose how to spend
their
memory
budget,
making
some
stores
transactional and others not
* When we add transactional support
to
in-memory
stores,
we
don't
have
to
figure out what to do with the
mechanism
config
(i.e.,
what
do
you
set
the
mechanism to when there are
multiple
kinds
of
transactional
stores
in
the
topology?)

2. caching/flushing/transactions
The coupling between memory usage and flushing that you mentioned is a bit troubling. It also occurs to me that there seems to be some relationship with the existing record cache, which is also an in-memory holding area for records that are not yet written to the cache and/or store (albeit with no particular semantics). Have you considered how all these components should relate? For example, should a "full" WriteBatch actually trigger a flush so that we don't get OOMEs? If the proposed transactional mechanism forces all uncommitted writes to be buffered in memory until a commit, then what is the advantage over just doing the same thing with the RecordCache and not introducing the WriteBatch at all?

3. ALOS
You mentioned that a transactional store can help reduce duplication in the case of ALOS. We might want to be careful about claims like that. Duplication isn't the way that repeated processing manifests in state stores. Rather, it is in the form of dirty reads during reprocessing. This feature may reduce the incidence of dirty reads during reprocessing, but not in a predictable way. During regular processing today, we will send some records through to the changelog in between commit intervals. Under ALOS, if any of those dirty writes gets committed to the changelog topic, then upon failure, we have to roll the store forward to them anyway, regardless of this new transactional mechanism. That's a fixable problem, by the way, but this KIP doesn't seem to fix it. I wonder if we should make any claims about the relationship of this feature to ALOS if the real-world behavior is so complex.

4. IQ
As a reminder, we have a new IQv2 mechanism now. Should we propose any changes to IQv1 to support this transactional mechanism, versus just proposing it for IQv2? Certainly, it seems strange only to propose a change for IQv1 and not v2.

Regarding your proposal for IQv1, I'm unsure what the behavior should be for readCommitted, since the current behavior also reads out of the RecordCache. I guess if readCommitted==false, then we will continue to read from the cache first, then the Batch, then the store; and if readCommitted==true, we would skip the cache and the Batch and only read from the persistent RocksDB store?

What should IQ do if I request to readCommitted on a non-transactional store?
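Here is a minimal Java sketch of the read path John describes for readCommitted. It assumes three layers held as maps: the record cache, the uncommitted batch, and the persistent store. It also encodes the answer Alex gives in his reply for non-transactional stores, which are treated as committing on write, so `readCommitted` does not change their lookup. `LayeredStore` and its fields are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical layered store illustrating readCommitted lookups for IQ. */
final class LayeredStore {
    final Map<String, String> recordCache = new HashMap<>();
    final Map<String, String> uncommittedBatch = new HashMap<>();
    final Map<String, String> persistent = new HashMap<>();
    private final boolean transactional;

    LayeredStore(boolean transactional) {
        this.transactional = transactional;
    }

    String get(String key, boolean readCommitted) {
        // Non-transactional stores are treated as committing on write,
        // so readCommitted makes no difference for them.
        if (readCommitted && transactional) {
            return persistent.get(key); // skip the cache and the batch
        }
        if (recordCache.containsKey(key)) {
            return recordCache.get(key);
        }
        if (uncommittedBatch.containsKey(key)) {
            return uncommittedBatch.get(key);
        }
        return persistent.get(key);
    }
}
```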

Thanks again for proposing the KIP, and my apologies for the long reply; I'm hoping to air all my concerns in one "batch" to save time for you.

Thanks,
-John

On Tue, May 24, 2022, at 03:45, Alexander Sorokoumov wrote:
Hi all,

I've written a KIP for making Kafka Streams state stores transactional and would like to start a discussion:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-844%3A+Transactional+State+Stores

Best,
Alex







--

Suhas Satish
Engineering Manager





--
-- Guozhang



