Thanks for clarifying about the null-question. SGTM.

On 12/13/22 3:06 PM, Victoria Xia wrote:
Hi Matthias,

Thanks for chiming in! Barring objections from anyone on this thread, I
will start the vote for this KIP on Thursday. That should be enough time to
incorporate any lingering minor changes.

I slightly prefer to add `VersionedRecord` interface (also
like the name). I agree that it's low overhead and providing a clean
path forward for future changes seems worth it to me.

OK, that makes two of us. I updated the KIP just now to formally include
VersionedRecord as the new return type from the various
VersionedKeyValueStore methods.

if we introduce `VersionedRecord`, I think we can keep the not-null
requirement for `ValueAndTimestamp`

Not quite. VersionedRecord is only used as a return type from read methods,
which is why VersionedRecord is able to enforce that its value is never
null. If the value being returned would have been null, then we return a
null VersionedRecord instead, rather than non-null VersionedRecord with
null value. So, there's no use case for a VersionedRecord with null value.

In contrast, even though ValueAndTimestamp is not anywhere in the public
VersionedKeyValueStore interface, ValueAndTimestamp still needs to be used
internally when representing a versioned key-value store as a
TimestampedKeyValueStore, since TimestampedKeyValueStore is used everywhere
throughout the internals of the codebase. In order to represent a versioned
key-value store as a TimestampedKeyValueStore, we have to support `put(K
key, ValueAndTimestamp<V> value)`, which means ValueAndTimestamp needs to
support null value (with timestamp). Otherwise we cannot put a tombstone
into a versioned key-value store when using the internal
TimestampedKeyValueStore representation.

It's very much an implementation detail that ValueAndTimestamp needs to be
relaxed to allow null values. I think this is a minor enough change that is
still preferable to the alternatives (refactoring the processors to not
require TimestampedKeyValueStore, or introducing a separate workaround
`put()` method on the TimestampedKeyValueStore representation of versioned
key-value stores), so I have left it in as part of the KIP.

Best,
Victoria

On Mon, Dec 12, 2022 at 8:42 PM Matthias J. Sax <mj...@apache.org> wrote:

Thanks Victoria.

I did not re-read the KIP in full on the wiki but only your email.

Points (1)-(8) SGTM.

About (9): I slightly prefer to add `VersionedRecord` interface (also
like the name). I agree that it's low overhead and providing a clean
path forward for future changes seems worth it to me. Btw: if we
introduce `VersionedRecord`, I think we can keep the not-null
requirement for `ValueAndTimestamp` what seems a small side benefit.
(Btw: your code snippet in the KIP shows what `VersionedRecord` would
have a non-null requirement for the value, but I think it would need to
allow null as value?)


-Matthias

On 12/7/22 5:23 PM, Victoria Xia wrote:
Thanks for the discussion, Bruno, Sagar, and Matthias!

It seems we've reached consensus on almost all of the discussion points.
I've updated the KIP with the following:
1) renamed "timestampTo" in `get(key, timestampTo)` to "asOfTimestamp" to
clarify that this timestamp bound is inclusive, per the SQL guideline
that
"AS OF <timestamp>" queries are inclusive. In the future, if we want to
introduce a timestamp range query, we can use `get(key, timestampFrom,
timestampTo)` and specify that timestampTo is exclusive in this method,
while avoiding confusing with the inclusive asOfTimestamp parameter in
the
other method, given that the names are different.
2) added a description of "history retention" semantics into the
VersionedKeyValueStore interface Javadoc, and updated the Javadoc for
`get(key, asOfTimestamp)` to mention explicitly that a null result is
returned if the provided timestamp bound is not within history retention.
3) added a `delete(key, timestamp)` method (with return type
`ValueAndTimestamp<V>`) to the VersionedKeyValueStore interface.
4) updated the Javadoc for `segmentInterval` to clarify that the only
reason a user might be interested in this parameter is performance.

