Hey Alieh,

Thanks for the KIP! 

It looks like the KIP proposes three different types of interactive queries for 
versioned stores, though they are grouped together into two classes: 
VersionedKeyQuery adds support for single-key, single-timestamp lookups, and 
also for single-key, multi-timestamp lookups, while VersionedRangeQuery 
additionally adds support for key-range queries.

The first type of query (single-key, single-timestamp lookups) is already 
supported by versioned stores (per the VersionedKeyValueStore interface) today, 
so exposing it via interactive queries requires little additional 
implementation effort and is a quick win for users. The other two types of 
queries will require more effort to add, and also come with more design 
decisions. I've sorted my thoughts accordingly.

Regarding single-key, multi-timestamp lookups:

1. If we add these, we should add a new method to the VersionedKeyValueStore 
interface to support this type of lookup. Otherwise, there is no easy/efficient 
way to compose methods from the existing interface in order to implement this 
type of lookup, and therefore the new interactive query type cannot be used on 
generic VersionedKeyValueStores.

2. I agree with Matthias's and Lucas's comments about being very explicit about 
what the timestamp range means. For consistency with single-key, 
single-timestamp lookups, I think the "upper timestamp bound" should really be 
an "as of timestamp bound" instead, so that it is inclusive. For the "lower 
timestamp bound"/start timestamp, we have a choice. We can interpret it as the 
user saying "I want valid records for all timestamps in the range", in which 
case the query should also return the record that was valid at the start 
timestamp, even if that record's own timestamp is earlier than the start 
timestamp. Or we can interpret it as the user saying "I want all records with 
timestamps in the range", in which case the query should not return any records 
with timestamp earlier than the start timestamp. My current preference is for 
the former, but it'd be good to hear other opinions.

3. The existing VersionedRecord interface contains only a value and validFrom 
timestamp, and does not allow null values. This presents a problem for 
introducing single-key, multi-timestamp lookups because if there is a tombstone 
contained within the timestamp range of the query, then there is no way to 
represent this as part of a ValueIterator<VersionedRecord> return type. You'll 
either have to allow null values or add a validTo timestamp to the returned 
records.

4. Also +1 to Matthias's question about standardizing the order in which 
records are returned. Will they always be returned in forwards-timestamp order? 
Reverse-timestamp order? Will users get a choice? It'd be good to make this 
explicit in the KIP.
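
To make points 2 and 3 concrete, here is a rough, self-contained sketch of the 
two interpretations of the start timestamp. It is a toy model only: the history 
of a single key is a TreeMap from timestamp to value (null standing in for a 
tombstone), and the names VersionRangeSketch, Version, validDuring and 
writtenDuring are made up for illustration. None of them come from the KIP or 
from Kafka Streams. The sketch also assumes the "add a validTo timestamp" 
option from point 3.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class VersionRangeSketch {

    // Hypothetical result type: a value plus the interval [validFrom, validTo)
    // during which it was the current version. Long.MAX_VALUE means "still latest".
    static final class Version {
        final String value;
        final long validFrom;
        final long validTo;

        Version(String value, long validFrom, long validTo) {
            this.value = value;
            this.validFrom = validFrom;
            this.validTo = validTo;
        }

        @Override
        public String toString() {
            return value + "[" + validFrom + "," + validTo + ")";
        }
    }

    // Interpretation A: every version that was valid at some point in [from, asOf].
    static List<Version> validDuring(TreeMap<Long, String> history, long from, long asOf) {
        if (from > asOf) {
            return new ArrayList<>();
        }
        // Start at the version that was current at `from`, even if written earlier.
        Long start = history.floorKey(from);
        return collect(history, start == null ? from : start, asOf);
    }

    // Interpretation B: only versions whose own timestamp lies in [from, asOf].
    static List<Version> writtenDuring(TreeMap<Long, String> history, long from, long asOf) {
        return collect(history, from, asOf);
    }

    private static List<Version> collect(TreeMap<Long, String> history, long lo, long hi) {
        List<Version> out = new ArrayList<>();
        if (lo > hi) {
            return out;
        }
        for (Map.Entry<Long, String> e : history.subMap(lo, true, hi, true).entrySet()) {
            if (e.getValue() == null) {
                continue; // tombstone: nothing to emit, the gap in validTo/validFrom shows it
            }
            Long next = history.higherKey(e.getKey());
            out.add(new Version(e.getValue(), e.getKey(), next == null ? Long.MAX_VALUE : next));
        }
        return out;
    }

    public static void main(String[] args) {
        TreeMap<Long, String> history = new TreeMap<>();
        history.put(1L, "v1");
        history.put(3L, null); // delete at time 3
        history.put(6L, "v2");

        // A returns v1 (valid from 1 until 3) and v2; B returns only v2.
        System.out.println(validDuring(history, 2, 7));
        System.out.println(writtenDuring(history, 2, 7));
    }
}
```

With validTo on each returned record, a caller can infer the delete at time 3 
from the gap between v1's validTo and v2's validFrom, without the iterator ever 
having to return a null value.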

Regarding key-range queries (either single-timestamp or multi-timestamp):

5. Same comment about adding new methods for this type of lookup to the 
VersionedKeyValueStore interface.

6. Again +1 to Matthias's question about the order in which records are 
returned, for multi-key, multi-timestamp queries. Organizing first by key and 
then by timestamp makes the most sense to me, based on the layout of the 
existing store implementation. (Trying to sort by timestamp would require 
reading potentially all keys into memory first, which is not feasible.)
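
As a rough illustration of point 6, the sketch below models a store as a sorted 
map from key to that key's version history. This layout is a simplification I 
am assuming for the example, not the actual store implementation, and 
RangeOrderSketch, Row, keyThenTimestamp and timestampThenKey are hypothetical 
names. Key-first order falls out of a lazy nested iteration, while 
timestamp-first order has to buffer every row in the key range before it can 
return the first one.

```java
import java.util.Comparator;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class RangeOrderSketch {

    // Hypothetical flattened result row: one version of one key.
    static final class Row {
        final String key;
        final long ts;
        final String value;

        Row(String key, long ts, String value) {
            this.key = key;
            this.ts = ts;
            this.value = value;
        }

        @Override
        public String toString() {
            return key + "@" + ts + "=" + value;
        }
    }

    // <k, ts> order: walk keys in order and stream each key's versions lazily.
    static Stream<Row> keyThenTimestamp(
            NavigableMap<String, NavigableMap<Long, String>> store, String lo, String hi) {
        return store.subMap(lo, true, hi, true).entrySet().stream()
                .flatMap(k -> k.getValue().entrySet().stream()
                        .map(v -> new Row(k.getKey(), v.getKey(), v.getValue())));
    }

    // <ts, k> order: sorted() is a stateful operation that must see all rows first.
    static List<Row> timestampThenKey(
            NavigableMap<String, NavigableMap<Long, String>> store, String lo, String hi) {
        return keyThenTimestamp(store, lo, hi)
                .sorted(Comparator.comparingLong((Row r) -> r.ts).thenComparing(r -> r.key))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        TreeMap<String, NavigableMap<Long, String>> store = new TreeMap<>();
        TreeMap<Long, String> a = new TreeMap<>();
        a.put(1L, "a1");
        a.put(4L, "a2");
        TreeMap<Long, String> b = new TreeMap<>();
        b.put(2L, "b1");
        store.put("a", a);
        store.put("b", b);

        // Key-first: a@1, a@4, b@2. Timestamp-first: a@1, b@2, a@4.
        System.out.println(keyThenTimestamp(store, "a", "b").collect(Collectors.toList()));
        System.out.println(timestampThenKey(store, "a", "b"));
    }
}
```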

I think the complexity of introducing single-key, multi-timestamp lookups and 
especially multi-key, multi-timestamp lookups is significantly higher than for 
single-key, single-timestamp lookups, so it'd be good to think about/gauge what 
the use cases for these types of queries are before committing to the 
implementation, and also to stage the implementation to get single-key, 
single-timestamp lookups as a quick win first without blocking on the others. 
(Guessing you were already planning to do that, though :))

Also a separate question: 

7. What's the motivation for introducing new VersionedKeyQuery and 
VersionedRangeQuery types rather than re-using the existing KeyQuery and 
RangeQuery types, to add optional asOfTimestamp bounds? I can see pros and cons 
of each, just curious to hear your thoughts.

If you do choose to keep VersionedKeyQuery and VersionedRangeQuery separate 
from KeyQuery and RangeQuery, then you can remove the KeyQuery and RangeQuery 
placeholders in the versioned store implementation as part of implementing your 
KIP: 
https://github.com/apache/kafka/blob/f23394336a7741bf4eb23fcde951af0a23a69bd0/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java#L142-L158

Best,
Victoria

On 2023/08/09 10:16:44 Bruno Cadonna wrote:
> Hi,
> 
> I will use the initials of the authors to distinguish the points.
> 
> LB 2.
> I like the idea of composable construction of queries. It would make the 
> API more readable. I think this is better than the VersionsQualifier, I 
> proposed in BC 3.
> 
> LB 4. and LB 5.
> Being explicit on the time bounds and key ranges is really important.
> 
> LB 6.
> I think peekNextValue() comes from peekNextKey() in KeyValueIterator. I 
> agree with Lucas that in ValueIterator peek() is clear enough.
> 
> BC 5.
> I forgot to answer your question. I am sorry! I do not think we need 
> system tests. However, you should add a test plan listing the tests you 
> plan to write. I guess this list will comprise unit and integration tests.
> 
> BC 6. (new)
> Could you please use asOf or asOfTime or asOfTimestamp instead of 
> untilTimestamp. I do not want to query a key until a timestamp, but I 
> want to query a key at a specific point in time. Maybe atTime might also 
> work, but I think asOf is more popular.
> 
> BC 7. (new)
> You should change long to Instant for timestamps. Just because we failed 
> to use Instant in other places, we should not keep long.
> 
> 
> Best,
> Bruno
> 
> 
> On 8/8/23 10:26 PM, Lucas Brutschy wrote:
> > Hi Alieh,
> > 
> > thanks a lot for the KIP. IQ with time semantics is going to be
> > another great improvement towards having crystal clear streaming
> > semantics!
> > 
> > 1. I agree with Bruno and Matthias, to remove the 'bound' term for the
> > timestamps. It's confusing that we have bounds for both timestamps and
> > keys. In particular, `withNoBoundWithTimestampBound` seems to
> > contradict itself.
> > 
> > 2. I would personally prefer having composable construction of the
> > query, instead of defining a separate method for each combination. So
> > for example:
> > - `keyRangeLatestValue(l,u)` ->  `withBounds(l, u).latest()`
> > - `withNoBoundsWithTimestampRange(t1,t2)` ->
> > `withNoBounds().fromTime(t1).untilTime(t2)`
> > - etc.pp.
> > This would have the advantage, that the interface would be very
> > similar to `RangeQuery` and we'd need a lot fewer methods, so it will
> > make the API reference a much quicker read. We already use this style
> > to define `skipCache` in `KeyQuery`. I guess that diverges quite a bit
> > from the current proposal, but I'll leave it here anyways for you to
> > consider it (even if you decide to stick with the current model).
> > 
> > 4. Please make sure to specify in every range-based method whether the
> > bounds are inclusive or exclusive. I see it being mentioned for some
> > methods, but for others, this is omitted. As I understand, 'until' is
> > usually used to mean exclusive, and 'from' is usually used to mean
> > inclusive, but it's better to specify this in the javadoc.
> > 
> > 5. Similarly, as Matthias says, specify what happens if the "validity
> > range" of a value overlaps with the query range. So, to clarify his
> > remark, what happens if the value v1 is inserted at time 1 and value
> > v2 is inserted at time 3, and I query for the range `[2,4]` - will the
> > result include v1 or not? It's the valid value at time 2. For
> > inspiration, in `WindowRangeQuery`, this important semantic detail is
> > even clear from the method name `withWindowStartRange`.
> > 
> > 6. For iterators, it is convention to call the method `peek` and this
> > convention is followed by e.g. `AbstractIterator` in Kafka, but also
> > Guava, Apache Commons etc. So I would also call it `peek`, not
> > `peekNextValue` here. It's clear what we are peeking at.
> > 
> > Cheers,
> > Lucas
> > 
> > On Thu, Jul 27, 2023 at 3:07 PM Alieh Saeedi
> > <as...@confluent.io.invalid> wrote:
> >>
> >> Thanks, Bruno, for the feedback.
> >>
> >>
> >>     - I agree with both points 2 and 3. About 3: Having "VersionsQualifier"
> >>     reduces the number of methods and makes everything less confusing. In
> >>     the end, that will be easier to use for the developers.
> >>     - About point 4: I renamed all the properties and parameters from
> >>     "asOfTimestamp" to "fromTimestamp". That was my misunderstanding. So
> >>     now we have these two timestamp bounds: "fromTimestamp" and
> >>     "untilTimestamp".
> >>     - About point 5: Do we need system tests here? I assumed just
> >>     integration tests were enough.
> >>     - Regarding long vs timestamp instance: I think yes, that's why I used
> >>     Long as timestamp.
> >>
> >> Bests,
> >> Alieh
> >>
> >>
> >>
> >>
> >>
> >>
> >> On Thu, Jul 27, 2023 at 2:28 PM Bruno Cadonna <ca...@apache.org> wrote:
> >>
> >>> Hi Alieh,
> >>>
> >>> Thanks for the KIP!
> >>>
> >>>
> >>> Here my feedback.
> >>>
> >>> 1.
> >>> You can remove the private fields and constructors from the KIP. Those
> >>> are implementation details.
> >>>
> >>>
> >>> 2.
> >>> Some proposals for renamings
> >>>
> >>> in VersionedKeyQuery
> >>>
> >>> withKeyWithTimestampBound()
> >>>     -> withKeyAndAsOf()
> >>>
> >>> withKeyWithTimestampRange()
> >>>     -> withKeyAndTimeRange()
> >>>
> >>> in VersionedRangeQuery
> >>>
> >>> KeyRangeWithTimestampBound()
> >>>     -> withKeyRangeAndAsOf()
> >>>
> >>> withLowerBoundWithTimestampBound()
> >>>     -> withLowerBoundAndAsOf()
> >>>
> >>> withUpperBoundWithTimestampBound()
> >>>     -> withUpperBoundAndAsOf()
> >>>
> >>> withNoBoundWithTimestampBound()
> >>>     -> withNoBoundsAndAsOf()
> >>>
> >>> keyRangeWithTimestampRange()
> >>>     -> withKeyRangeAndTimeRange()
> >>>
> >>> withLowerBoundWithTimestampRange()
> >>>     -> withLowerBoundAndTimeRange()
> >>>
> >>> withUpperBoundWithTimestampRange()
> >>>     -> withUpperBoundAndTimeRange()
> >>>
> >>> withNoBoundWithTimestampRange()
> >>>     -> withNoBoundsAndTimeRange()
> >>>
> >>>
> >>> 3.
> >>> Would it make sense to merge
> >>> withKeyLatestValue(final K key)
> >>> and
> >>> withKeyAllVersions(final K key)
> >>> into
> >>> withKey(final K key, final VersionsQualifier versionsQualifier)
> >>> where VersionsQualifier is an enum with values (ALL, LATEST). We could
> >>> also add EARLIEST if we feel it might be useful.
> >>> Same applies to all methods that end in LatestValue or AllVersions
> >>>
> >>>
> >>> 4.
> >>> I think getAsOfTimestamp() should not return the lower bound. If I query
> >>> a version as of a timestamp then the query should return the latest
> >>> version less than the timestamp.
> >>> I propose to rename the getters to getTimeFrom() and getTimeTo() as in
> >>> WindowRangeQuery.
> >>>
> >>>
> >>> 5.
> >>> Please add the Test Plan section.
> >>>
> >>>
> >>> Regarding long vs Instant: Did we miss to use Instant instead of long
> >>> for all interfaces of the versioned state stores?
> >>>
> >>>
> >>> Best,
> >>> Bruno
> >>>
> >>>
> >>>
> >>>
> >>>
> >>>
> >>>
> >>>
> >>> On 7/26/23 11:40 PM, Matthias J. Sax wrote:
> >>>> Thanks for the KIP Alieh. Glad to see that we can add IQ to the new
> >>>> versioned stores!
> >>>>
> >>>>
> >>>>
> >>>> Couple of questions:
> >>>>
> >>>>> single-key lookup with timestamp (upper) bound
> >>>>
> >>>> Not sure if "bound" is the right term? In the end, it's a point lookup
> >>>> for a key plus timestamps, so it's an as-of timestamp (not a bound)? Of
> >>>> course, the returned record would most likely have a different (smaller)
> >>>> timestamp, but that's expected but does not make the passed in timestamp
> >>>> a "bound" IMHO?
> >>>>
> >>>>> single-key query with timestamp range
> >>>>> single-key all versions query
> >>>>
> >>>> Should we also add `withLowerTimeBound` and `withUpperTimeBound`
> >>>> (similar to what `RangeQuery` has)?
> >>>>
> >>>> Btw: I think we should not pass `long` for timestamps, but `Instant`
> >>>> types.
> >>>>
> >>>> For time-range queries, do we iterate over the values in timestamp
> >>>> ascending order? If yes, the interface should specify it? Also, would it
> >>>> make sense to add reverse order (also ok to exclude and only do if there
> >>>> is demand in a follow up KIP; if not, please add to "Rejected
> >>>> alternatives" section).
> >>>>
> >>>> Also, for time-range queries, what are the exact bounds for stuff we
> >>>> include? In the end, a value has a "valid range" (conceptually), so do
> >>>> we include a record if its valid range overlaps the search time-range,
> >>>> or must it be fully included? Or would we only say that the `validFrom`
> >>>> timestamp that is stored must be in the search range (which implies that
> >>>> the lower end would be a non-overlapping but "fully included" bound,
> >>>> while the upper end would be an overlapping bound)?
> >>>>
> >>>> For key-range / time-range queries: do we return the result in `<k,ts>`
> >>>> order or `<ts,k>` order? Also, what about reverse iterators?
> >>>>
> >>>> About `ValueIterator` -- I think the JavaDocs have a c&p error in them
> >>>> for `peekNextRecord` (also, should it be called `peekNextValue`?). Also,
> >>>> some other JavaDocs seem to be incomplete and do not describe all
> >>>> parameters.
> >>>>
> >>>>
> >>>> Thanks.
> >>>>
> >>>>
> >>>>
> >>>> -Matthias
> >>>>
> >>>>
> >>>>
> >>>> On 7/26/23 7:24 AM, Alieh Saeedi wrote:
> >>>>> Hi all,
> >>>>>
> >>>>> I would like to propose a KIP to support IQv2 for versioned state
> >>>>> stores.
> >>>>>
> >>>>>
> >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-960%3A+Support+interactive+queries+%28IQv2%29+for+versioned+state+stores
> >>>>>
> >>>>> Looking forward to your feedback!
> >>>>>
> >>>>> Cheers,
> >>>>> Alieh
> >>>>>
> >>>
> 
