Hey Nick,

Thank you for the KIP! With such a significant performance degradation in
the secondary store approach, we should definitely consider
WriteBatchWithIndex. I also like encapsulating checkpointing inside the
default state store implementation to improve performance.

+1 to John's comment to keep the current checkpointing as a fallback
mechanism. We want to keep existing users' workflows intact if we can. A
non-intrusive way would be to add a separate StateStore method, say,
StateStore#managesCheckpointing(), that controls whether the state store
implementation owns checkpointing.
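
To make the idea concrete, here is a minimal sketch of such a flag. Everything
in it is hypothetical: SketchStateStore stands in for the real StateStore
interface, and managesCheckpointing() and SketchStateManager are illustrative
names only, not existing Kafka Streams APIs.

```java
// Hypothetical stand-in for the StateStore interface; not the real Kafka Streams type.
interface SketchStateStore {
    String name();

    // Defaults to false so that existing custom stores keep the current,
    // runtime-managed checkpoint file without any code changes.
    default boolean managesCheckpointing() {
        return false;
    }
}

// An existing custom store: it does not override the flag.
final class LegacyCustomStore implements SketchStateStore {
    @Override
    public String name() {
        return "legacy";
    }
}

// A store that opts in to owning its own checkpointing.
final class SelfCheckpointingStore implements SketchStateStore {
    @Override
    public String name() {
        return "self-checkpointing";
    }

    @Override
    public boolean managesCheckpointing() {
        return true;
    }
}

// Hypothetical stand-in for the state manager's decision.
final class SketchStateManager {
    // True when the runtime should still write the checkpoint file itself.
    static boolean runtimeWritesCheckpoint(SketchStateStore store) {
        return !store.managesCheckpointing();
    }
}
```

Because the method has a default, stores that never heard of it fall back to
the existing checkpoint file behavior.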

I think any solution for transactional writes should also address the
OOMEs. One possible way is to wire in a commit request from the StateStore
by adding, say, StateStore#commitNeeded, which StreamTask#commitNeeded
checks via the corresponding ProcessorStateManager. With that change,
RocksDBStore would have to track the current transaction size and request
a commit when the size goes over a (configurable) threshold.
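
A rough sketch of the size-tracking idea follows. It uses an in-memory map in
place of a real write batch, and SizeTrackingStore, SketchTask, and the
maxUncommittedBytes threshold are all hypothetical names chosen for
illustration; the byte accounting is an approximation, not how RocksDB
measures batch size.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical store that buffers uncommitted writes and tracks their size.
final class SizeTrackingStore {
    private final long maxUncommittedBytes;
    private final Map<String, byte[]> pending = new HashMap<>();
    private long uncommittedBytes = 0L;

    SizeTrackingStore(long maxUncommittedBytes) {
        this.maxUncommittedBytes = maxUncommittedBytes;
    }

    void put(String key, byte[] value) {
        byte[] previous = pending.put(key, value);
        if (previous == null) {
            // New key in this transaction: count the key bytes once.
            uncommittedBytes += key.getBytes(StandardCharsets.UTF_8).length;
        } else {
            // Overwrite: drop the size of the value being replaced.
            uncommittedBytes -= previous.length;
        }
        uncommittedBytes += value.length;
    }

    // Signals that the buffered transaction has grown past the threshold.
    boolean commitNeeded() {
        return uncommittedBytes > maxUncommittedBytes;
    }

    // Stand-in for flushing the batch; here it only resets the bookkeeping.
    void commit() {
        pending.clear();
        uncommittedBytes = 0L;
    }
}

// Hypothetical stand-in for the task-level check.
final class SketchTask {
    private final List<SizeTrackingStore> stores;

    SketchTask(List<SizeTrackingStore> stores) {
        this.stores = stores;
    }

    // The task requests a commit as soon as any store asks for one.
    boolean commitNeeded() {
        return stores.stream().anyMatch(SizeTrackingStore::commitNeeded);
    }
}
```

In the real code the check would go through ProcessorStateManager rather than
a plain list of stores.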

AFAIU WriteBatchWithIndex might perform significantly slower than non-txn
puts as the batch size grows [1]. We should have a configuration to fall
back to the current behavior (and/or disable txn stores for ALOS) unless
the benchmarks show negligible overhead for longer commits / large-enough
batch sizes.
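
As a sketch of what that fallback decision could look like: the two boolean
settings below are hypothetical (no such configs exist today), and
StoreModeSelector is only an illustrative name.

```java
// Processing modes as discussed in this thread (ALOS vs EOS).
enum SketchProcessingMode { AT_LEAST_ONCE, EXACTLY_ONCE }

final class StoreModeSelector {
    // Both flags are hypothetical settings invented for this sketch:
    //   transactionalStoresEnabled - global switch back to the current behavior
    //   disableUnderAlos           - skip txn stores when running ALOS
    static boolean useTransactionalStore(SketchProcessingMode mode,
                                         boolean transactionalStoresEnabled,
                                         boolean disableUnderAlos) {
        if (!transactionalStoresEnabled) {
            return false; // fall back to non-txn puts everywhere
        }
        if (mode == SketchProcessingMode.AT_LEAST_ONCE && disableUnderAlos) {
            return false; // keep ALOS on the current behavior
        }
        return true;
    }
}
```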

If you prefer to keep the KIP smaller, I would cut state-store-managed
checkpointing rather than proper OOME handling and the ability to switch to
non-txn behavior. Checkpointing is not necessary to solve the
recovery-under-EOS problem. On the other hand, once WriteBatchWithIndex is
in, it will be much easier to add state-store-managed checkpointing.

If you share the current implementation, I am happy to help you address the
OOME and configuration parts as well as review and test the patch.

Best,
Alex


1. https://github.com/facebook/rocksdb/issues/608

On Tue, Nov 22, 2022 at 6:31 PM Nick Telford <nick.telf...@gmail.com> wrote:

> Hi John,
>
> Thanks for the review and feedback!
>
> 1. Custom Stores: I've been mulling over this problem myself. As it stands,
> custom stores would essentially lose checkpointing with no indication that
> they're expected to make changes, besides a line in the release notes. I
> agree that the best solution would be to provide a default that checkpoints
> to a file. The one thing I would change is that the checkpointing is to a
> store-local file, instead of a per-Task file. This way the StateStore still
> technically owns its own checkpointing (via a default implementation), and
> the StateManager/Task execution engine doesn't need to know anything about
> checkpointing, which greatly simplifies some of the logic.
>
> 2. OOME errors: The main reasons why I didn't explore a solution to this
> are a) to keep this KIP as simple as possible, and b) because I'm not
> exactly sure how to signal that a Task should commit prematurely. I'm
> confident it's possible, and I think it's worth adding a section on
> handling this. Besides my proposal to force an early commit once memory
> usage reaches a threshold, is there any other approach that you might
> suggest for tackling this problem?
>
> 3. ALOS: I can add in an explicit paragraph, but my assumption is that
> since transactional behaviour comes at little/no cost, that it should be
> available by default on all stores, irrespective of the processing mode.
> While ALOS doesn't use transactions, the Task itself still "commits", so
> the behaviour should be correct under ALOS too. I'm not convinced that it's
> worth having both transactional/non-transactional stores available, as it
> would considerably increase the complexity of the codebase, for very little
> benefit.
>
> 4. Method deprecation: Are you referring to StateStore#getPosition()? As I
> understand it, Position contains the position of the *source* topics,
> whereas the commit offsets would be the *changelog* offsets. So it's still
> necessary to retain the Position data, as well as the changelog offsets.
> What I meant in the KIP is that Position offsets are currently stored in a
> file, and since we can atomically store metadata along with the record
> batch we commit to RocksDB, we can move our Position offsets into this
> metadata too, and gain the same transactional guarantees that we will for
> changelog offsets, ensuring that the Position offsets are consistent with
> the records that are read from the database.
>
> Regards,
> Nick
>
> On Tue, 22 Nov 2022 at 16:25, John Roesler <vvcep...@apache.org> wrote:
>
> > Thanks for publishing this alternative, Nick!
> >
> > The benchmark you mentioned in the KIP-844 discussion seems like a
> > compelling reason to revisit the built-in transactionality mechanism. I
> > also appreciate your analysis, showing that for most use cases, the write
> > batch approach should be just fine.
> >
> > There are a couple of points that would hold me back from approving this
> > KIP right now:
> >
> > 1. Loss of coverage for custom stores.
> > The fact that you can plug in a (relatively) simple implementation of the
> > XStateStore interfaces and automagically get a distributed database out
> > of it is a significant benefit of Kafka Streams. I'd hate to lose it, so
> > it would be better to spend some time and come up with a way to preserve
> > that property. For example, can we provide a default implementation of
> > `commit(..)` that re-implements the existing checkpoint-file approach? Or
> > perhaps add an `isTransactional()` flag to the state store interface so
> > that the runtime can decide whether to continue to manage checkpoint
> > files vs delegating transactionality to the stores?
> >
> > 2. Guarding against OOME
> > I appreciate your analysis, but I don't think it's sufficient to say that
> > we will solve the memory problem later if it becomes necessary. The
> > experience leading to that situation would be quite bad: Imagine, you
> > upgrade to AK 3.next, your tests pass, so you deploy to production. That
> > night, you get paged because your app is now crashing with OOMEs. As with
> > all OOMEs, you'll have a really hard time finding the root cause, and
> > once you do, you won't have a clear path to resolve the issue. You could
> > only tune down the commit interval and cache buffer size until you stop
> > getting crashes.
> >
> > FYI, I know of multiple cases where people run EOS with much larger
> > commit intervals to get better batching than the default, so I don't
> > think this pathological case would be as rare as you suspect.
> >
> > Given that we already have the rudiments of an idea of what we could do
> > to prevent this downside, we should take the time to design a solution.
> > We owe it to our users to ensure that awesome new features don't come
> > with bitter pills unless we can't avoid it.
> >
> > 3. ALOS mode.
> > On the other hand, I didn't see an indication of how stores will be
> > handled under ALOS (aka non-EOS) mode. Theoretically, the
> > transactionality of the store and the processing mode are orthogonal. A
> > transactional store would serve ALOS just as well as a
> > non-transactional one (if not better). Under ALOS, though, the default
> > commit interval is five minutes, so the memory issue is far more pressing.
> >
> > As I see it, we have several options to resolve this point. We could
> > demonstrate that transactional stores work just fine for ALOS and we can
> > therefore just swap over unconditionally. We could also disable the
> > transactional mechanism under ALOS so that stores operate just the same
> > as they do today when run in ALOS mode. Finally, we could do the same as
> > in KIP-844 and make transactional stores opt-in (it'd be better to avoid
> > the extra opt-in mechanism, but it's a good get-out-of-jail-free card).
> >
> > 4. (minor point) Deprecation of methods
> >
> > You mentioned that the new `commit` method replaces flush,
> > updateChangelogOffsets, and checkpoint. It seems to me that the point
> > about atomicity and Position also suggests that it replaces the Position
> > callbacks. However, the proposal only deprecates `flush`. Should we be
> > deprecating other methods as well?
> >
> > Thanks again for the KIP! It's really nice that you and Alex will get the
> > chance to collaborate on both directions so that we can get the best
> > outcome for Streams and its users.
> >
> > -John
> >
> >
> > On 2022/11/21 15:02:15 Nick Telford wrote:
> > > Hi everyone,
> > >
> > > As I mentioned in the discussion thread for KIP-844, I've been working
> > > on an alternative approach to achieving better transactional semantics
> > > for Kafka Streams StateStores.
> > >
> > > I've published this separately as KIP-892: Transactional Semantics for
> > > StateStores
> > > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores>,
> > > so that it can be discussed/reviewed separately from KIP-844.
> > >
> > > Alex: I'm especially interested in what you think!
> > >
> > > I have a nearly complete implementation of the changes outlined in this
> > > KIP, please let me know if you'd like me to push them for review in
> > > advance of a vote.
> > >
> > > Regards,
> > >
> > > Nick
> > >
> >
>