Other points we discussed which did not result in updates include:
5) whether to automatically update the `min.compaction.lag.ms` config on
changelog topics when history retention is changed -- there's support for
this but let's not bundle it with this KIP. We can have a separate KIP to
change this behavior for the existing windowed changelog topics, in
addition to versioned changelog topics.
6) should we expose segmentInterval in this KIP -- let's go ahead and
expose it now since we'll almost certainly expose it (in this same
manner)
in a follow-on KIP anyway, and so that poor performance for user
workloads
is less likely to be a barrier for users getting started with this
feature.
I updated the Javadoc for this parameter to clarify why the Javadoc
mentions performance despite Javadocs typically not doing so.
7) `get(timestampFrom, timestampTo)` and other methods for IQ -- very
important but deferred to a future KIP
8) `purge(key)`/`deleteAllVersions(key)` -- deferred to a future KIP

That leaves only one unresolved discussion point:
9) whether to include validTo in the return types from `get(...)`. If we
go
with the current proposal of not including validTo in the return type,
then
it will not be easy to add it in the future (unless we want to add
validTo
to ValueAndTimestamp, which feels odd to me). If we think we might want
to
have validTo in the future, we can change the return type of `get(...)`
and
`delete(...)` in this proposal from `ValueAndTimestamp<V>` to a new type,
e.g., `VersionedRecord<V>` or `RecordVersion<V>`, which today will look
the
same as `ValueAndTimestamp<V>` but in the future we can add validTo if we
want. The cost is a new type which today looks the same as
ValueAndTimestamp.

Now that I think about it more, the cost to introducing a new type seems
relatively low. I've added a proposal towards the bottom of the KIP here
<
https://cwiki.apache.org/confluence/display/KAFKA/KIP-889%3A+Versioned+State+Stores#KIP889:VersionedStateStores-Additionalreturntimestampsfromget(key,asOfTimestamp)
.
If others also believe that the cost of introducing this new interface is
low (particularly relative to the flexibility it provides us for being
able
to evolve the class in the future), I will incorporate this proposal into
the KIP. I think the hardest part of this will be deciding on a name for
the new class :)

Pending objections, I'd like to make a call on item (9) and call a vote
on
this KIP at the end of this week.

Thanks,
Victoria

On Thu, Dec 1, 2022 at 9:47 PM Matthias J. Sax <mj...@apache.org> wrote:

Thanks Victoria!

(1) About `ReadOnlyVersionedKeyValueStore` -- I am not sure about IQv1
vs IQv2. But you might be right that adding the interface later might
not be an issue -- so it does not matter. Just wanted to double check.



(2) About `delete(key, ts)` -- as already discussed, I agree that it
should have same semantics as `put(key, null, ts)` (delete() needs a
timestamp). Not sure if `delete()` really needs to return anything? I
would be ok to make it `void` -- but I think it's also semantically
sound if it returns the "old" value at timestamps `ts` that the delete
actually deleted, as you mentioned -- in the end, an "delete" is a
physical append anyway (ie, "soft delete") as we want to track history.



(3)
Ah, great question. I think the question boils down to: do we want to
require that all versioned stores (including custom user
implementations)
use "history retention" to determine when to expire old record
versions?

I personally think, yes. The main reason for this is, that I think we
need to have a clear contract so we can plug-in custom implementations
into the DSL later? -- I guess, having a stricter contract initially,
and relaxing it later if necessary, is the easier was forward, than the
other way around.

For PAPI users, they are not bound to implement the interface anyway and
can just add any store they like by extending the top level `StateStore`
interface.



(4) About `segmentInterval`: I am personally fine both ways. Seems it's
your call to expose it or not. It seems there is a slight preference to
expose it.



(5) About `validTo`: based on my experience, it's usually simpler to
have it exclusive. It's also how it's defined in "system versioned
temporal tables" in the SQL standard, and how `AS OF <ts>` queries work.

For a join, it of course implies that if a table record has [100,200) as
inclusive `validFrom=100` and exclusive `validTo=200` it would only join
with a stream-side record with 100 <= ts <= 199 (or 100 <= ts < 200 :)).

I would strongly advocate to make the upper bound exclusive (it did
serve us well in the past to align to SQL semantics). It must be clearly
documented of course and we can also name variable accordingly if
necessary.



(6) About including `validTo` in return types -- it's not easy to change
the return type, because the signature of a method is only determined by
it's name in input parameter types, ie, we cannot overload an existing
method to just change the return type, but would need to change its name
or parameter list... Not sure if we can or cannot add `validTo` to
`ValueAndTimestamp` though, but it's a tricky question. Would be good to
get some more input from other if we think that it would be important
enough to worry about it now or not.



