Thanks, Matthias and Sagar, for your comments! I've responded here for now,
and will update the KIP afterwards with the outcome of our discussions as
they resolve.

----------- Matthias's comments -----------

> (1) Why does the new store not extend KeyValueStore, but StateStore?
In the end, it's a KeyValueStore?

A `VersionedKeyValueStore<K, V>` is not a `KeyValueStore<K, V>` because
many of the KeyValueStore methods would not make sense for a versioned
store. For example, `put(K key, V value)` is not meaningful for a versioned
store because the record needs a timestamp associated with it.

A `VersionedKeyValueStore<K, V>` is more similar to a `KeyValueStore<K,
ValueAndTimestamp<V>>` (i.e., `TimestampedKeyValueStore<K, V>`), but some
of the TimestampedKeyValueStore methods are still problematic. For example,
what does it mean for `delete(K key)` to have return type
`ValueAndTimestamp<V>`? Does this mean that `delete(K key)` only deletes
(and returns) the latest record version for the key? Probably we want a
versioned store to have `delete(K key)` delete all record versions for the
given key, in which case the return type is better suited as an
iterator/collection of KeyValueTimestamp. `putIfAbsent(K key,
ValueAndTimestamp value)` also has ambiguous semantics for versioned stores
(i.e., what does it mean for the key/record to be "absent").
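
To make the mismatch concrete, here is a rough in-memory sketch of the method shapes discussed in this thread. It is only an illustration, not the KIP's proposed interface: `ToyVersionedStore` and `ValueAndTs` are hypothetical names (the latter standing in for `ValueAndTimestamp<V>`), and the inclusive timestamp bound is an assumption that is still under discussion in (6).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for Kafka Streams' ValueAndTimestamp<V>.
final class ValueAndTs<V> {
    final V value;
    final long timestamp;

    ValueAndTs(V value, long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }
}

// Toy versioned store: there is no put(key, value), every write carries a timestamp.
final class ToyVersionedStore<K, V> {
    private final Map<K, TreeMap<Long, V>> versions = new HashMap<>();

    // A null value acts as a tombstone at the given timestamp.
    void put(K key, V value, long timestamp) {
        versions.computeIfAbsent(key, k -> new TreeMap<>()).put(timestamp, value);
    }

    // Returns the latest version with timestamp <= timestampTo, or null if none.
    ValueAndTs<V> get(K key, long timestampTo) {
        TreeMap<Long, V> history = versions.get(key);
        if (history == null) {
            return null;
        }
        Map.Entry<Long, V> e = history.floorEntry(timestampTo);
        if (e == null || e.getValue() == null) {
            return null;
        }
        return new ValueAndTs<>(e.getValue(), e.getKey());
    }
}
```

Note how a single-valued return type for `delete(K key)` has no obvious meaning here, since each key maps to a whole history of versions.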

I agree that conceptually a versioned key-value store is just a key-value
store, though. In the future if we redesign the store interfaces, it'd be
great to unify them by having a more generic KeyValueStore interface that
allows for extra flexibility to support different types of key-value
stores, including versioned stores. (Or, if you can think of a way to
achieve this with the existing interfaces today, I'm all ears!)

> (2) Should we have a ReadOnlyVersionedKeyValueStore? Even if we don't
want to support IQ in this KIP, it might be good to add this interface
right away to avoid complications for follow up KIPs? Or won't there by
any complications anyway?

I don't think there will be complications for refactoring to add this
interface in the future. Refactoring out ReadOnlyVersionedKeyValueStore
from VersionedKeyValueStore would leave VersionedKeyValueStore unchanged
from the outside.

Also, is it true that the ReadOnlyKeyValueStore interface is only used for
IQv1 and not IQv2? I think it's an open question as to whether we should
support IQv1 for versioned stores or only IQv2. If the latter, then maybe
we won't need the extra interface at all.

> (3) Why do we not have a `delete(key)` method? I am ok with not
supporting all methods from existing KV-store, but a `delete(key)` seems
to be fundamentally to have?

What do you think the semantics of `delete(key)` should be for versioned
stores? Should `delete(key)` delete (and return) all record versions for
the key? Or should we have `delete(key, timestamp)` which is equivalent to
`put(key, null, timestamp)` except with a return type to return
ValueAndTimestamp representing the record it replaced?

If we have ready alignment on what the interface and semantics for
`delete(key)` should be, then adding it in this KIP sounds good. I just
didn't want the rest of the KIP to be hung up over additional interfaces,
given that we can always add extra interfaces in the future.

> (4a) Do we need `get(key)`? It seems to be the same as `get(key,
MAX_VALUE)`? Maybe is good to have as syntactic sugar though? Just for
my own clarification (should we add something to the JavaDocs?).

Correct, it is just syntactic sugar. I will add a clarification into the
Javadocs as you've suggested.
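
As a small illustration of the "syntactic sugar" point, the relationship could be expressed as a default method. This is only a sketch: `TimeBoundedLookup` and `ToyLookup` are hypothetical names, and the real interface may well implement `get(key)` differently.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical interface: get(key) simply delegates to get(key, Long.MAX_VALUE).
interface TimeBoundedLookup<K, V> {
    V get(K key, long timestampTo);

    default V get(K key) {
        return get(key, Long.MAX_VALUE);
    }
}

// Toy in-memory implementation used only to exercise the default method.
final class ToyLookup implements TimeBoundedLookup<String, String> {
    private final Map<String, TreeMap<Long, String>> versions = new HashMap<>();

    void put(String key, String value, long timestamp) {
        versions.computeIfAbsent(key, k -> new TreeMap<>()).put(timestamp, value);
    }

    @Override
    public String get(String key, long timestampTo) {
        TreeMap<Long, String> history = versions.get(key);
        if (history == null) {
            return null;
        }
        Map.Entry<Long, String> e = history.floorEntry(timestampTo);
        return e == null ? null : e.getValue();
    }
}
```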

> (4b) Should we throw an exception if a user queries out-of-bound
instead of returning `null` (in `get(key,ts)`)?
   -> You put it into "rejected alternatives", and I understand your
argument. Would love to get input from others about this question
though. -- It seems we also return `null` for windowed stores, so maybe
the strongest argument is to align to existing behavior? Or do we have
case for which the current behavior is problematic?

Sure; curious to hear what others think as well.

> (4c) JavaDoc on `get(key,ts)` says: "(up to store implementation
discretion when this is the case)" -> Should we make it a stricter
contract such that the user can reason about it better (there is WIP to
make retention time a strict bound for windowed stores atm)
   -> JavaDocs on `persistentVersionedKeyValueStore` seems to suggest a
strict bound, too.

Ah, great question. I think the question boils down to: do we want to
require that all versioned stores (including custom user implementations)
use "history retention" to determine when to expire old record versions?

The `persistentVersionedKeyValueStore(...)` method returns instances of the
provided RocksDB-based versioned store implementation, which does use
history retention for this purpose. That's why we can very clearly say that
for this store, `get(key, ts)` will return null if the provided timestamp
bound has fallen out of history retention. The reason I
left the `VersionedKeyValueStore#get(key, ts)` Javadoc more generic (i.e.,
does not mention history retention) is because maybe a user implementing
their own custom store will choose a different expiry mechanism, e.g., keep
the three latest versions for each key regardless of how old the timestamps
are.
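
For illustration, a custom store with a count-based expiry mechanism might look roughly like the sketch below. `KeepLatestNStore` is a hypothetical name and this is not part of the KIP; it only shows why a Javadoc that mentions history retention would not describe such a store.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical custom store: keeps only the N latest versions per key,
// regardless of how old their timestamps are (no history retention).
final class KeepLatestNStore {
    private final int maxVersions;
    private final Map<String, TreeMap<Long, String>> versions = new HashMap<>();

    KeepLatestNStore(int maxVersions) {
        this.maxVersions = maxVersions;
    }

    void put(String key, String value, long timestamp) {
        TreeMap<Long, String> history = versions.computeIfAbsent(key, k -> new TreeMap<>());
        history.put(timestamp, value);
        // Expire the oldest versions once the per-key limit is exceeded.
        while (history.size() > maxVersions) {
            history.pollFirstEntry();
        }
    }

    // Returns null when no retained version has timestamp <= timestampTo.
    String get(String key, long timestampTo) {
        TreeMap<Long, String> history = versions.get(key);
        if (history == null) {
            return null;
        }
        Map.Entry<Long, String> e = history.floorEntry(timestampTo);
        return e == null ? null : e.getValue();
    }
}
```

With this store, whether `get(key, ts)` returns null depends on how many newer versions exist, not on the age of `ts`.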

If we want to require that all versioned stores use history retention in
order to determine when to expire old records, then I will certainly update
the Javadoc to clarify. This is already a requirement for DSL users because
the VersionedBytesStoreSupplier interface requires history retention to be
provided (in order for changelog topic configs to be properly set), so it's
just a question of whether we also want to require PAPI users to use
history retention too. I had a look at the existing window stores and
didn't see precedent for requiring that all window stores have a standard
"retention time" concept for how long to keep windows, but if we want to
have a standard "history retention" concept for versioned stores we
certainly can. WDYT?

> (5a) Do we need to expose `segmentInterval`? For windowed-stores, we
also use segments but hard-code it to two (it was exposed in earlier
versions but it seems not useful, even if we would be open to expose it
again if there is user demand).

If we want to leave it out of this first KIP (and potentially expose it in
the future), that works for me. The performance benchmarks I ran suggest
that this parameter greatly impacts store performance though and is very
workload dependent. If a user reported poor performance using versioned
stores for their workload, this is the first parameter I would want to
tune. That said, metrics/observability for versioned stores (which would be
helpful for determining how this parameter should be adjusted) have been
deferred to a follow-up KIP, so perhaps that's reason to defer exposing
this parameter as well.

> (5b) JavaDocs says: "Performance degrades as more record versions for
the same key are collected in a single segment. On the other hand,
out-of-order writes and reads which access older segments may slow down
if there are too many segments." -- Wondering if JavaDocs should make
any statements about expected performance? Seems to be an implementation
detail?

I included this sentence to explain why a user might want to tune this
value / help guide how to think about the parameter, but if we want to
remove it entirely (per the discussion point above) then this Javadoc will
be removed with it.

> (6) validTo timestamp is "exclusive", right? Ie, if I query
`get(key,ts[=validToV1])` I would get `null` or the "next" record v2
with validFromV2=ts?

I actually intended for it to be inclusive (will update the KIP). Do you
think exclusive is more intuitive? The reason I had inclusive in my mind is
because it's like an "AS OF <time>" query, which treats the time bound as
inclusive.
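
To pin down the two readings, here is a small sketch that models a key's history as a `TreeMap` from record timestamp to value (an assumption for illustration only; `BoundSemanticsSketch` is a hypothetical name). The inclusive reading picks the latest version with timestamp <= ts, the exclusive reading picks the latest version with timestamp < ts.

```java
import java.util.Map;
import java.util.TreeMap;

final class BoundSemanticsSketch {
    // Inclusive bound ("AS OF ts"): latest version with timestamp <= ts.
    static String getInclusive(TreeMap<Long, String> history, long ts) {
        Map.Entry<Long, String> e = history.floorEntry(ts);
        return e == null ? null : e.getValue();
    }

    // Exclusive bound: latest version with timestamp strictly < ts.
    static String getExclusive(TreeMap<Long, String> history, long ts) {
        Map.Entry<Long, String> e = history.lowerEntry(ts);
        return e == null ? null : e.getValue();
    }
}
```

With v1 at timestamp 10 and v2 at timestamp 20, querying at ts=20 yields v2 under the inclusive bound and v1 under the exclusive bound.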

> (7) The KIP says, that segments are stores in the same RocksDB -- for
this case, how are efficient deletes handled? For windowed-store, we can
just delete a full RocksDB.

The way that multiple segments are represented in the same RocksDB is that
the RocksDB keys are prefixed with segment ID. An entire segment is deleted
with a single `deleteRange()` call to RocksDB.
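
As a rough sketch of the idea (not the actual implementation), a sorted map can stand in for RocksDB's ordered key space. The key encoding below (zero-padded segment ID plus a `|` separator) and the class name `SegmentedKeySpaceSketch` are assumptions made purely for illustration; the real store operates on byte-array keys.

```java
import java.util.TreeMap;

// A TreeMap stands in for RocksDB's sorted key space in this sketch.
final class SegmentedKeySpaceSketch {
    private final TreeMap<String, String> db = new TreeMap<>();

    // Hypothetical encoding: zero-padded segment ID so prefixes sort numerically.
    private static String prefix(long segmentId) {
        return String.format("%010d|", segmentId);
    }

    void put(long segmentId, String key, String value) {
        db.put(prefix(segmentId) + key, value);
    }

    String get(long segmentId, String key) {
        return db.get(prefix(segmentId) + key);
    }

    // Analogous to one range delete: drop every key in
    // [prefix(segmentId), prefix(segmentId + 1)).
    void deleteSegment(long segmentId) {
        db.subMap(prefix(segmentId), prefix(segmentId + 1)).clear();
    }

    int size() {
        return db.size();
    }
}
```

Because all keys of a segment share a prefix, they are contiguous in sort order, which is what makes a single range delete sufficient.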

> (8) Rejected alternatives: you propose to not return the validTo
timestamp -- if we find it useful in the future to return it, would
there be a clean path to change it accordingly?

With the current proposal, there's no clean path. If we think there's a
good chance we might want to do this in the future, then we should update
the proposed interfaces.

The current proposed return type from `VersionedKeyValueStore<K,
V>#get(key, tsTo)` is `ValueAndTimestamp<V>`. There's no way to add a
second timestamp into `ValueAndTimestamp<V>`, which is why there's no clean
path to include validTo timestamp in the future under the existing proposal.

If we wanted to allow for including validTo timestamp in the future, we'd
instead update the return type to be a new `VersionedRecord<V>` object.
Today a `VersionedRecord<V>` could just include `value` and `timestamp`,
and in the future we could add `validTo` (names subject to change) into the
`VersionedRecord` as well. (It'd look a little strange for now since
VersionedRecord is the same as ValueAndTimestamp, but that seems fine.)
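
A minimal sketch of what such a class could look like today is below. The class name `VersionedRecordSketch` and the accessor names are placeholders rather than a proposal for the final API.

```java
import java.util.Objects;

// Hypothetical shape of a VersionedRecord<V>: today it carries only a value
// and a timestamp, mirroring ValueAndTimestamp<V>.
final class VersionedRecordSketch<V> {
    private final V value;
    private final long timestamp;

    VersionedRecordSketch(V value, long timestamp) {
        this.value = Objects.requireNonNull(value, "value");
        this.timestamp = timestamp;
    }

    V value() {
        return value;
    }

    long timestamp() {
        return timestamp;
    }

    // A later revision could add a validTo field and accessor here without
    // changing the return type of get(key, tsTo).
}
```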

If we choose to do this, I think we should also update the return type of
`VersionedKeyValueStore#get(key)` to be VersionedRecord, rather than having
one return ValueAndTimestamp while the other returns VersionedRecord.

----------- Sagar's comments -----------

> 1) Did you consider adding a method similar to :
List<ValueAndTimeStamp<V>> get(K key, long from, long to)?
I think this could be useful considering that this
versioning scheme unlocks time travel at a key basis. WDYT?

Yes, I do think this method is valuable. I think we will definitely want to
support time-range based queries at some point (hopefully soon), and likely
also key-range based queries (to achieve feature parity with existing
key-value stores).

It's not immediately clear to me whether these types of queries should be
supported as part of the store interface or if they should only be
supported via the `query(...)` method for IQv2. (It's an open question as
to whether we should support IQv1 for versioned stores or only IQv2. A
benefit of IQv2 over IQv1 is that we won't need to add individual store
methods for each type of query, including for all wrapped store layers.)

If we have clear non-IQ use cases for these methods (e.g., use cases within
processors), then they'll need to be added as part of the store interface
for sure. I'm leaning towards adding them as part of the store interface
but given the ambiguity here, it may be preferable to defer to a follow-up
KIP. OTOH, if you think the versioned store interface as proposed in this
KIP is too bare bones to be useful, I'm open to adding it in now as well.

> 2) I have a similar question as Matthias, about the timestampTo argument
when doing a get. Is it inclusive or exclusive?

Same answer (and follow-up question) as above. Do you think it will be
confusing for `get(key, tsTo)` to use an inclusive time bound, while
`get(key, tsFrom, tsTo)` would use an exclusive tsTo time bound? Maybe we
should rename `get(key, tsFrom, tsTo)` to `getVersions(...)` or
`getRange(...)` in order to avoid confusion.
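
For reference, a time-range lookup with an exclusive upper bound could be sketched as follows. This is a simplification under stated assumptions: `RangeQuerySketch` is a hypothetical name, the per-key history is modeled as a `TreeMap`, and only versions whose own timestamps fall in [tsFrom, tsTo) are returned (a version written before tsFrom but still valid at tsFrom is not included).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

final class RangeQuerySketch {
    // Returns values of versions with tsFrom <= timestamp < tsTo, oldest first.
    // Throws IllegalArgumentException if tsFrom > tsTo (TreeMap.subMap contract).
    static List<String> getRange(TreeMap<Long, String> history, long tsFrom, long tsTo) {
        return new ArrayList<>(history.subMap(tsFrom, true, tsTo, false).values());
    }
}
```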

> 3) validFrom sounds slightly confusing to me. It is essentially the
timestamp at which the record was inserted. validFrom makes it sound like
validTo which can keep changing based on new records while *from* is fixed.
WDYT?

"It is essentially the timestamp at which the record was inserted" <-- Yes,
that's correct.

I borrowed the "validFrom/validTo" terminology from temporal tables, e.g.,
https://learn.microsoft.com/en-us/sql/relational-databases/tables/temporal-tables?view=sql-server-ver16.
I don't believe the terms "validFrom" or "validTo" are currently exposed
anywhere in any of the user-facing interfaces (or Javadocs); I just needed
a way to refer to the concepts in the KIP. Hopefully this is a non-issue
(at least for now) as a result. Do you have a suggestion for terminology
that would've been less confusing?

> 4) Even I think delete api should be supported.

Makes sense. It'd be great to get your input on the same follow-up questions I
asked Matthias above as well :)

On Tue, Nov 22, 2022 at 4:25 AM Sagar <sagarmeansoc...@gmail.com> wrote:

> Hi Victoria,
>
> Thanks for the KIP. Seems like a very interesting idea!
>
> I have a couple of questions:
>
> 1) Did you consider adding a method similar to :
> List<ValueAndTimeStamp<V>> get(K key, long from, long to)?
>
> I think this could be useful considering that this
> versioning scheme unlocks time travel at a key basis. WDYT?
>
> 2) I have a similar question as Matthias, about the timestampTo argument
> when doing a get. Is it inclusive or exclusive?
>
> 3) validFrom sounds slightly confusing to me. It is essentially the
> timestamp at which the record was inserted. validFrom makes it sound like
> validTo which can keep changing based on new records while *from* is fixed.
> WDYT?
>
> 4) Even I think delete api should be supported.
>
> Thanks!
> Sagar.
>
> On Tue, Nov 22, 2022 at 8:02 AM Matthias J. Sax <mj...@apache.org> wrote:
>
> > Thanks for the KIP Victoria. Very well written!
> >
> >
> > Couple of questions (many might just require to add some more details to
> > the KIP):
> >
> >   (1) Why does the new store not extend KeyValueStore, but StateStore?
> > In the end, it's a KeyValueStore?
> >
> >   (2) Should we have a ReadOnlyVersionedKeyValueStore? Even if we don't
> > want to support IQ in this KIP, it might be good to add this interface
> > right away to avoid complications for follow up KIPs? Or won't there by
> > any complications anyway?
> >
> >   (3) Why do we not have a `delete(key)` method? I am ok with not
> > supporting all methods from existing KV-store, but a `delete(key)` seems
> > to be fundamentally to have?
> >
> >   (4a) Do we need `get(key)`? It seems to be the same as `get(key,
> > MAX_VALUE)`? Maybe is good to have as syntactic sugar though? Just for
> > my own clarification (should we add something to the JavaDocs?).
> >
> >   (4b) Should we throw an exception if a user queries out-of-bound
> > instead of returning `null` (in `get(key,ts)`)?
> >    -> You put it into "rejected alternatives", and I understand your
> > argument. Would love to get input from others about this question
> > though. -- It seems we also return `null` for windowed stores, so maybe
> > the strongest argument is to align to existing behavior? Or do we have
> > case for which the current behavior is problematic?
> >
> >   (4c) JavaDoc on `get(key,ts)` says: "(up to store implementation
> > discretion when this is the case)" -> Should we make it a stricter
> > contract such that the user can reason about it better (there is WIP to
> > make retention time a strict bound for windowed stores atm)
> >    -> JavaDocs on `persistentVersionedKeyValueStore` seems to suggest a
> > strict bound, too.
> >
> >   (5a) Do we need to expose `segmentInterval`? For windowed-stores, we
> > also use segments but hard-code it to two (it was exposed in earlier
> > versions but it seems not useful, even if we would be open to expose it
> > again if there is user demand).
> >
> >   (5b) JavaDocs says: "Performance degrades as more record versions for
> > the same key are collected in a single segment. On the other hand,
> > out-of-order writes and reads which access older segments may slow down
> > if there are too many segments." -- Wondering if JavaDocs should make
> > any statements about expected performance? Seems to be an implementation
> > detail?
> >
> >   (6) validTo timestamp is "exclusive", right? Ie, if I query
> > `get(key,ts[=validToV1])` I would get `null` or the "next" record v2
> > with validFromV2=ts?
> >
> >   (7) The KIP says, that segments are stores in the same RocksDB -- for
> > this case, how are efficient deletes handled? For windowed-store, we can
> > just delete a full RocksDB.
> >
> >   (8) Rejected alternatives: you propose to not return the validTo
> > timestamp -- if we find it useful in the future to return it, would
> > there be a clean path to change it accordingly?
> >
> >
> > -Matthias
> >
> >
> > On 11/16/22 9:57 PM, Victoria Xia wrote:
> > > Hi everyone,
> > >
> > > I have a proposal for introducing versioned state stores in Kafka
> > Streams.
> > > Versioned state stores are similar to key-value stores except they can
> > > store multiple record versions for a single key. This KIP focuses on
> > > interfaces only in order to limit the scope of the KIP.
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-889%3A+Versioned+State+Stores
> > >
> > > Thanks,
> > > Victoria
> > >
> >
>
