Thanks for updating the KIP and splitting into multiple ones. I am just going to reply for the single-key-single-timestamp case below.

It seems the `KeyQuery.java` code snippet is "incomplete" -- the class definition is missing.

At the same time, the example uses `VersionedKeyQuery` so I am not sure right now if you propose to re-use the existing `KeyQuery` class or introduce a new `VersionedKeyQuery` class?

While it was suggested that we re-use the existing `KeyQuery` class, I am wondering what would happen if one uses the new `asOf` method, and passes the query into a non-versioned store?

In the end, a non-versioned store does not know that there is an as-of timestamp set and thus might just do a plain lookup (it also only has a single value per key) and return whatever value it has stored?

I am wondering if this would be semantically questionable and/or confusing for users (especially for timestamped stores)? -- Because the non-versioned store does not know anything about the timestamp, it can also not even check if it's set and raise an error.
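To make the concern concrete, here is a minimal sketch with hypothetical stand-in types (`SketchKeyQuery` and `plainStoreLookup` are illustrations, not Kafka Streams classes). It assumes `asOf` is just an optional field on the query; a plain store whose lookup predates that field never reads it and returns its single stored value.

```java
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-ins, not the Kafka Streams API.
final class AsOfIgnoredSketch {

    // A key query that optionally carries an as-of timestamp.
    static final class SketchKeyQuery<K> {
        final K key;
        final Optional<Instant> asOf;

        private SketchKeyQuery(K key, Optional<Instant> asOf) {
            this.key = key;
            this.asOf = asOf;
        }

        static <K> SketchKeyQuery<K> withKey(K key) {
            return new SketchKeyQuery<>(key, Optional.empty());
        }

        SketchKeyQuery<K> asOf(Instant timestamp) {
            return new SketchKeyQuery<>(key, Optional.of(timestamp));
        }
    }

    // A non-versioned store lookup written before asOf existed: it only reads the key.
    static <K, V> V plainStoreLookup(Map<K, V> store, SketchKeyQuery<K> query) {
        return store.get(query.key); // query.asOf is silently ignored
    }

    public static void main(String[] args) {
        Map<String, String> store = new HashMap<>();
        store.put("k", "current");

        SketchKeyQuery<String> query =
            SketchKeyQuery.withKey("k").asOf(Instant.ofEpochMilli(1L));

        // The caller asked for the value as of t=1 but gets the only value the store has.
        System.out.println(plainStoreLookup(store, query));
    }
}
```

In this sketch nothing fails and nothing warns, which is the behavior that could confuse users.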


Did you try to prototype either of the two approaches? I am asking because I am wondering about generics and return types. The existing `KeyQuery` is defined as

`KeyQuery<K, V> extends Query<V>` so `V` is the result type.

However, for the versioned store we want the result type to be `VersionedRecord<V>` and thus we would need to set `V = VersionedRecord<V>` -- would this work or would the compiler trip over it (or would it work but still be confusing/complex for users to specify the right types)?

For `VersionedKeyQuery` we could do:

`VersionedKeyQuery<K, V> extends Query<VersionedRecord<V>>`

which seems cleaner?

Without writing code I always have a hard time reasoning about generics, so maybe trying out both approaches would shed some light?
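A minimal sketch of the two generic shapes, using hypothetical stand-in types that only mirror the signatures quoted above (the sketch models `Query` as an interface; none of these are the real Kafka Streams classes). Under these assumptions both options compile; the difference is what `V` means to the caller.

```java
// Hypothetical stand-ins that only mirror the generic shapes discussed here.
final class GenericsSketch {

    interface Query<R> { }

    static final class VersionedRecord<V> {
        final V value;
        final long timestamp;

        VersionedRecord(V value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // Option 1: re-use KeyQuery; the caller must choose V = VersionedRecord<...>.
    static final class KeyQuery<K, V> implements Query<V> {
        final K key;

        KeyQuery(K key) {
            this.key = key;
        }
    }

    // Option 2: a dedicated class fixes the wrapper type in its declaration.
    static final class VersionedKeyQuery<K, V> implements Query<VersionedRecord<V>> {
        final K key;

        VersionedKeyQuery(K key) {
            this.key = key;
        }
    }

    // Stand-in for the store side: hands back a canned result of the query's result type.
    static <R> R execute(Query<R> query, R cannedResult) {
        return cannedResult;
    }

    public static void main(String[] args) {
        VersionedRecord<Integer> stored = new VersionedRecord<>(42, 100L);

        // Option 1 compiles, but V now means "VersionedRecord<Integer>", not "Integer".
        KeyQuery<String, VersionedRecord<Integer>> reused = new KeyQuery<>("k");
        VersionedRecord<Integer> r1 = execute(reused, stored);

        // Option 2: V stays the plain value type.
        VersionedKeyQuery<String, Integer> dedicated = new VersionedKeyQuery<>("k");
        VersionedRecord<Integer> r2 = execute(dedicated, stored);

        System.out.println(r1.value + " " + r2.value);
    }
}
```

With these stand-ins the compiler accepts both, so the open question is more about readability of the type parameters than about feasibility.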




-Matthias


On 8/15/23 9:03 AM, Alieh Saeedi wrote:
Hi all,
thanks to all for the great points you mentioned.

Addressed reviews are listed as follows:
1. The methods are defined as composable, as Lucas suggested. Now we have
even more types of single-key_multi-timestamp queries. As Matthias
suggested in his first review, now with composable methods, queries with a
lower time bound are also possible. The meaningless combinations are
prevented by throwing exceptions.
2. I replaced until with asOf everywhere. I hope the
javadocs and the explanations in the KIPs are clear enough about the time
range. Matthias, Lucas, and Victoria asked about the exact time boundaries.
I assumed that if the time range is specified as [t1, t2], all the records
that have been inserted within this time range must be returned by the
query. But I think the point that all of you referred to and that Victoria
clarified very well is valid. Maybe the query must return "all the
records that are valid within the time range". Therefore, records that have
been inserted before t1 are also returned. Now, this makes more sense to me
as a user. By the way, it seems more like a product question.
3. About the order of returned records, I added some boolean fields to the
classes to specify it. I still do not have any clue how hard the
implementation of this will be. The question is, is the order considered
for normal range queries as well?
4. As Victoria pointed out the issue about listing tombstones, I changed
the VersionedRecord such that it can have NULL values as well. The question
is, what was the initial intention of setting the value in VersionedRecord
as NOT NULL? I am worried about breaking other parts of the code.
5. About the motivation for defining the VersionedKeyQuery and
VersionedRangeQuery classes: I think my initial intention was to
distinguish between queries that return a single record and queries that
return a set of records. On the other hand, I put both
single-key_single-timestamp queries and single-key_multi-timestamp queries
in the same class, VersionedKeyQuery. Matthias complained about it as well.
Therefore, in my new changes, I put single-key_single-timestamp queries in
the KeyQuery class and single-key_multi-timestamp queries in the
VersionedKeyQuery class. I still have kept the VersionedRangeQuery class.
If there are good arguments for kicking both VersionedKeyQuery and
VersionedRangeQuery classes out, I can change the KIPs.
6. About defining new methods in the VersionedKeyValueStore interface: I
actually have defined the required methods in the RocksDBVersionedStore
class instead, since defining them on the interface would require
implementing them in all the classes that implement the interface.
7. I replaced Long with Instant type for timestamp.
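A minimal sketch of what point 1 describes, with hypothetical names (`TimeRangeKeyQuery`, `fromTime`, `asOf` are illustrations, not the KIP's final API). Which combinations count as meaningless is not spelled out above, so the check shown here (a lower time bound after the as-of time) is only an assumed example.

```java
import java.time.Instant;
import java.util.Optional;

// Hypothetical composable query builder, not the KIP's actual class.
final class ComposableQuerySketch {

    static final class TimeRangeKeyQuery<K> {
        final K key;
        final Optional<Instant> from;
        final Optional<Instant> asOf;

        private TimeRangeKeyQuery(K key, Optional<Instant> from, Optional<Instant> asOf) {
            // Assumed example of a "meaningless" combination: lower bound after as-of.
            if (from.isPresent() && asOf.isPresent() && from.get().isAfter(asOf.get())) {
                throw new IllegalArgumentException("fromTime must not be after asOf");
            }
            this.key = key;
            this.from = from;
            this.asOf = asOf;
        }

        static <K> TimeRangeKeyQuery<K> withKey(K key) {
            return new TimeRangeKeyQuery<>(key, Optional.empty(), Optional.empty());
        }

        // Each step returns a new immutable query, so calls can be chained in any order.
        TimeRangeKeyQuery<K> fromTime(Instant timestamp) {
            return new TimeRangeKeyQuery<>(key, Optional.of(timestamp), asOf);
        }

        TimeRangeKeyQuery<K> asOf(Instant timestamp) {
            return new TimeRangeKeyQuery<>(key, from, Optional.of(timestamp));
        }
    }

    public static void main(String[] args) {
        Instant t1 = Instant.ofEpochMilli(10L);
        Instant t2 = Instant.ofEpochMilli(20L);

        TimeRangeKeyQuery<String> ok = TimeRangeKeyQuery.withKey("k").fromTime(t1).asOf(t2);
        System.out.println(ok.from.get() + " .. " + ok.asOf.get());

        try {
            TimeRangeKeyQuery.withKey("k").fromTime(t2).asOf(t1);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```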

As Matthias (explicitly) and Victoria (implicitly) suggested, I broke
KIP-960 into three different KIPs.
KIP-960: Support single-key_single-timestamp Interactive Queries (IQv2) for
Versioned State Stores
<https://cwiki.apache.org/confluence/display/KAFKA/KIP-960%3A+Support+interactive+queries+%28IQv2%29+for+versioned+state+stores>
KIP-968: Support single-key_multi-timestamp Interactive Queries (IQv2) for
Versioned State Stores
<https://cwiki.apache.org/confluence/display/KAFKA/KIP-968%3A+Support+single-key_multi-timestamp+interactive+queries+%28IQv2%29+for+versioned+state+stores>
KIP-969: Support range Interactive Queries (IQv2) for Versioned State
Stores
<https://cwiki.apache.org/confluence/display/KAFKA/KIP-969%3A+Support+range+interactive+queries+%28IQv2%29+for+versioned+state+stores>

Cheers,
Alieh

On Thu, Aug 10, 2023 at 2:38 AM Matthias J. Sax <mj...@apache.org> wrote:

Seems there was a lot of additional feedback. Looking forward to an
updated version of the KIP.

I also agree to make the queries more composable. I was considering
raising this originally, but held off because `RangeQuery` is also not
designed to be very composable. But for versioned stores, we have many more
combinations, so making it composable does make sense to me.

About iterator order: I would also propose to be pragmatic, and only add
what is simple to implement for now. We can always extend it later. We
just need to clearly document the order (or say: order is not defined --
also a valid option). Of course, if we limit what we add now, we should
keep in mind how to extend the API in the future without the need to
deprecate a lot of stuff (ideally, we would not need to deprecate
anything but only extend what we have).

Btw: I am also happy to de-scope this KIP to only implement the two
queries Victoria mentioned being easy to implement, and do follow up
KIPs for range queries. There is no need to do everything with a single
KIP.

About the original v-store KIP and `long` vs `Instant` -- I don't think
we forgot it. If the store is used inside a `Processor`, using `long` is
preferred because performance is important and we are on the hot code
path. For IQ on the other hand, it's not the hot code path, and
semantics exposed to the user are more important. -- At least, this is
how we did it in the past.
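A minimal sketch of that split, with hypothetical names (`queryAsOf` and the `LongFunction` stand-in are illustrations, not Kafka Streams API): the IQ-facing entry point accepts an `Instant` and converts once, while the lookup it delegates to keeps working on a primitive `long`.

```java
import java.time.Instant;
import java.util.function.LongFunction;

// Hypothetical names; only the long/Instant split mirrors the discussion.
final class InstantBoundarySketch {

    // IQ-facing entry point: takes an Instant for clearer semantics, then converts
    // once to epoch milliseconds for the long-based lookup.
    static <V> V queryAsOf(LongFunction<V> longBasedStoreGet, Instant asOf) {
        return longBasedStoreGet.apply(asOf.toEpochMilli());
    }

    public static void main(String[] args) {
        // Stand-in for a processor-side lookup that works on primitive longs.
        LongFunction<String> storeGet = ts -> "value@" + ts;
        System.out.println(queryAsOf(storeGet, Instant.ofEpochMilli(1_000L)));
    }
}
```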


One more thought.

The new `VersionedKeyQuery` seems to have two different query types
merged into a single class. Queries which return a single result, and
queries that return multiple results. This does not seem ideal. For
`withKeyLatestValue` and `withKeyWithTimestampBound` (should we rename
this to `withKeyAsOfTimestamp`?) I would expect to get a single
`VersionedRecord<V>` back, not an iterator. Hence, we might need to
split `VersionedKeyQuery` into two query types?


-Matthias




On 8/9/23 6:46 AM, Victoria Xia wrote:
Hey Alieh,

Thanks for the KIP!

It looks like the KIP proposes three different types of interactive
queries for versioned stores, though they are grouped together into two
classes: VersionedKeyQuery adds support for single-key, single-timestamp
lookups, and also for single-key, multi-timestamp lookups, while
VersionedRangeQuery additionally adds support for key-range queries.

The first type of query (single-key, single-timestamp lookups) is
already supported by versioned stores (per the VersionedKeyValueStore
interface) today, so exposing it via interactive queries requires little
additional implementation effort and is a quick win for users. The other
two types of queries will require more effort to add, and also come with
more design decisions. I've sorted my thoughts accordingly.

Regarding single-key, multi-timestamp lookups:

1. If we add these, we should add a new method to the
VersionedKeyValueStore interface to support this type of lookup. Otherwise,
there is no easy/efficient way to compose methods from the existing
interface in order to implement this type of lookup, and therefore the new
interactive query type cannot be used on generic VersionedKeyValueStores.

2. I agree with Matthias's and Lucas's comments about being very
explicit about what the timestamp range means. For consistency with
single-key, single-timestamp lookups, I think the "upper timestamp bound"
should really be an "as of timestamp bound" instead, so that it is
inclusive. For the "lower timestamp bound"/start timestamp, we have a
choice regarding whether to interpret it as the user saying "I want valid
records for all timestamps in the range" in which case the query should
return a record with timestamp earlier than the start timestamp, or to
interpret it as the user saying "I want all records with timestamps in the
range" in which case the query should not return any records with timestamp
earlier than the start timestamp. My current preference is for the former,
but it'd be good to hear other opinions.

3. The existing VersionedRecord interface contains only a value and
validFrom timestamp, and does not allow null values. This presents a
problem for introducing single-key, multi-timestamp lookups because if
there is a tombstone contained within the timestamp range of the query,
then there is no way to represent this as part of a
ValueIterator<VersionedRecord> return type. You'll either have to allow
null values or add a validTo timestamp to the returned records.

4. Also +1 to Matthias's question about standardizing the order in which
records are returned. Will they always be returned in forwards-timestamp
order? Reverse-timestamp order? Will users get a choice? It'd be good to
make this explicit in the KIP.
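Point 3 can be illustrated with a minimal sketch, assuming a hypothetical `Version` type that carries both `validFrom` and `validTo` (not the existing `VersionedRecord`). A tombstone then shows up as a gap between two validity intervals rather than as a null-valued record. Using `Long.MAX_VALUE` to mean "still valid" is also just an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical types; the history map uses a null value to model a tombstone.
final class TombstoneSketch {

    static final class Version {
        final String value;
        final long validFrom; // inclusive
        final long validTo;   // exclusive; Long.MAX_VALUE is an assumed "still valid" marker

        Version(String value, long validFrom, long validTo) {
            this.value = value;
            this.validFrom = validFrom;
            this.validTo = validTo;
        }

        @Override
        public String toString() {
            return value + " [" + validFrom + ", " + validTo + ")";
        }
    }

    // Turns a timestamp-ordered history into validity intervals, dropping tombstones.
    static List<Version> withValidTo(TreeMap<Long, String> history) {
        List<Version> out = new ArrayList<>();
        Long prevTs = null;
        String prevValue = null;
        for (Map.Entry<Long, String> e : history.entrySet()) {
            if (prevValue != null) {
                // The previous version ends where the next entry (value or tombstone) starts.
                out.add(new Version(prevValue, prevTs, e.getKey()));
            }
            prevTs = e.getKey();
            prevValue = e.getValue();
        }
        if (prevValue != null) {
            out.add(new Version(prevValue, prevTs, Long.MAX_VALUE));
        }
        return out;
    }

    public static void main(String[] args) {
        TreeMap<Long, String> history = new TreeMap<>();
        history.put(1L, "v1");
        history.put(3L, null); // tombstone at t=3
        history.put(5L, "v2");

        for (Version v : withValidTo(history)) {
            System.out.println(v);
        }
    }
}
```

Here the interval for `v1` ends at 3 and the one for `v2` starts at 5, so the delete is visible without any record having a null value.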

Regarding key-range queries (either single-timestamp or multi-timestamp):

5. Same comment about adding new methods for this type of lookup to the
VersionedKeyValueStore interface.

6. Again +1 to Matthias's question about the order in which records are
returned, for multi-key, multi-timestamp queries. Organizing first by key
and then by timestamp makes the most sense to me, based on the layout of
the existing store implementation. (Trying to sort by timestamp would
require reading potentially all keys into memory first, which is not
feasible.)

I think the complexity of introducing single-key, multi-timestamp
lookups and especially multi-key, multi-timestamp lookups is significantly
higher than for single-key, single-timestamp lookups, so it'd be good to
think about/gauge what the use cases for these types of queries are before
committing to the implementation, and also to stage the implementation to
get single-key, single-timestamp lookups as a quick win first without
blocking on the others. (Guessing you were already planning to do that,
though :))

Also a separate question:

7. What's the motivation for introducing new VersionedKeyQuery and
VersionedRangeQuery types rather than re-using the existing KeyQuery and
RangeQuery types, to add optional asOfTimestamp bounds? I can see pros and
cons of each, just curious to hear your thoughts.

If you do choose to keep VersionedKeyQuery and VersionedRangeQuery
separate from KeyQuery and RangeQuery, then you can remove the KeyQuery and
RangeQuery placeholders in the versioned store implementation as part of
implementing your KIP:
https://github.com/apache/kafka/blob/f23394336a7741bf4eb23fcde951af0a23a69bd0/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java#L142-L158

Best,
Victoria

On 2023/08/09 10:16:44 Bruno Cadonna wrote:
Hi,

I will use the initials of the authors to distinguish the points.

LB 2.
I like the idea of composable construction of queries. It would make the
API more readable. I think this is better than the VersionsQualifier I
proposed in BC 3.

LB 4. and LB 5.
Being explicit on the time bounds and key ranges is really important.

LB 6.
I think peekNextValue() comes from peekNextKey() in KeyValueIterator. I
agree with Lucas that in ValueIterator peek() is clear enough.

BC 5.
I failed to answer your question. I am sorry! I do not think we need
system tests. However, you should add a test plan listing the tests you
plan to write. I guess this list will comprise unit and integration
tests.

BC 6. (new)
Could you please use asOf or asOfTime or asOfTimestamp instead of
untilTimestamp? I do not want to query a key until a timestamp, but I
want to query a key at a specific point in time. Maybe atTime might also
work, but I think asOf is more popular.

BC 7. (new)
You should change long to Instant for timestamps. Just because we failed
to use Instant in other places does not mean we should keep long.


Best,
Bruno


On 8/8/23 10:26 PM, Lucas Brutschy wrote:
Hi Alieh,

thanks a lot for the KIP. IQ with time semantics is going to be
another great improvement towards having crystal clear streaming
semantics!

1. I agree with Bruno and Matthias, to remove the 'bound' term for the
timestamps. It's confusing that we have bounds for both timestamps and
keys. In particular, `withNoBoundWithTimestampBound` seems to
contradict itself.

2. I would personally prefer having composable construction of the
query, instead of defining a separate method for each combination. So
for example:
- `keyRangeLatestValue(l,u)` ->  `withBounds(l, u).latest()`
- `withNoBoundsWithTimestampRange(t1,t2)` ->
`withNoBounds().fromTime(t1).untilTime(t2)`
- etc.pp.
This would have the advantage, that the interface would be very
similar to `RangeQuery` and we'd need a lot fewer methods, so it will
make the API reference a much quicker read. We already use this style
to define `skipCache` in `KeyQuery`. I guess that diverges quite a bit
from the current proposal, but I'll leave it here anyways for you to
consider it (even if you decide to stick with the current model).

4. Please make sure to specify in every range-based method whether the
bounds are inclusive or exclusive. I see it being mentioned for some
methods, but for others, this is omitted. As I understand, 'until' is
usually used to mean exclusive, and 'from' is usually used to mean
inclusive, but it's better to specify this in the javadoc.

5. Similarly, as Matthias says, specify what happens if the "validity
range" of a value overlaps with the query range. So, to clarify his
remark, what happens if the value v1 is inserted at time 1 and value
v2 is inserted at time 3, and I query for the range `[2,4]` - will the
result include v1 or not? It's the valid value at time 2. For
inspiration, in `WindowRangeQuery`, this important semantic detail is
even clear from the method name `withWindowStartRange`.

6. For iterators, it is convention to call the method `peek` and this
convention is followed by e.g. `AbstractIterator` in Kafka, but also
Guava, Apache Commons etc. So I would also call it `peek`, not
`peekNextValue` here. It's clear what we are peeking at.
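The example in point 5 can be written down as a minimal sketch, assuming a `TreeMap` keyed by insertion timestamp as a stand-in for one key's version history (the method names are hypothetical). It contrasts the two readings of the range `[2,4]`, with both bounds taken as inclusive for the purpose of the sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical helper methods over a stand-in version history (timestamp -> value).
final class TimeRangeSemanticsSketch {

    // Reading A: only versions whose insertion timestamp lies in [from, to].
    static List<String> insertedWithin(TreeMap<Long, String> versions, long from, long to) {
        return new ArrayList<>(versions.subMap(from, true, to, true).values());
    }

    // Reading B: every version that is valid at some point in [from, to],
    // which also includes the version already in effect at 'from'.
    static List<String> validWithin(TreeMap<Long, String> versions, long from, long to) {
        Long floor = versions.floorKey(from);
        long start = floor != null ? floor : from;
        return new ArrayList<>(versions.subMap(start, true, to, true).values());
    }

    public static void main(String[] args) {
        TreeMap<Long, String> versions = new TreeMap<>();
        versions.put(1L, "v1"); // inserted at time 1
        versions.put(3L, "v2"); // inserted at time 3

        System.out.println(insertedWithin(versions, 2L, 4L)); // [v2]
        System.out.println(validWithin(versions, 2L, 4L));    // [v1, v2]
    }
}
```

Under reading B the result includes v1 because it is the valid value at time 2, which is exactly the detail the javadoc needs to pin down.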

Cheers,
Lucas

On Thu, Jul 27, 2023 at 3:07 PM Alieh Saeedi
<as...@confluent.io.invalid> wrote:

Thanks, Bruno, for the feedback.


      - I agree with both points 2 and 3. About 3: Having "VersionsQualifier"
      reduces the number of methods and makes everything less confusing. At the
      end, that will be easier to use for the developers.
      - About point 4: I renamed all the properties and parameters from
      "asOfTimestamp" to "fromTimestamp". That was my misunderstanding. So now we
      have these two timestamp bounds: "fromTimestamp" and "untilTimestamp".
      - About point 5: Do we need system tests here? I assumed just
      integration tests were enough.
      - Regarding long vs timestamp instance: I think yes, that's why I used
      Long as timestamp.

Bests,
Alieh






On Thu, Jul 27, 2023 at 2:28 PM Bruno Cadonna <ca...@apache.org>
wrote:

Hi Alieh,

Thanks for the KIP!


Here is my feedback.

1.
You can remove the private fields and constructors from the KIP. Those
are implementation details.


2.
Some proposals for renamings

in VersionedKeyQuery

withKeyWithTimestampBound()
      -> withKeyAndAsOf()

withKeyWithTimestampRange()
      -> withKeyAndTimeRange()

in VersionedRangeQuery

KeyRangeWithTimestampBound()
      -> withKeyRangeAndAsOf()

withLowerBoundWithTimestampBound()
      -> withLowerBoundAndAsOf()

withUpperBoundWithTimestampBound()
      -> withUpperBoundAndAsOf()

withNoBoundWithTimestampBound()
      -> withNoBoundsAndAsOf()

keyRangeWithTimestampRange()
      -> withKeyRangeAndTimeRange()

withLowerBoundWithTimestampRange()
      -> withLowerBoundAndTimeRange()

withUpperBoundWithTimestampRange()
      -> withUpperBoundAndTimeRange()

withNoBoundWithTimestampRange()
      -> withNoBoundsAndTimeRange()


3.
Would it make sense to merge
withKeyLatestValue(final K key)
and
withKeyAllVersions(final K key)
into
withKey(final K key, final VersionsQualifier versionsQualifier)
where VersionsQualifier is an enum with values (ALL, LATEST). We could
also add EARLIEST if we feel it might be useful.
The same applies to all methods that end in LatestValue or AllVersions.


4.
I think getAsOfTimestamp() should not return the lower bound. If I query
a version as of a timestamp then the query should return the latest
version less than the timestamp.
I propose to rename the getters to getTimeFrom() and getTimeTo() as in
WindowRangeQuery.


5.
Please add the Test Plan section.


Regarding long vs Instant: Did we fail to use Instant instead of long
for all interfaces of the versioned state stores?


Best,
Bruno








On 7/26/23 11:40 PM, Matthias J. Sax wrote:
Thanks for the KIP Alieh. Glad to see that we can add IQ to the new
versioned stores!



Couple of questions:

single-key lookup with timestamp (upper) bound

Not sure if "bound" is the right term? In the end, it's a point lookup
for a key plus timestamp, so it's an as-of timestamp (not a bound)? Of
course, the returned record would most likely have a different (smaller)
timestamp, but that's expected and does not make the passed-in timestamp
a "bound" IMHO?
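A minimal sketch of such an as-of point lookup, assuming a `TreeMap` keyed by record timestamp as a stand-in for one key's version history (`asOf` here is a hypothetical helper). Treating the as-of timestamp as inclusive is an assumption of the sketch, not something settled at this point.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper over a stand-in version history (timestamp -> value).
final class AsOfLookupSketch {

    // Returns the newest version whose timestamp is <= asOfTimestamp, or null if none exists.
    static Map.Entry<Long, String> asOf(TreeMap<Long, String> versions, long asOfTimestamp) {
        return versions.floorEntry(asOfTimestamp);
    }

    public static void main(String[] args) {
        TreeMap<Long, String> versions = new TreeMap<>();
        versions.put(10L, "a");
        versions.put(20L, "b");

        // Asking as of t=15 returns the record written at t=10: a smaller timestamp
        // than the one passed in, which is expected for a point lookup.
        Map.Entry<Long, String> hit = asOf(versions, 15L);
        System.out.println(hit.getValue() + " written at " + hit.getKey());
    }
}
```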

single-key query with timestamp range
single-key all versions query

Should we also add `withLowerTimeBound` and `withUpperTimeBound`
(similar to what `RangeQuery` has)?

Btw: I think we should not pass `long` for timestamps, but `Instant`
types.

For time-range queries, do we iterate over the values in timestamp
ascending order? If yes, the interface should specify it? Also, would it
make sense to add reverse order (also ok to exclude and only do if there
is demand in a follow up KIP; if not, please add to "Rejected
alternatives" section).

Also, for time-range queries, what are the exact bounds for what we
include? In the end, a value has a "valid range" (conceptually), so do
we include a record if its valid range overlaps the search time-range,
or must it be fully included? Or would we only say that the `validFrom`
timestamp that is stored must be in the search range (which implies that
the lower end would be a non-overlapping but "fully included" bound,
while the upper end would be an overlapping bound)?

For key-range / time-range queries: do we return the result in `<k,ts>`
order or `<ts,k>` order? Also, what about reverse iterators?

About `ValueIterator` -- I think the JavaDocs have a c&p error in them for
`peekNextRecord` (also, should it be called `peekNextValue`?). Also, some
other JavaDocs seem to be incomplete and do not describe all parameters.


Thanks.



-Matthias



On 7/26/23 7:24 AM, Alieh Saeedi wrote:
Hi all,

I would like to propose a KIP to support IQv2 for versioned state
stores.



https://cwiki.apache.org/confluence/display/KAFKA/KIP-960%3A+Support+interactive+queries+%28IQv2%29+for+versioned+state+stores

Looking forward to your feedback!

Cheers,
Alieh