(7) About `get(k)` vs `get(k, ts)` vs `getAsOf(k, ts)`: I would prefer
to just keep `get()` with two overloads and not add `getAsOf()`; the
fact that we pass in a timestamp implies we have a point in time query.
(It's cleaner API design to leverage method overloads IMHO, and it's
what we did in the past). Of course, we can name the parameter `get(key,
asOfTimestamp)` if we think it's helpful. And in alignment to have
`validTo` exclusive, `validTo` would be `asOfTimestampe+1` (or larger),
in case we return it.



(8) About updating topic config (ie, history retention and compaction
lag): It think it was actually some oversight to not update topic
configs if the code changes. There is actually a Jira ticket about it. I
would prefer to keep the behavior consistent though and not change it
just for the new versioned-store, but change it globally in one shot
independent of this KIP.


-Matthias



On 12/1/22 10:15 AM, Sagar wrote:
Thanks Victoria,

I guess an advantage of exposing a method like delete(key, timestamp)
could
be that from a user's standpoint, it is a single operation and not 2.
The
equivalent of this method i.e put followed by get is not atomic so
exposing
it certainly sounds like a good idea.

Thanks!
Sagar.

On Tue, Nov 29, 2022 at 1:15 AM Victoria Xia
<victoria....@confluent.io.invalid> wrote:

Thanks, Sagar and Bruno, for your insights and comments!

Sagar: Can we name according to the semantics that you want to
support like `getAsOf` or something like that? I am not sure if we do
that
in our codebase though. Maybe the experts can chime in.

Because it is a new method that will be added, we should be able to
name it
whatever we like. I agree `getAsOf` is more clear, albeit wordier.
Introducing `getAsOf(key, timestamp)` means we could leave open
`get(key,
timeFrom, timeTo)` to have an exclusive `timeTo` without introducing a
collision. (We could introduce `getBetween(key, timeFrom, timeTo)`
instead
to delineate even more clearly, though this is better left for a
future
KIP.)

I don't think there's any existing precedent in codebase to follow
here
but
I'll leave that to the experts. Curious to hear what others prefer as
well.

Sagar: With delete, we would stlll keep the older versions of the key
right?

We could certainly choose this for the semantics of delete(...) -- and
it
sounds like we should too, based on Bruno's confirmation below that
this
feels more natural to him as well -- but as Bruno noted in his message
below I think we'll want the method signature to be `delete(key,
timestamp)` then, so that there is an explicit timestamp to associate
with
the deletion. In other words, `delete(key, timestamp)` has the same
effect
as `put(key, null, timestamp)`. The only difference is that the
`put(...)`
method has a `void` return type, while `delete(key, timestamp)` can
have
`ValueAndTimestamp` as return type in order to return the record which
is
replaced (if any). In other words, `delete(key, timestamp)` is
equivalent
to `put(key, null, timestamp)` followed by `get(key, timestamp)`.

Bruno: I would also not change the semantics so that it deletes all
versions of
a key. I would rather add a new method purge(key) or
deleteAllVersions(key) or similar if we want to have such a method in
this first KIP.

Makes sense; I'm convinced. Let's defer
`purge(key)`/`deleteAllVersions(key)` to a future KIP. If there's
agreement
that `delete(key, timestamp)` (as described above) is valuable, we can
keep
it in this first KIP even though it is syntactic sugar. If this turns
into
a larger discussion, we can defer this to a future KIP as well.

Bruno: I would treat the history retention as a strict limit. [...]
You
could also add historyRetentionMs() to the VersionedKeyValueStore<K,
V>
interface to make the concept of the history retention part of the
interface.

OK. That's the second vote for rewording the javadoc for
`VersionedKeyValueStore#get(key, timestampTo)` to remove the
parenthetical
and clarify that history retention should be used to dictate this
case,
so
I'll go ahead and do that. I'll leave out adding
`historyRetentionMs()`
to
the interface for now, though, for the sake of consistency with other
stores (e.g., window stores) which don't expose similar types of
configurations from their interfaces.

