Hi Patrick and Sagar,

Thanks for the feedback! I'll just break out the questions and address them one at a time.

Patrick 1. The default bound that I'm proposing only lets active tasks answer queries (which is also the default with IQ today). Therefore, calling getPositionBound() would return a PositionBound for which isLatest() is true.

Patrick 2. I might have missed something in revision, but I'm not sure what you're referring to exactly when you say they are different. The IQRequest only has a PositionBound, and the IQResponse only has a (concrete) Position, so I think they are named accordingly (getPositionBound and getPosition). Am I overlooking what you are talking about?

Sagar 1. I think you're talking about the KeyValueStore#get(key) method? This is a really good question. I went ahead and dropped in an addendum to the KeyQuery example to show how you would run the query in today's API. Here's a caricature of the two APIs:

current:
    KeyValueStore store = kafkaStreams.store(
        "mystore",
        keyValueStore());
    int value = store.get(key);

proposed:
    int value = kafkaStreams.query(
        "mystore",
        KeyQuery.withKey(key));

So, today we first get the store interface and then we invoke the method, and under the proposal, we would instead just ask KafkaStreams to execute the query on the store. In addition to all the other stuff I said in the motivation, one thing I think is neat about this API is that it means we can re-use queries across stores. So, for example, we could also use KeyQuery on WindowStores, even though there's no common interface between WindowStore and KeyValueStore. In other words, stores can support any queries that make sense and _not_ support any queries that don't make sense. This gets into your second question...

Sagar 2. Very good question. Your experience with your KIP-614 contribution was one of the things that made me want to revise IQ to begin with.
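
To make the Sagar 1 point above concrete, here is a minimal, self-contained sketch of how one query type could be run against two stores that share no store interface. Everything in it is a hypothetical stand-in and not the KIP's actual API: Query, KeyQuery, Queryable, ToyKeyValueStore, ToyWindowStore and ToyStreams are toy names, and the choice that a key lookup on the window store returns the latest window's value is just an assumption made for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

public class QuerySketch {
    // All types here are hypothetical stand-ins, not the Kafka Streams API.

    /** A query is plain data; R is the type of result it produces. */
    interface Query<R> {}

    /** Point lookup by key, loosely modeled on the KeyQuery in the proposal. */
    static final class KeyQuery<K, V> implements Query<V> {
        final K key;
        private KeyQuery(K key) { this.key = key; }
        static <K, V> KeyQuery<K, V> withKey(K key) { return new KeyQuery<>(key); }
    }

    /** The only contract the stores share: execute a query or reject it. */
    interface Queryable {
        <R> R execute(Query<R> query);
    }

    /** Toy key-value store. */
    static final class ToyKeyValueStore implements Queryable {
        final Map<String, Integer> data = new HashMap<>();

        @Override
        @SuppressWarnings("unchecked")
        public <R> R execute(Query<R> query) {
            if (query instanceof KeyQuery) {
                return (R) data.get(((KeyQuery<?, ?>) query).key);
            }
            throw new UnsupportedOperationException("unsupported query: " + query);
        }
    }

    /** Toy window store: key -> (window start timestamp -> value). */
    static final class ToyWindowStore implements Queryable {
        final Map<String, TreeMap<Long, Integer>> data = new HashMap<>();

        void put(String key, long windowStart, int value) {
            data.computeIfAbsent(key, k -> new TreeMap<>()).put(windowStart, value);
        }

        @Override
        @SuppressWarnings("unchecked")
        public <R> R execute(Query<R> query) {
            if (query instanceof KeyQuery) {
                TreeMap<Long, Integer> windows = data.get(((KeyQuery<?, ?>) query).key);
                // Assumption for this sketch: a key lookup returns the latest window's value.
                return windows == null ? null : (R) windows.lastEntry().getValue();
            }
            throw new UnsupportedOperationException("unsupported query: " + query);
        }
    }

    /** Stand-in for the entry point: look up the store by name and hand it the query. */
    static final class ToyStreams {
        final Map<String, Queryable> stores = new HashMap<>();

        <R> R query(String storeName, Query<R> query) {
            Queryable store = stores.get(storeName);
            if (store == null) {
                throw new IllegalArgumentException("unknown store: " + storeName);
            }
            return store.execute(query);
        }
    }

    public static void main(String[] args) {
        ToyKeyValueStore kv = new ToyKeyValueStore();
        kv.data.put("a", 1);

        ToyWindowStore windowed = new ToyWindowStore();
        windowed.put("a", 0L, 10);
        windowed.put("a", 60_000L, 20);

        ToyStreams streams = new ToyStreams();
        streams.stores.put("kv", kv);
        streams.stores.put("windowed", windowed);

        // The same query object is reused against both stores.
        KeyQuery<String, Integer> query = KeyQuery.withKey("a");
        Integer fromKv = streams.query("kv", query);
        Integer fromWindowed = streams.query("windowed", query);
        System.out.println(fromKv);        // prints 1
        System.out.println(fromWindowed);  // prints 20
    }
}
```

The caller never sees a store interface here; it only names the store and passes a query, and each toy store decides for itself which query types it accepts.
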
It seems like there's a really stark gap between how straightforward it is to propose a new store operation and how hard it is to actually implement one, due to all those intervening wrappers.

There are two categories of wrappers to worry about:

- Facades: These only exist to disallow access to write APIs, which are exposed through IQ today but shouldn't be called. These are simply unnecessary under IQv2, since we only run queries instead of returning the whole store.
- Store Layers: This is what you provided examples of. We have store layers that let us compose features like de/serialization and metering, changelogging, caching, etc.

A nice thing about this design is that we mostly don't have to worry about those wrapper layers at all. Each of these stores would simply delegate any query to lower layers unless there is something they need to do. In my POC, I simply added a delegating implementation to WrappedStateStore, which meant that I didn't need to touch most of the wrappers when I added a new query.

Here's what I think future contributors will have to worry about:

1. The basic query execution in the base byte stores (RocksDB and InMemory)
2. The Caching stores IF they want the query to be served from the cache
3. The Metered stores IF some serialization needs to be done for the query

And that's it! We should be able to add new queries without touching any other store layer besides those, and each one of those is involved because it has some specific reason to be.

Thanks again, Patrick and Sagar! Please let me know if I failed to address your questions, or if you have any more.

Thanks,
-John

On Mon, 2021-11-15 at 22:37 +0530, Sagar wrote:
> Hi John,
>
> Thanks for the great writeup! Couple of things I wanted to bring up (may or
> may not be relevant):
>
> 1) The sample implementation that you have presented for KeyQuery is very
> helpful. One thing which may be added to it is how it connects to the
> KeyValue.get(key) method.
> That's something that at least I couldn't totally
> figure out, not sure about others though. I understand that it is out of
> scope of the KIP to explain for every query that IQ supports, but one
> implementation just to get a sense of how the changes would feel like.
> 2) The other thing that I wanted to know is that StateStore on its own has
> a lot of implementations and some of them are wrappers, so at what levels
> do users need to implement the query methods? Like a MeteredKeyValueStore
> wraps RocksDbStore and calls it internally through a wrapped call. As per
> the new changes, how would the scheme of things look like for the same
> KeyQuery?
>
> Thanks!
> Sagar.
>
>
> On Mon, Nov 15, 2021 at 6:20 PM Patrick Stuedi <pstu...@confluent.io.invalid>
> wrote:
>
> > Hi John,
> >
> > Thanks for submitting the KIP! One question I have is, assuming one
> > instantiates InteractiveQueryRequest via withQuery, and then later calls
> > getPositionBound, what will the result be? Also I noticed the Position
> > returning method is in InteractiveQueryRequest and InteractiveQueryResult
> > is named differently, any particular reason?
> >
> > Best,
> > Patrick
> >
> >
> > On Fri, Nov 12, 2021 at 12:29 AM John Roesler <vvcep...@apache.org> wrote:
> >
> > > Thanks for taking a look, Sophie!
> > >
> > > Ah, that was a revision error. I had initially been planning
> > > an Optional<Set<Integer>> with Optional.empty() meaning to
> > > fetch all partitions, but then decided it was needlessly
> > > complex and changed it to the current proposal with two
> > > methods:
> > >
> > > boolean isAllPartitions();
> > > Set<Integer> getPartitions(); (which would throw an
> > > exception if it's an "all partitions" request).
> > >
> > > I've corrected the javadoc and also documented the
> > > exception.
> > >
> > > Thanks!
> > > -John
> > >
> > > On Thu, 2021-11-11 at 15:03 -0800, Sophie Blee-Goldman
> > > wrote:
> > > > Thanks John, I've been looking forward to this for a while now.
> > > > It was pretty horrifying to learn
> > > > how present-day IQ works (or rather, doesn't work) with custom state
> > > > stores :/
> > > >
> > > > One minor cosmetic point: in the InteractiveQueryRequest class, the
> > > > #getPartitions method has a return type of Set<Integer>, but the
> > > > javadocs refer to Optional. Not sure which is intended for this API,
> > > > but if Optional is supposed to be the return type, do you perhaps
> > > > mean for it to be Optional.empty() and Optional.of(non-empty set)
> > > > rather than Optional.of(empty set) and Optional.of(non-empty set)?
> > > >
> > > > On Thu, Nov 11, 2021 at 12:03 PM John Roesler <vvcep...@apache.org>
> > > > wrote:
> > > >
> > > > > Hello again, all,
> > > > >
> > > > > Just bumping this discussion on a new, more flexible
> > > > > Interactive Query API in Kafka Streams.
> > > > >
> > > > > If there are no concerns, I'll go ahead and call a vote on
> > > > > Monday.
> > > > >
> > > > > Thanks!
> > > > > -John
> > > > >
> > > > > On Tue, 2021-11-09 at 17:37 -0600, John Roesler wrote:
> > > > > > Hello all,
> > > > > >
> > > > > > I'd like to start the discussion for KIP-796, which proposes
> > > > > > a revamp of the Interactive Query APIs in Kafka Streams.
> > > > > >
> > > > > > The proposal is here:
> > > > > > https://cwiki.apache.org/confluence/x/34xnCw
> > > > > >
> > > > > > I look forward to your feedback!
> > > > > >
> > > > > > Thank you,
> > > > > > -John