Hello John,

Great, great, great writeup! :) And thank you for bringing this up finally.
I have made a pass over the KIP as well as its POC PR; here are some
initial thoughts:

First are some meta ones:

1. Today, unfortunately, serdes do not happen only at the metered-store
layer. For windowed / session stores, and also for some newly added stores
for stream-stream joins that are optimized for time-based range queries,
the serdes are actually composed across multiple layers, and queries on
the outer interface are translated along the way, with serde-specific bytes
wrapped / stripped at each layer. To be more specific, today our store
hierarchy is like this:

metered * -> cached -> logged * -> formatted * (e.g. segmented,
list-valued) -> inner (rocksdb, in-memory)

and serdes today can happen on the layers marked with * above, where each
layer stuffs a bit more into the query bytes as a prefix/suffix. This is not
really by design or ideal, but the result of historically accumulated tech
debt. There's a related JIRA ticket for it:
https://issues.apache.org/jira/browse/KAFKA-13286. I guess my point is that
we need to be a bit careful about how we implement
`KafkaStreams#serdesForStore(storeName)`, as we may expect a bumpy road
moving forward.

2. Related to 1 above, I think we cannot always delegate the `query()`
implementation to the `inner` store layer, since some serde work, or even
some computation logic, happens at the outer layers, especially the
`formatted` layer. For example, besides the cached layer, the `formatted`
layer also needs to make sure the `query` object is appropriately
translated before handing it downstream to the inner store, and also needs
to translate the `queryResult` a bit while handing it upwards in the
hierarchy.
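
To make point 2 concrete, here is a minimal sketch of a `formatted` layer
that translates a key query on the way down and the result on the way up.
All names below (BytesQueryable, InnerBytesStore, FormattedStore) and the
byte layout are assumptions for illustration only; they are not the actual
Kafka Streams classes or formats.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-ins for illustration only; not Kafka Streams types.
interface BytesQueryable {
    Optional<byte[]> query(byte[] key);
}

// "inner" layer: a plain byte store that knows nothing about formats.
final class InnerBytesStore implements BytesQueryable {
    private final Map<ByteBuffer, byte[]> data = new HashMap<>();

    void put(byte[] key, byte[] value) {
        // Copy the arrays so later mutation by callers cannot corrupt the map.
        data.put(ByteBuffer.wrap(key.clone()), value.clone());
    }

    @Override
    public Optional<byte[]> query(byte[] key) {
        return Optional.ofNullable(data.get(ByteBuffer.wrap(key)));
    }
}

// "formatted" layer. Assumed layout (hypothetical):
//   stored key   = <user key><8-byte window start>
//   stored value = <8-byte timestamp><user value>
final class FormattedStore implements BytesQueryable {
    private final InnerBytesStore inner;
    private final long windowStart;

    FormattedStore(InnerBytesStore inner, long windowStart) {
        this.inner = inner;
        this.windowStart = windowStart;
    }

    void put(byte[] key, long timestamp, byte[] value) {
        byte[] storedValue = ByteBuffer.allocate(Long.BYTES + value.length)
            .putLong(timestamp)
            .put(value)
            .array();
        inner.put(toStoredKey(key), storedValue);
    }

    @Override
    public Optional<byte[]> query(byte[] key) {
        // Translate the query before handing it down, then strip the
        // timestamp prefix from the result before handing it back up.
        return inner.query(toStoredKey(key))
            .map(stored -> Arrays.copyOfRange(stored, Long.BYTES, stored.length));
    }

    private byte[] toStoredKey(byte[] key) {
        return ByteBuffer.allocate(key.length + Long.BYTES)
            .put(key)
            .putLong(windowStart)
            .array();
    }
}
```

In this sketch, sending the caller's key straight to the inner store would
find nothing, because the inner store only knows the suffixed key.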

3. As we add more query types in IQv2, the inner store's `query`
implementation may get very clumsy, with a large "switch" statement over
all the possible query types. Although custom stores could support only a
few and let the `default` case ignore all the others, built-in stores may
still need to handle all possible types exhaustively. I'm wondering if it's
a good trade-off to restrict the extensibility of `Query` in order to keep
the query type space from exploding, e.g. if a Query can only be extended
along some predefined dimensions like:

* query-field: key, non-key (a field extractor from the value bytes needs
to be provided)
* query-scope: single, range
* query-match-type (only useful for a range scope): prefix-match (e.g.
for a range key query, the provided key is only a prefix, and all keys
with this prefix should be returned), full-match
* query-value-type: object, raw-bytes
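
As a rough sketch of how these dimensions could look in code: a store would
branch on a handful of enum values instead of on an open-ended set of query
classes. The names below (QueryShape, QueryPlanner, the enums) are
hypothetical and not part of the KIP, and rejecting prefix-match on a
single-key query is an assumption derived from the note above that
match-type is only useful for a range scope.

```java
// Hypothetical enums mirroring the four dimensions listed above.
enum QueryField { KEY, NON_KEY }
enum QueryScope { SINGLE, RANGE }
enum QueryMatchType { FULL_MATCH, PREFIX_MATCH }
enum QueryValueType { OBJECT, RAW_BYTES }

// A query is fully described by one value per dimension.
final class QueryShape {
    final QueryField field;
    final QueryScope scope;
    final QueryMatchType matchType;
    final QueryValueType valueType;

    QueryShape(QueryField field, QueryScope scope,
               QueryMatchType matchType, QueryValueType valueType) {
        // Assumption: prefix-match is only meaningful for range queries.
        if (scope == QueryScope.SINGLE && matchType == QueryMatchType.PREFIX_MATCH) {
            throw new IllegalArgumentException("prefix-match only applies to range queries");
        }
        this.field = field;
        this.scope = scope;
        this.matchType = matchType;
        this.valueType = valueType;
    }
}

// The store-side dispatch stays small and closed, whatever the number of
// user-facing query classes built on top of these dimensions.
final class QueryPlanner {
    static String plan(QueryShape q) {
        if (q.field == QueryField.NON_KEY) {
            return "scan + field extractor";
        }
        if (q.scope == QueryScope.SINGLE) {
            return "point lookup";
        }
        return q.matchType == QueryMatchType.PREFIX_MATCH ? "prefix scan" : "range scan";
    }
}
```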

4. What's the expected usage for the execution info? Is it only for logging
purposes? If yes, then I think not enforcing any string format is fine, so
that the store layers can just attach any string information that they find
useful.

5. I did not find any specific proposal for exception handling; what would
that look like? E.g. besides the expected error cases like non-active,
how would we communicate other, unexpected error cases such as store closed,
IO error, bad query parameters, etc.?

6. Since we do not deprecate any existing APIs in this KIP, it's a bit hard
for readers to understand what is eventually going to be covered by IQv2.
For example, we know that eventually `KafkaStreams#store` would be gone,
but what about `KafkaStreams#queryMetadataForKey`, and
`#streamsMetadataForStore`, and also `allLocalStorePartitionLags`? I think
it would be great to mention the eventual end state with IQv2 even if the
KIP itself would not deprecate anything yet.

7. It seems people are still a bit confused about the
"Position/PositionBound" topics, and personally I think it's okay to
exclude them from this KIP just to keep its (already large) scope smaller.
Also, once we start implementing the KIP in full, we may learn new things
while fighting the details in the weeds, and that would be a better time
for us to consider new parameters such as bounds, as well as cache
bypassing and other potential features.

Some minor ones:

8. What about just naming the new classes as `StateQueryRequest/Result`, or
`StoreQueryRequest/Result`? The word "interactive" is for describing its
semantics in docs, but I feel for class names we can use a more meaningful
prefix.

9. Should the RawKeyQuery be extending `KeyQuery<byte[]>`, or directly
implementing `Query<byte[]>`?

10. Why do we need the new class "InteractiveQuerySerdes" alongside the
existing classes? Your PR seems to just use `StateSerdes` directly.

11. Why do we have a new type parameter "R" in the QueryResult class in
addition to "<K, V>"? Should R always be equal to V?

12. Related to 10/11 above, what about letting the QueryResult always
return values as raw bytes, along with the serdes? Then it's up to the
callers whether they want the bytes to be deserialized immediately, or
written somewhere and deserialized later. More specifically, we would have
only a single function, KafkaStreams#query, and the QueryResult would be:

InteractiveQueryResult {
  public InteractiveQueryResult(
      Map<Integer /*partition*/, QueryResult<byte[]>> partitionResults);

...

  public StateSerdes<K, V> serdes();
}

The result itself can then also provide some built-in functions to do
the deserialization when returning results, so that users' code would not
get more complicated. The benefit is that we end up with a single function
in `KafkaStreams`, and the inner store only ever needs to implement the raw
query types. Of course doing this would not be so easy given the fact
described in 1) above, but I feel this would be a good way to first
abstract away this tech debt, and then later resolve it in a single place.
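
A minimal sketch of that idea, under simplifying assumptions: the class name
RawQueryResults is hypothetical, a plain `Function<byte[], V>` stands in for
the value side of `StateSerdes`, and each partition maps directly to raw
bytes rather than to a `QueryResult<byte[]>`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch: raw per-partition bytes travel together with the
// means to deserialize them, and the caller decides when to deserialize.
final class RawQueryResults<V> {
    private final Map<Integer, byte[]> rawByPartition;
    // Stand-in for the value deserializer that a serdes object would provide.
    private final Function<byte[], V> valueDeserializer;

    RawQueryResults(Map<Integer, byte[]> rawByPartition,
                    Function<byte[], V> valueDeserializer) {
        this.rawByPartition = Collections.unmodifiableMap(new HashMap<>(rawByPartition));
        this.valueDeserializer = valueDeserializer;
    }

    // Raw access: callers may forward or persist the bytes and decode later.
    Map<Integer, byte[]> raw() {
        return rawByPartition;
    }

    Function<byte[], V> valueDeserializer() {
        return valueDeserializer;
    }

    // Built-in convenience: deserialize every partition's value right away.
    // A null raw value (no result for that partition) stays null.
    Map<Integer, V> deserialized() {
        Map<Integer, V> out = new HashMap<>();
        for (Map.Entry<Integer, byte[]> entry : rawByPartition.entrySet()) {
            byte[] bytes = entry.getValue();
            out.put(entry.getKey(), bytes == null ? null : valueDeserializer.apply(bytes));
        }
        return out;
    }
}
```

With this shape, the store side only ever produces bytes; a caller who wants
objects calls deserialized(), and a caller who wants to forward the bytes
calls raw().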

---------------

Again, congrats on the very nice proposal! Let me know what you think about
my comments.

Guozhang


On Mon, Nov 15, 2021 at 2:52 PM John Roesler <vvcep...@apache.org> wrote:

> Hi Patrick and Sagar,
>
> Thanks for the feedback! I'll just break out the questions
> and address them one at a time.
>
> Patrick 1.
> The default bound that I'm proposing is only to let active
> tasks answer queries (which is also the default with IQ
> today). Therefore, calling getPositionBound() would return a
> PositionBound for which isLatest() is true.
>
> Patrick 2.
> I might have missed something in revision, but I'm not sure
> what you're referring to exactly when you say they are
> different. The IQRequest only has a PositionBound, and the
> IQResponse only has a (concrete) Position, so I think they
> are named accordingly (getPositionBound and getPosition). Am
> I overlooking what you are talking about?
>
> Sagar 1.
> I think you're talking about the KeyValueStore#get(key)
> method? This is a really good question. I went ahead and
> dropped in an addendum to the KeyQuery example to show how
> you would run the query in today's API. Here's a caricature
> of the two APIs:
>
> current:
>   KeyValueStore store = kafkaStreams.store(
>     "mystore",
>     keyValueStore())
>   int value = store.get(key);
>
> proposed:
>   int value = kafkaStreams.query(
>     "mystore",
>     KeyQuery.withKey(key));
>
> So, today we first get the store interface and then we
> invoke the method, and under the proposal, we would instead
> just ask KafkaStreams to execute the query on the store. In
> addition to all the other stuff I said in the motivation,
> one thing I think is neat about this API is that it means we
> can re-use queries across stores. So, for example, we could
> also use KeyQuery on WindowStores, even though there's no
> common interface between WindowStore and KeyValueStore.
>
> In other words, stores can support any queries that make
> sense and _not_ support any queries that don't make sense.
> This gets into your second question...
>
> Sagar 2.
> Very good question. Your experience with your KIP-614
> contribution was one of the things that made me want to
> revise IQ to begin with. It seems like there's a really
> stark gap between how straightforward the proposal is to add
> a new store operation, and then how hard it is to actually
> implement a new operation, due to all those intervening
> wrappers.
>
> There are two categories of wrappers to worry about:
> - Facades: These only exist to disallow access to write
> APIs, which are exposed through IQ today but shouldn't be
> called. These are simply unnecessary under IQv2, since we
> only run queries instead of returning the whole store.
> - Store Layers: This is what you provided examples of. We
> have store layers that let us compose features like
> de/serialization and metering, changelogging, caching, etc.
> A nice thing about this design is that we mostly don't have
> to worry about those wrapper layers at all. Each of
> these stores would simply delegate any query to lower layers
> unless there is something they need to do. In my POC, I
> simply added a delegating implementation to
> WrappedStateStore, which meant that I didn't need to touch
> most of the wrappers when I added a new query.
>
> Here's what I think future contributors will have to worry
> about:
> 1. The basic query execution in the base byte stores
> (RocksDB and InMemory)
> 2. The Caching stores IF they want the query to be served
> from the cache
> 3. The Metered stores IF some serialization needs to be done
> for the query
>
> And that's it! We should be able to add new queries without
> touching any other store layer besides those, and each one
> of those is involved because it has some specific reason to
> be.
>
>
> Thanks again, Patrick and Sagar! Please let me know if I
> failed to address your questions, or if you have any more.
>
> Thanks,
> -John
>
> On Mon, 2021-11-15 at 22:37 +0530, Sagar wrote:
> > Hi John,
> >
> > Thanks for the great writeup! Couple of things I wanted to bring up (may
> > or may not be relevant):
> >
> > 1) The sample implementation that you have presented for KeyQuery is very
> > helpful. One thing which may be added to it is how it connects to the
> > KeyValue.get(key) method. That's something that at least I couldn't
> > totally figure out; not sure about others though. I understand that it is
> > out of scope of the KIP to explain this for every query that IQ supports,
> > but one implementation would help to get a sense of what the changes
> > would feel like.
> > 2) The other thing that I wanted to know is that StateStore on its own
> > has a lot of implementations and some of them are wrappers. So at what
> > levels do users need to implement the query methods? Like a
> > MeteredKeyValueStore wraps RocksDbStore and calls it internally through
> > a wrapped call. As per the new changes, what would the scheme of things
> > look like for the same KeyQuery?
> >
> > Thanks!
> > Sagar.
> >
> >
> > On Mon, Nov 15, 2021 at 6:20 PM Patrick Stuedi
> > <pstu...@confluent.io.invalid> wrote:
> >
> > > Hi John,
> > >
> > > Thanks for submitting the KIP! One question I have is, assuming one
> > > instantiates InteractiveQueryRequest via withQuery, and then later
> > > calls getPositionBound, what will the result be? Also I noticed the
> > > Position-returning methods in InteractiveQueryRequest and
> > > InteractiveQueryResult are named differently; any particular reason?
> > >
> > > Best,
> > >   Patrick
> > >
> > >
> > > On Fri, Nov 12, 2021 at 12:29 AM John Roesler <vvcep...@apache.org>
> > > wrote:
> > >
> > > > Thanks for taking a look, Sophie!
> > > >
> > > > Ah, that was a revision error. I had initially been planning
> > > > an Optional<Set<Integer>> with Optional.empty() meaning to
> > > > fetch all partitions, but then decided it was needlessly
> > > > complex and changed it to the current proposal with two
> > > > methods:
> > > >
> > > > boolean isAllPartitions();
> > > > Set<Integer> getPartitions(); (which would throw an
> > > > exception if it's an "all partitions" request).
> > > >
> > > > I've corrected the javadoc and also documented the
> > > > exception.
> > > >
> > > > Thanks!
> > > > -John
> > > >
> > > > On Thu, 2021-11-11 at 15:03 -0800, Sophie Blee-Goldman
> > > > wrote:
> > > > > Thanks John, I've been looking forward to this for a while now. It
> > > > > was pretty horrifying to learn how present-day IQ works (or rather,
> > > > > doesn't work) with custom state stores :/
> > > > >
> > > > > One minor cosmetic point: in the InteractiveQueryRequest class, the
> > > > > #getPartitions method has a return type of Set<Integer>, but the
> > > > > javadocs refer to Optional. Not sure which is intended for this API,
> > > > > but if Optional is supposed to be the return type, do you perhaps
> > > > > mean for it to be Optional.empty() and Optional.of(non-empty set)
> > > > > rather than Optional.of(empty set) and Optional.of(non-empty set)?
> > > > >
> > > > > On Thu, Nov 11, 2021 at 12:03 PM John Roesler <vvcep...@apache.org>
> > > > > wrote:
> > > > >
> > > > > > Hello again, all,
> > > > > >
> > > > > > Just bumping this discussion on a new, more flexible
> > > > > > Interactive Query API in Kafka Streams.
> > > > > >
> > > > > > If there are no concerns, I'll go ahead and call a vote on
> > > > > > Monday.
> > > > > >
> > > > > > Thanks!
> > > > > > -John
> > > > > >
> > > > > > On Tue, 2021-11-09 at 17:37 -0600, John Roesler wrote:
> > > > > > > Hello all,
> > > > > > >
> > > > > > > I'd like to start the discussion for KIP-796, which proposes
> > > > > > > a revamp of the Interactive Query APIs in Kafka Streams.
> > > > > > >
> > > > > > > The proposal is here:
> > > > > > > https://cwiki.apache.org/confluence/x/34xnCw
> > > > > > >
> > > > > > > I look forward to your feedback!
> > > > > > >
> > > > > > > Thank you,
> > > > > > > -John
> > > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > >
> > > >
> > > >
> > >
>
>

-- 
-- Guozhang