Bruno: exclusive vs inclusive regarding validTo timestamp in get().
Doesn't this decision depend on the semantics of the join for which
this
state store should be used?

Yes, you are correct. As a user I would expect that a stream-side
record
with the same timestamp as a table-side record _would_ produce a join
result, which is consistent with the proposal for timestampTo to be
inclusive. (FWIW I tried this out with a Flink temporal join just now
and
observed this result as well. Not sure where to look for other
standards to
validate this expectation.)

Bruno: If Streams does not update min.compaction.lag.ms during
rebalances,
users have to do it each time they change history retention in the
code,
right? That seems odd to me. What is the actual reason for not
updating
the config? How does Streams handle updates to windowed stores?

Yes, users will have to update min.compaction.lag.ms for the
changelog
topic themselves if they update history retention in their code. This
is
consistent with what happens for window stores today: e.g., if a user
updates grace period for a windowed aggregation, then they are
responsible
for updating retention.ms on their windowed changelog topic as well.

I'm not familiar with the historical context around why this is the
case --
Matthias, do you know?

My best guess is that Streams does not want to interfere with any
potential
out-of-band changes by the user between application restarts, though
I'm
not sure why a user would want to change this specific config to a
value
which does not accord with the specified history retention. I notice
that
there is code for validating topic configs and collecting validation
errors
(



https://github.com/apache/kafka/blob/be032735b39360df1a6de1a7feea8b4336e5bcc0/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java#L318-L319
)
but this method is not called from anywhere, even though there are
unit
tests for it. I was unable to find history of this validation after a
quick
search. Hopefully Matthias (or others) has context, otherwise I will
have a
closer look.

- Victoria

On Wed, Nov 23, 2022 at 8:52 AM Bruno Cadonna <cado...@apache.org>
wrote:

Hi all,

Thanks for the KIP, Victoria!

I have a couple of comments.

1. delete(key)
I think delete(key) should not remove all versions of a key. We
should
use it to close the validity interval of the last version.
Assuming we have records of different versions for key A:
(A, e, 0, 2),
(A, f, 2, 3),
(A, g, 3, MAX)

delete(A) would update them to

(A, e, 0, 2),
(A, f, 2, 3),
(A, g, 3, 5)
(A, null, 5, MAX)

But then the question arises where does timestamp 5 that closes the
interval in (A, g, 3, 5) and opens the interval in (A, null, 5, MAX)
come from. We could use the timestamp at which delete(A) is called,
but
actually I do not like that because it seems to me it opens the doors
to
non-determinism. If we use event time for put() we should also use it
for delete(). Actually, put(A, null, 5) would have the same effect as
delete(A) in the example above. As a syntactical sugar, we could add
delete(key, validFrom). (I just realized now that I just repeated
what
Victoria said in her previous e-mail.)
I agree with Victoria that delete(A) as defined for other state
stores
is hard to re-use in the versioned key-value store.
I would also not change the semantics so that it deletes all versions
of
a key. I would rather add a new method purge(key) or
deleteAllVersions(key) or similar if we want to have such a method in
this first KIP.


2. history retention
I would remove "(up to store implementation discretion when this is
the
case)". I would treat the history retention as a strict limit. If
users
want to implement a less strict behavior, they can still do it. Maybe
mention in the javadocs the implications of not adhering strictly to
the
history retention. That is, the DSL might become non-deterministic.
You
could also add historyRetentionMs() to the VersionedKeyValueStore<K,
V>
interface to make the concept of the history retention part of the
interface.

3. null vs. exception for out-of-bound queries
I am in favor of null. The record version is not there anymore
because
it expired. This seems to me normal and nothing exceptional. That
would
also consistent with the behavior of other APIs as already mentioned.


4. Exposing segmentInterval
Since we have evidence that the segment interval affects
performance, I
would expose it. But I find it also OK to expose it once we have a
corresponding metric.

5. exclusive vs inclusive regarding validTo timestamp in get()
Doesn't this decision depend on the semantics of the join for which
this
state store should be used? Should a record on the table side that
has
the same timestamp as the record on the stream side join? Or should
only
records in the table that are strictly before the record on the
stream
side join?


6. Not setting min.compaction.lag.ms during rebalances
If Streams does not update min.compaction.lag.ms during rebalances,
users have to do it each time they change history retention in the
code,
right? That seems odd to me. What is the actual reason for not
updating
the config? How does Streams handle updates to windowed stores? That
should be a similar situation for the retention time config of the
changelog topic.


Best,
Bruno



On 23.11.22 09:11, Sagar wrote:
Hi Vicky,

Thanks for your response!

I would just use numbers to refer to your comments.

1) Thanks for your response. Even I am not totally sure whether
these
should be supported via IQv2 or via store interface. That said, I
wouldn't
definitely qualify this as  blocking the KIP for sure so we can live
without it :)

2) Yeah if the 2 APIs for get have different semantics for
timestampTo,
then it could be confusing. I went through the link for temporal
tables
(TFS!) and I now get why the AS OF semantics would have it
inclusive.
I
think part of the problem is that the name get on it's own is not as
expressive as SQL. Can we name according to the semantics that you
want
to
support like `getAsOf` or something like that? I am not sure if we
do
that
in our codebase though. Maybe the experts can chime in.

3) hmm I would have named it `validUpto` But again not very picky
about
it.
After going through the link and your KIP, it's a lot clearer to me.

4) I think delete(key) should be sufficient. With delete, we would
stlll keep the older versions of the key right?

Thanks!
Sagar.

On Wed, Nov 23, 2022 at 12:17 AM Victoria Xia
<victoria....@confluent.io.invalid> wrote:

Thanks, Matthias and Sagar, for your comments! I've responded here
for
now,
and will update the KIP afterwards with the outcome of our
discussions
as
they resolve.

----------- Matthias's comments -----------

(1) Why does the new store not extend KeyValueStore, but
StateStore?
In the end, it's a KeyValueStore?

A `VersionedKeyValueStore<K, V>` is not a `KeyValueStore<K, V>`
because
many of the KeyValueStore methods would not make sense for a
versioned
store. For example, `put(K key, V value)` is not meaningful for a
versioned
store because the record needs a timestamp associated with it.

A `VersionedKeyValueStore<K, V>` is more similar to a
`KeyValueStore<K,
ValueAndTimestamp<V>>` (i.e., `TimestampedKeyValueStore<K, V>`),
but
some
of the TimestampedKeyValueStore methods are still problematic. For
example,
what does it mean for `delete(K key)` to have return type
`ValueAndTimestamp<V>`? Does this mean that `delete(K key)` only
deletes
(and returns) the latest record version for the key? Probably we
want
a
versioned store to have `delete(K key)` delete all record versions
for
the
given key, in which case the return type is better suited as an
iterator/collection of KeyValueTimestamp. `putIfAbsent(K key,
ValueAndTimestamp value)` also has ambiguous semantics for
versioned
stores
(i.e., what does it mean for the key/record to be "absent").

I agree that conceptually a versioned key-value store is just a
key-value
store, though. In the future if we redesign the store interfaces,
it'd
be
great to unify them by having a more generic KeyValueStore
interface
that
allows for extra flexibility to support different types of
key-value
stores, including versioned stores. (Or, if you can think of a way
to
achieve this with the existing interfaces today, I'm all ears!)

(2) Should we have a ReadOnlyVersionedKeyValueStore? Even if we
don't
want to support IQ in this KIP, it might be good to add this
interface
right away to avoid complications for follow up KIPs? Or won't
there
by
any complications anyway?

I don't think there will be complications for refactoring to add
this
interface in the future. Refactoring out
ReadOnlyVersionedKeyValueStore
from VersionedKeyValueStore would leave VersionedKeyValueStore
unchanged
from the outside.

Also, is it true that the ReadOnlyKeyValueStore interface is only
used
for
IQv1 and not IQv2? I think it's an open question as to whether we
should
support IQv1 for versioned stores or only IQv2. If the latter, then
maybe
we won't need the extra interface at all.

(3) Why do we not have a `delete(key)` method? I am ok with not
supporting all methods from existing KV-store, but a `delete(key)`
seems
to be fundamentally to have?

What do you think the semantics of `delete(key)` should be for
versioned
stores? Should `delete(key)` delete (and return) all record
versions
for
the key? Or should we have `delete(key, timestamp)` which is
equivalent
to
`put(key, null, timestamp)` except with a return type to return
ValueAndTimestamp representing the record it replaced?

If we have ready alignment on what the interface and semantics for
`delete(key)` should be, then adding it in this KIP sounds good. I
just
didn't want the rest of the KIP to be hung up over additional
interfaces,
given that we can always add extra interfaces in the future.

(4a) Do we need `get(key)`? It seems to be the same as `get(key,
MAX_VALUE)`? Maybe is good to have as syntactic sugar though? Just
for
my own clarification (should we add something to the JavaDocs?).

Correct, it is just syntactic sugar. I will add a clarification
into
the
Javadocs as you've suggested.

(4b) Should we throw an exception if a user queries out-of-bound
instead of returning `null` (in `get(key,ts)`)?
       -> You put it into "rejected alternatives", and I understand
your
argument. Would love to get input from others about this question
though. -- It seems we also return `null` for windowed stores, so
maybe
the strongest argument is to align to existing behavior? Or do we
have
case for which the current behavior is problematic?

Sure; curious to hear what others think as well.

(4c) JavaDoc on `get(key,ts)` says: "(up to store implementation
discretion when this is the case)" -> Should we make it a stricter
contract such that the user can reason about it better (there is
WIP
to
make retention time a strict bound for windowed stores atm)
       -> JavaDocs on `persistentVersionedKeyValueStore` seems to
suggest a
strict bound, too.

Ah, great question. I think the question boils down to: do we want
to
require that all versioned stores (including custom user
implementations)
use "history retention" to determine when to expire old record
versions?

Because the `persistentVersionedKeyValueStore(...)` method returns
instances of the provided RocksDB-based versioned store
implementation,
which does use history retention for this purpose, that's why we
can
very
clearly say that for this store, `get(key, ts)` will return null if
the
provided timestamp bound has fallen out of history retention. The
reason I
left the `VersionedKeyValueStore#get(key, ts)` Javadoc more generic
(i.e.,
does not mention history retention) is because maybe a user
implementing
their own custom store will choose a different expiry mechanism,
e.g.,
keep
the three latest versions for each key regardless of how old the
timestamps
are.

If we want to require that all versioned stores use history
retention
in
order to determine when to expire old records, then I will
certainly
update
the Javadoc to clarify. This is already a requirement for DSL users
because
the VersionedBytesStoreSupplier interface requires history
retention
to
be
provided (in order for changelog topic configs to be properly set),
so
it's
just a question of whether we also want to require PAPI users to
use
history retention too. I had a look at the existing window stores
and
didn't see precedent for requiring all window stores have a
standard
"retention time" concept for how long to keep windows, but if we
want
to
have a standard "history retention" concept for versioned stores we
certainly can. WDYT?

(5a) Do we need to expose `segmentInterval`? For windowed-stores,
we
also use segments but hard-code it to two (it was exposed in
earlier
versions but it seems not useful, even if we would be open to
expose
it
again if there is user demand).

If we want to leave it out of this first KIP (and potentially
expose
it
in
the future), that works for me. The performance benchmarks I ran
suggest
that this parameter greatly impacts store performance though and is
very
workload dependent. If a user reported poor performance using
versioned
stores for their workload, this is the first parameter I would want
to
tune. That said, metrics/observability for versioned stores (which
would be
helpful for determining how this parameter should be adjusted) have
been
deferred to a follow-up KIP, so perhaps that's reason to defer
exposing
this parameter as well.

(5b) JavaDocs says: "Performance degrades as more record versions
for
the same key are collected in a single segment. On the other hand,
out-of-order writes and reads which access older segments may slow
down
if there are too many segments." -- Wondering if JavaDocs should
make
any statements about expected performance? Seems to be an
implementation
detail?

I included this sentence to explain why a user might want to tune
this
value / help guide how to think about the parameter, but if we want
to
remove it entirely (per the discussion point above) then this
Javadoc
will
be removed with it.

(6) validTo timestamp is "exclusive", right? Ie, if I query
`get(key,ts[=validToV1])` I would get `null` or the "next" record
v2
with validFromV2=ts?

I actually intended for it to be inclusive (will update the KIP).
Do
you
think exclusive is more intuitive? The reason I had inclusive in my
mind is
because it's like a "AS OF <time>" query, which treats the time
bound
as
inclusive.

(7) The KIP says, that segments are stores in the same RocksDB --
for
this case, how are efficient deletes handled? For windowed-store,
we
can
just delete a full RocksDB.

The way that multiple segments are represented in the same RocksDB
is
that
the RocksDB keys are prefixed with segment ID. An entire segment is
deleted
with a single `deleteRange()` call to RocksDB.

(8) Rejected alternatives: you propose to not return the validTo
timestamp -- if we find it useful in the future to return it, would
there be a clean path to change it accordingly?

With the current proposal, there's no clean path. If we think
there's
a
good chance we might want to do this in the future, then we should
update
the proposed interfaces.

The current proposed return type from `VersionedKeyValueStore<K,
V>#get(key, tsTo)` is `ValueAndTimestamp<V>`. There's no way to
add a
second timestamp into `ValueAndTimestamp<V>`, which is why there's
no
clean
path to include validTo timestamp in the future under the existing
proposal.

If we wanted to allow for including validTo timestamp in the
future,
we'd
instead update the return type to be a new `VersionedRecord<V>`
object.
Today a `VersionedRecord<V>` could just include `value` and
`timestamp`,
and in the future we could add `validTo` (names subject to change)
into
the
`VersionedRecord` as well. (It'd look a little strange for now
since
VersionedRecord is the same as ValueAndTimestamp, but that seems
fine.)

If we choose to do this, I think we should also update the return
type
of
`VersionedKeyValueStore#get(key)` to be VersionedRecord as well,
rather
than having one return TimestampAndValue while the other returns
VersionedRecord.

----------- Sagar's comments -----------

1) Did you consider adding a method similar to :
List<ValueAndTimeStamp<V>> get(K key, long from, long to)?
I think this could be useful considering that this
versioning scheme unlocks time travel at a key basis. WDYT?

Yes, I do think this method is valuable. I think we will definitely
want to
support time-range based queries at some point (hopefully soon),
and
likely
also key-range based queries (to achieve feature parity with
existing
key-value stores).

It's not immediately clear to me whether these types of queries
should
be
supported as part of the store interface or if they should only be
supported via the `query(...)` method for IQv2. (It's an open
question
as
to whether we should support IQv1 for versioned stores or only
IQv2.
A
benefit of IQv2 over IQv1 is that we won't need to add individual
store
methods for each type of query, including for all wrapped store
layers.)

If we have clear non-IQ use cases for these methods (e.g., use
cases
within
processors), then they'll need to be added as part of the store
interface
for sure. I'm leaning towards adding them as part of the store
interface
but given the ambiguity here, it may be preferrable to defer to a
follow-up
KIP. OTOH, if you think the versioned store interface as proposed
in
this
KIP is too bare bones to be useful, I'm open to adding it in now as
well.

2) I have a similar question as Matthias, about the timestampTo
argument
when doing a get. Is it inclusive or exclusive?

Same answer (and follow-up question) as above. Do you think it will
be
confusing for `get(key, tsTo)` to use an inclusive time bound,
while
`get(key, tsFrom, tsTo)` would use an exclusive tsTo time bound?
Maybe
we
should rename `get(key, tsFrom, tsTo)` to `getVersions(...)` or
`getRange(...)` in order to avoid confusion.

3) validFrom sounds slightly confusing to me. It is essentially
the
timestamp at which the record was inserted. validFrom makes it
sound
like
validTo which can keep changing based on new records while *from*
is
fixed.
WDYT?

"It is essentially the timestamp at which the record was inserted"
<--
Yes,
that's correct.

I borrowed the "validFrom/validTo" terminology from temporal
tables,
e.g.,





https://learn.microsoft.com/en-us/sql/relational-databases/tables/temporal-tables?view=sql-server-ver16
.
I don't believe the terms "validFrom" or "validTo" are currently
exposed
anywhere in any of the user-facing interfaces (or Javadocs); I just
needed
a way to refer to the concepts in the KIP. Hopefully this is a
non-issue
(at least for now) as a result. Do you have a suggestion for
terminology
that would've been less confusing?

4) Even I think delete api should be supported.

Makes sense. It'd be to get your input on the same follow-up
questions I
asked Matthias above as well :)

On Tue, Nov 22, 2022 at 4:25 AM Sagar <sagarmeansoc...@gmail.com>
wrote:

Hi Victoria,

Thanks for the KIP. Seems like a very interesting idea!

I have a couple of questions:

1) Did you consider adding a method similar to :
List<ValueAndTimeStamp<V>> get(K key, long from, long to)?

I think this could be useful considering that this
versioning scheme unlocks time travel at a key basis. WDYT?

2) I have a similar question as Matthias, about the timestampTo
argument
when doing a get. Is it inclusive or exclusive?

3) validFrom sounds slightly confusing to me. It is essentially
the
timestamp at which the record was inserted. validFrom makes it
sound
like
validTo which can keep changing based on new records while *from*
is
fixed.
WDYT?

4) Even I think delete api should be supported.

Thanks!
Sagar.

On Tue, Nov 22, 2022 at 8:02 AM Matthias J. Sax <mj...@apache.org

wrote:

Thanks for the KIP Victoria. Very well written!


Couple of questions (many might just require to add some more
details
to
the KIP):

      (1) Why does the new store not extend KeyValueStore, but
StateStore?
In the end, it's a KeyValueStore?

      (2) Should we have a ReadOnlyVersionedKeyValueStore? Even
if we
don't
want to support IQ in this KIP, it might be good to add this
interface
right away to avoid complications for follow up KIPs? Or won't
there
by
any complications anyway?

      (3) Why do we not have a `delete(key)` method? I am ok with
not
supporting all methods from existing KV-store, but a
`delete(key)`
seems
to be fundamentally to have?

      (4a) Do we need `get(key)`? It seems to be the same as
`get(key,
MAX_VALUE)`? Maybe is good to have as syntactic sugar though?
Just
for
my own clarification (should we add something to the JavaDocs?).

      (4b) Should we throw an exception if a user queries
out-of-bound
instead of returning `null` (in `get(key,ts)`)?
       -> You put it into "rejected alternatives", and I
understand
your
argument. Would love to get input from others about this question
though. -- It seems we also return `null` for windowed stores, so
maybe
the strongest argument is to align to existing behavior? Or do we
have
case for which the current behavior is problematic?

      (4c) JavaDoc on `get(key,ts)` says: "(up to store
implementation
discretion when this is the case)" -> Should we make it a
stricter
contract such that the user can reason about it better (there is
WIP
to
make retention time a strict bound for windowed stores atm)
       -> JavaDocs on `persistentVersionedKeyValueStore` seems to
suggest a
strict bound, too.

      (5a) Do we need to expose `segmentInterval`? For
windowed-stores,
we
also use segments but hard-code it to two (it was exposed in
earlier
versions but it seems not useful, even if we would be open to
expose
it
again if there is user demand).

      (5b) JavaDocs says: "Performance degrades as more record
versions
for
the same key are collected in a single segment. On the other
hand,
out-of-order writes and reads which access older segments may
slow
down
if there are too many segments." -- Wondering if JavaDocs should
make
any statements about expected performance? Seems to be an
implementation
detail?

      (6) validTo timestamp is "exclusive", right? Ie, if I query
`get(key,ts[=validToV1])` I would get `null` or the "next" record
v2
with validFromV2=ts?

      (7) The KIP says, that segments are stores in the same
RocksDB
--
for
this case, how are efficient deletes handled? For windowed-store,
we
can
just delete a full RocksDB.

      (8) Rejected alternatives: you propose to not return the
validTo
timestamp -- if we find it useful in the future to return it,
would
there be a clean path to change it accordingly?


-Matthias


On 11/16/22 9:57 PM, Victoria Xia wrote:
Hi everyone,

I have a proposal for introducing versioned state stores in
Kafka
Streams.
Versioned state stores are similar to key-value stores except
they
can
store multiple record versions for a single key. This KIP
focuses
on
interfaces only in order to limit the scope of the KIP.








https://cwiki.apache.org/confluence/display/KAFKA/KIP-889%3A+Versioned+State+Stores

Thanks,
Victoria













Reply via email to