Hi John,

Thank you. I think it makes sense to modify the KIP to add prefixScan() to
the existing interfaces and to move the new mixin behaviour to Rejected
Alternatives. I am not very familiar with stores other than KeyValueStore,
so is it fine if I keep it there for now?

Regarding the type definition of the prefix, I will try to think of some
alternatives and share whatever I come up with.

Thanks!
Sagar.


On Sun, May 31, 2020 at 1:55 AM John Roesler <vvcep...@apache.org> wrote:

> Hi Sagar,
>
> Thanks for the response. Your use case makes sense to me; I figured it
> must be something like that.
>
> On a pragmatic level, in the near term, you might consider basically doing
> the same thing we did in KIP-213. If you swap out the store types for
> Byte/byte[] and “manually” invoke the serdes in your own logic, then you
> can use the same algorithm we did to derive the range scan boundaries from
> your desired prefix.
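> A minimal sketch of that derivation (my illustration, not the actual
> KIP-213 code; the class and method names are invented): the lower bound of
> the range is the serialized prefix itself, and the upper bound comes from
> incrementing the last non-0xFF byte of the prefix:

```java
import java.util.Arrays;

public class PrefixRange {
    // Derive the upper bound for a range scan from a serialized prefix:
    // increment the last byte that is not 0xFF and truncate after it.
    // Returns null when the prefix is all 0xFF bytes (scan to end of store).
    public static byte[] upperBound(byte[] prefix) {
        byte[] bound = Arrays.copyOf(prefix, prefix.length);
        for (int i = bound.length - 1; i >= 0; i--) {
            if (bound[i] != (byte) 0xFF) {
                bound[i]++;
                return Arrays.copyOf(bound, i + 1);
            }
        }
        return null;
    }
}
```

> Since the store's range() treats both endpoints as inclusive, real code
> would need to adjust the computed bound accordingly.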
>
> For the actual KIP, it seems like we would need significant design
> improvements to be able to do any mixins, so I think we should favor
> proposing either to just add to the existing interfaces or to create brand
> new interfaces, as appropriate, for now. Given that prefix can be converted
> to a range query at a low level, I think we can probably explore adding
> prefix to the existing interfaces with a default implementation.
>
> It seems like that just leaves the question of how to define the type of
> the prefix. To be honest, I don’t have any great ideas here. Are you able
> to generate some creative solutions, Sagar?
>
> Thanks,
> John
>
> On Tue, May 26, 2020, at 06:42, Sagar wrote:
> > Hi John,
> >
> > Thanks for the detailed reply. I was swamped with work last week and
> > couldn't respond earlier, so apologies for that.
> >
> > First of all, thanks for the context that both you and Adam have
> > provided on the issues faced previously. I can clearly see that, while I
> > was able to cut some corners when writing test cases and benchmarks,
> > stitching a store with prefix scan into an actual topology needs more
> > work. I am sorry for the half-baked tests I wrote without realising it;
> > you put it rightly when you said these challenges aren't obvious up
> > front.
> >
> > Now, coming back to the other points, I spent some time going through
> > KIP-213 and some of the code snippets discussed in that KIP. With the
> > detailed explanation you provided, it is now obvious to me that keeping a
> > generic key type like K won't work out of the box, and hence the decision
> > was made to use Bytes as the key type.
> >
> > I just had another thought on this, though. I looked at the range
> > function that was added to ReadOnlyKeyValueStore. While the key and value
> > types in that method are generic, internally almost all queries end up
> > querying using Bytes in some form or other. I looked at not just the
> > RocksDB store but other stores like the in-memory store or MemoryLRU, and
> > this seems to be the pattern. I think this stems from the fact that these
> > stores, while implementing KeyValueStore, pass Bytes and byte[] as the K
> > and V types. Classes like MeteredKeyValueStore which don't do this still
> > use Bytes.wrap to wrap the passed keys and values and invoke the range
> > method.
> >
> > So, the point I am trying to make is that, with the same behaviour - and
> > ignoring for a moment that it's a separate interface which I am trying to
> > "mix in" - the issues with the key types could be resolved. I may be
> > wrong, though, so I would like to know your thoughts on this. In fact,
> > the implementation of the PrefixSeekableStore interface in the RocksDB
> > state store also unknowingly passes Bytes and byte[] as K and V.
> >
> > On the second part - exposing it via the publicly accessible interfaces
> > to which we downcast while building the topology (like KeyValueStore) - I
> > can clearly see now that mixing it in the way I tried won't work. My
> > intention all along was not to disturb the flow of those stores which
> > don't yet support prefix scan, hence the separate interface. But I agree
> > that for this to work, it needs to be part of some pre-defined store type
> > like KeyValueStore etc. Right now, I don't have an answer to this, but
> > most likely it would have to be moved there and implemented across all
> > stores (if we see the worth in prefix scans :) )
> >
> > Regarding the motivation, I am sorry if I wasn't clear. This originated
> > from one of my own use cases with Kafka Streams, where I needed to find
> > some keys based upon a certain prefix. In fact, it's similar to the
> > RangeScanCombinedKeyUsage diagram in KIP-213, where the otherTable tries
> > to find entries in the state store based upon the FK. I was using
> > KeyValueStore, to be precise. I also remember having a Slack conversation
> > on this, and I was told that this isn't supported right now, but some
> > other users shared how, with some hacks, they were able to perform prefix
> > scans even though their use cases were a natural fit for a proper prefix
> > scan API. That kind of motivated me to take a stab at it. Unfortunately,
> > I have lost the Slack chat because of some cleanup at the Slack channel
> > level. I will try to update the ambiguous motivation statement in the
> > near future.
> >
> > Lastly, I would like to point out that your response was not at all
> > discouraging. On the contrary, it was really insightful, and it's always
> > good to learn/discover new things :)
> >
> > Thanks!
> > Sagar.
> >
> > On Fri, May 15, 2020 at 7:37 AM John Roesler <vvcep...@apache.org>
> wrote:
> >
> > > Hi, Sagar!
> > >
> > > Thanks for this KIP. I'm sorry it took me so long to reply. I'll
> number my
> > > points differently to avoid confusion.
> > >
> > > I can provide some additional context on the difficulties we previously
> > > faced in KIP-213 (which you and Adam have already discussed).
> > >
> > > J1) In your KIP, you propose the following interface:
> > >
> > > public interface PrefixSeekableStore<K, V> {
> > >     KeyValueIterator<K, V> prefixSeek(K prefix);
> > > }
> > >
> > > This is roughly the same thing that Adam and I were considering
> > > before. It has a hidden problem, that it assumes that prefixes of
> > > keys in the key space are also in the key space. In other words, this
> > > is a store with key type K, and the API assumes that prefixes are also
> > > of type K. This is true for some key types, like String or Bytes, but
> not
> > > for others.
> > >
> > > For example, if the keys are UUIDs, then no prefix is also a UUID. If
> the
> > > key is a complex data type, like Windowed<K> in our own DSL, then
> > > we would absolutely want to query all keys with the same record key
> > > (the K part), or the same window start time, but in neither case is the
> > > prefix actually a Windowed<K>.
> > >
> > > You can skirt the issue by defining a third type parameter, maybe KP,
> that
> > > is the "prefix" type, but this would also be awkward for many usages.
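> > > For concreteness, here is a toy sketch of the KP idea (all names are
> > > mine, not a KIP proposal): keys are UUIDs, so no prefix of a key is
> > > itself a key, and the prefix type is a plain String over the key's
> > > canonical rendering:

```java
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;

// Toy sketch of the "third type parameter" idea: KP is the prefix type,
// decoupled from the key type K. Names are illustrative only.
interface PrefixSeekable<K, KP, V> {
    Iterator<Map.Entry<K, V>> prefixSeek(KP prefix);
}

// Keys are UUIDs; no prefix of a UUID is a UUID, so the prefix type here
// is String over the key's canonical string form.
class UuidStore implements PrefixSeekable<UUID, String, Integer> {
    private final TreeMap<String, Map.Entry<UUID, Integer>> sorted = new TreeMap<>();

    void put(UUID key, Integer value) {
        sorted.put(key.toString(), Map.entry(key, value));
    }

    @Override
    public Iterator<Map.Entry<UUID, Integer>> prefixSeek(String prefix) {
        // Half-open range [prefix, prefix + MAX_VALUE) approximates "starts with".
        return sorted.subMap(prefix, prefix + Character.MAX_VALUE)
                     .values().iterator();
    }
}
```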
> > >
> > > J2) There is a related problem with serialization. Whether something
> > > is a prefix or not depends not on the Java key (K), but on the binary
> > > format that is produced when you use a serde on the key. Whether
> > > we say that the prefix must also be a K or whether it gets its own
> type,
> > > KP, there are problems.
> > >
> > > In the latter case, we must additionally require a second set of serdes
> > > for the prefixes, but there's no obvious way to incorporate this in the
> > > API, especially not in the DSL.
> > >
> > > In either case, for the API to actually work, we need to know ahead
> > > of time that the Serde will produce a binary key that starts with the
> > > part that we wish to use as a prefix. For example, what we were doing
> > > briefly in KIP-213 (where we had complex keys, similar to Windowed<K>)
> > > was to define "dummy" values that indicate that a Windowed<K> is
> actually
> > > just a prefix key, not a real key. Maybe the window start time would be
> > > null or the key part would be null. But we also had to define a serde
> > > that would very specifically anticipate which component of the complex
> > > key would need to be used in a prefix key. Having to bring all these
> > > parts together in a reliable, easy-to-debug, fashion gives me some
> doubt
> > > that people would actually be able to use this feature in complicated
> > > programs without driving themselves crazy.
> > >
> > > J3) Thanks so much for including benchmarks and tests! Unfortunately,
> > > these don't include everything you need to really plug into the Streams
> > > API. I think when you push it a little farther, you'll realize what
> Adam
> > > was talking about wrt the interface difficulties.
> > >
> > > In your benchmark and tests, you directly construct the store and then
> > > use it, but in a real Streams application, you can only provide your
> > > implementation in a StoreSupplier, for example via the Materialized
> > > parameter. Then, to use the store from inside a Processor, you'd have
> > > to get it by name from the ProcessorContext, and then cast it to one of
> > > the pre-defined store types, KeyValueStore, WindowedStore, or
> > > SessionStore. It won't work to "mix in" your interface because the
> > > processor gets a store that's wrapped in layers that handle
> serialization,
> > > change-logging, recording metrics, and caching.
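> > > A stripped-down illustration of that wrapping problem (plain Java, no
> > > Kafka types; every name here is invented): even if the inner store
> > > implements the mixin, the wrapper the processor actually receives does
> > > not, so the downcast fails:

```java
interface KvStore { Integer get(String key); }
interface PrefixScannable { void prefixSeek(String prefix); }

// The inner store implements the mixin...
class InnerStore implements KvStore, PrefixScannable {
    public Integer get(String key) { return 42; }
    public void prefixSeek(String prefix) { }
}

// ...but a metering/caching/changelogging-style wrapper only implements
// the base store interface, so the mixin is invisible through it.
class MeteredWrapper implements KvStore {
    private final KvStore inner;
    MeteredWrapper(KvStore inner) { this.inner = inner; }
    public Integer get(String key) { return inner.get(key); }
}
```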
> > >
> > > To use the store through IQ, you have to provide a QueryableStoreType
> > > to KafkaStreams#store, and you get back a similarly wrapped store.
> > >
> > > I think our only choices to add an interface like yours is either to
> add
> > > it to one of the existing store types, like KeyValueStore or
> > > WindowedStore, or to define a completely new store hierarchy, meaning
> > > you have to duplicate all the "wrapper" layers in Streams.
> > >
> > > I think if you write an "end-to-end" test, where you write a Streams
> app,
> > > provide your store, and then use it in a Processor and through IQ,
> > > you'll see what I'm talking about.
> > >
> > > IIRC, those three points were the ones that ultimately led us to
> abandon
> > > the whole idea last time and just register the stores with key type
> Bytes.
> > > I think some creative solutions may yet be possible, but it'll take
> some
> > > more design work to get there.
> > >
> > > Can I ask what your motivation is, exactly, for proposing this feature?
> > > The motivation just says "some users may want to do it", which has
> > > the advantage that it's impossible to disagree with, but doesn't
> provide
> > > a lot of concrete detail ;)
> > >
> > > Specifically, what I'm wondering is whether you wanted to use this as
> > > part of a KeyValue store, which might be a challenge, or whether you
> > > wanted to use it for more efficient scans in a WindowedStore, like
> > > Guozhang.
> > >
> > > Thanks again for the KIP! I hope my response isn't too discouraging;
> > > I just wanted to convey the challenges we faced last time, since none
> > > of them are obvious up front.
> > >
> > > Best regards,
> > > -John
> > >
> > >
> > > On Thu, May 14, 2020, at 16:17, Sophie Blee-Goldman wrote:
> > > > Whoops, I guess I didn't finish reading the KIP all the way to the
> end
> > > > earlier. Thanks
> > > > for including the link to the RocksDB PR in the KIP!
> > > >
> > > > I have one additional question about the proposal: do you plan to
> > > > also add this prefix seek API to the dual column family iterators?
> > > > These are used by RocksDBTimestampedStore (which extends
> > > > RocksDBStore), for example in *RocksDBDualCFRangeIterator*.
> > > >
> > > > Thanks for the KIP!
> > > >
> > > > On Thu, May 14, 2020 at 10:50 AM Sagar <sagarmeansoc...@gmail.com>
> > > wrote:
> > > >
> > > > > Hey @Adam,
> > > > >
> > > > > Thanks for sharing your experience with using prefix seek. I did
> > > > > look at your code for RocksDBPrefixIterator; in fact, I have
> > > > > repurposed that class itself since it wasn't being used elsewhere.
> > > > > Regarding how I plan to expose it throughout the state stores, what
> > > > > I have tried to do is add it as a separate interface. So, basically,
> > > > > it is not at the same level as the *range()* function, so to speak.
> > > > > The reason I did that is that I feel not all state stores are
> > > > > currently a natural fit for prefix seek. As I mentioned in the KIP,
> > > > > the closest current equivalent could be BulkLoadingStore (not in
> > > > > terms of functionality, but in terms of how it is also not
> > > > > implemented by all of them). That way, I don't need to stub it out
> > > > > across all the state stores, and we can implement it only where
> > > > > needed. For example, in the PR that I have put for reference in the
> > > > > KIP, you can see that I have implemented it only for RocksDB.
> > > > >
> > > > > @Guozhang,
> > > > >
> > > > > Thanks for the feedback. Those are very interesting questions and I
> > > will
> > > > > try my best to answer based upon whatever limited understanding I
> have
> > > > > developed so far :)
> > > > >
> > > > > 1) Regarding the usage of useFixedLengthPrefixExtractor, honestly,
> > > > > I hadn't looked at that config. I did look it up after you pointed
> > > > > it out, and it seems it's more for hash-based memtables? I may be
> > > > > wrong, though. What I would say is that the changes I made were not
> > > > > made from a correctness standpoint so much as to showcase how we
> > > > > could implement this feature. The idea was that once we see the
> > > > > merit in this approach, we can add some of the tunings (and I would
> > > > > need your team's assistance there :D).
> > > > >
> > > > > 2) Regarding the similarity of `RocksDBPrefixIterator` and
> > > > > `RocksDBRangeIterator`, yes, the implementations look more or less
> > > > > similar, so in terms of performance they might be comparable. But
> > > > > semantically, they solve two different use cases. Range seek is
> > > > > useful when we know both from and to, but if we want to find keys
> > > > > with a certain prefix without knowing the start and end, then prefix
> > > > > seek comes in more handy. The point I am trying to make is that it
> > > > > can extend the scope of state stores from just point lookups to
> > > > > somewhat speculative queries, whereby users can search whether a
> > > > > certain pattern exists. I can vouch for this personally: I wanted to
> > > > > use state stores for one such use case, and since this option wasn't
> > > > > there, I had to work around it. A rough equivalent could be the SCAN
> > > > > operator in Redis. (Not trying to compare Redis and state stores,
> > > > > just to give some context.)
> > > > >
> > > > > Regarding the point on bloom filters, I think there are certain
> > > > > optimisations for prefix seek being discussed here:
> > > > >
> https://github.com/facebook/rocksdb/wiki/RocksDB-Bloom-Filter#prefix-vs-whole-key
> > > > >
> > > > > Again, this isn't something that I have explored fully. Also, the
> > > > > prefix seek page in the RocksDB wiki mentions a prefix iterating
> > > > > technique called the Prefix Bloom Filter.
> > > > >
> > > > > 3) Regarding the question on length of bytes for seek vs. prefix
> > > > > seek, I am not entirely sure about that scenario. What I have
> > > > > understood is that, at least for RocksDB, it is more performant for
> > > > > short iterator queries than for longer ones.
> > > > >
> > > > > 4) Regarding the last question on placing it within Segment, the
> > > > > reason I didn't do it that way is that I thought we shouldn't tie
> > > > > this feature only to RocksDB. I agree that I got this idea while
> > > > > reading about RocksDB, but if we keep it outside the purview of
> > > > > RocksDB as a pluggable entity, then a) it remains generic by not
> > > > > being tied to any specific store, and b) no change at all is needed
> > > > > for any of the other stores which haven't implemented it.
> > > > >
> > > > > I am not sure if any of the above points make sense, but as I said,
> > > > > this is based on my limited understanding of the codebase, so please
> > > > > pardon any incorrect/illogical statements!
> > > > >
> > > > > @Sophie,
> > > > >
> > > > > Thanks for bringing that point up! I have mentioned that PR in the
> > > > > KIP under a section called Other Considerations. Nonetheless, thanks
> > > > > for pointing it out!
> > > > >
> > > > > Thanks!
> > > > > Sagar.
> > > > >
> > > > >
> > > > > On Thu, May 14, 2020 at 5:17 AM Sophie Blee-Goldman <
> > > sop...@confluent.io>
> > > > > wrote:
> > > > >
> > > > > > Not to derail this KIP discussion, but to leave a few notes on
> some
> > > of
> > > > > the
> > > > > > RocksDB points that have come up:
> > > > > >
> > > > > > Someone actually merged some long overdue performance
> improvements to
> > > > > > the RocksJava implementation (the PR was opened back in 2017!
> yikes).
> > > > > > I haven't looked into the prefix seek API closely enough to know
> how
> > > > > > relevant
> > > > > > this particular change is, and they are still improving things,
> but
> > > it
> > > > > > gives me some
> > > > > > faith.
> > > > > >
> > > > > > There are some pretty promising results reported on the PR:
> > > > > >
> https://github.com/facebook/rocksdb/pull/2283#issuecomment-561563037
> > > > > >
> > > > > > Regarding the custom comparator, they also recently merged this
> > > > > performance
> > > > > > <https://github.com/facebook/rocksdb/pull/6252>
> > > > > > improvement <https://github.com/facebook/rocksdb/pull/6252>. The
> > > tl;dr
> > > > > is
> > > > > > they reduced the slowdown of a custom comparator in Java
> > > > > > (relative to the native C++) from ~7x to ~5.2x at best. Which is
> > > still
> > > > > not
> > > > > > great, but it
> > > > > > would be interesting to run our own benchmarks and see how this
> > > stacks
> > > > > up.
> > > > > >
> > > > > > Of course, these are all new changes, and as such will require us
> > > > > > to upgrade rocks to 6.x, which means they have to wait for us to
> > > > > > release a 3.0. But there's some talk about 3.0 coming in the next
> > > > > > few releases, so consider it food for not-so-future thought.
> > > > > >
> > > > > >
> > > > > > On Tue, May 12, 2020 at 5:02 PM Adam Bellemare <
> > > adam.bellem...@gmail.com
> > > > > >
> > > > > > wrote:
> > > > > >
> > > > > > > Hi Guozhang
> > > > > > >
> > > > > > > For clarity, the issues I was running into was not about the
> actual
> > > > > > > *prefixSeek* function itself, but about exposing it to the same
> > > level
> > > > > of
> > > > > > > access as the *range* function throughout Kafka Streams. It
> > > required a
> > > > > > lot
> > > > > > > of changes, and also required that most state stores stub it
> out
> > > since
> > > > > it
> > > > > > > wasn't clear how they would implement it. It was basically an
> > > > > > overreaching
> > > > > > > API change that was easily solved (for the specific
> prefix-scan in
> > > FKJ)
> > > > > > by
> > > > > > > simply using *range*. So to be clear, the blockers were
> > > predominantly
> > > > > > > around correctly handling the API changes, nothing to do with
> the
> > > > > > > mechanisms of the RocksDB prefix scanning.
> > > > > > >
> > > > > > > As for KAFKA-5285 I'll look into it more to see if I can get a
> > > better
> > > > > > > handle on the problem!
> > > > > > >
> > > > > > > Hope this helps clear it up.
> > > > > > >
> > > > > > > Adam
> > > > > > >
> > > > > > >
> > > > > > > On Tue, May 12, 2020 at 7:16 PM Guozhang Wang <
> wangg...@gmail.com>
> > > > > > wrote:
> > > > > > >
> > > > > > > > Hello Adam,
> > > > > > > >
> > > > > > > > I'm wondering if you can provide a bit more context on the
> > > > > > > > blockers to using RocksDB's prefixSeek (I saw you have a
> > > > > > > > RocksDBPrefixIterator class, but it's not used anywhere yet)?
> > > > > > > > I'm currently looking at ways to allow some secondary indices
> > > > > > > > with RocksDB, following some existing approaches from
> > > > > > > > CockroachDB etc., so I'm very curious to learn from your
> > > > > > > > experience.
> > > > > > > >
> > > > > > > > 1) Before considering any secondary indices, a quick thought
> is
> > > that
> > > > > > for
> > > > > > > > (key, timeFrom, timeTo) queries, we can easily replace the
> > > current
> > > > > > > > `range()` impl with a `prefixRange()` impl via a prefix
> iterator;
> > > > > > though
> > > > > > > > for (keyFrom, keyTo, timeFrom, timeTo) it is much more
> > > complicated
> > > > > > indeed
> > > > > > > > and hence existing `range()` impl may still be used.
> > > > > > > >
> > > > > > > > 2) Another related issue I've been pondering for a while is
> > > > > > > > around KAFKA-5285: with the default lexicographic byte
> > > > > > > > comparator, since the key length varies, the combo
> > > > > > > > (key, window) would have interleaving byte layouts like:
> > > > > > > >
> > > > > > > > AAA0001          (key AAA, timestamp 0001)
> > > > > > > > AAA00011        (key AAA0, timestamp 0011)
> > > > > > > > AAA0002          (key AAA, timestamp 0002)
> > > > > > > >
> > > > > > > > which is challenging for prefix seeks to work efficiently.
> > > Although
> > > > > we
> > > > > > > can
> > > > > > > > overwrite the byte-comparator in JNI it is very expensive
> and the
> > > > > cost
> > > > > > of
> > > > > > > > JNI overwhelms its benefits. If you've got some ideas around
> it
> > > > > please
> > > > > > > lmk
> > > > > > > > as well.
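> > > > > > > > To make the interleaving concrete, it can be reproduced with
> > > > > > > > plain string comparison (a sketch of mine; String stands in
> > > > > > > > for the byte layout, since the default comparator is
> > > > > > > > lexicographic):

```java
public class InterleaveCheck {
    // With a lexicographic comparator and variable-length keys, the entry
    // for (key AAA0, ts 0011) sorts between the two entries for key AAA,
    // so a naive prefix seek on "AAA" also surfaces key AAA0's entries.
    public static boolean sortsBetween(String lo, String mid, String hi) {
        return lo.compareTo(mid) < 0 && mid.compareTo(hi) < 0;
    }
}
```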
> > > > > > > >
> > > > > > > > Guozhang
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > On Tue, May 12, 2020 at 6:26 AM Adam Bellemare <
> > > > > > adam.bellem...@gmail.com
> > > > > > > >
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi Sagar
> > > > > > > > >
> > > > > > > > > I implemented a very similar interface for KIP-213, the
> > > > > > > > > foreign-key joiner. We pulled it out of the final
> > > > > > > > > implementation and used RocksDB range instead. You can see
> > > > > > > > > the particular code where we use RocksDB.range(...) to get
> > > > > > > > > the same iterator result:
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > >
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionProcessorSupplier.java#L95
> > > > > > > > >
> > > > > > > > > We pulled it out because there were numerous awkward
> > > acrobatics to
> > > > > > > > > integrate *prefixSeek()* function into the Kafka Streams
> code.
> > > > > > > > Basically, I
> > > > > > > > > wanted to be able to access *prefixSeek()* the same way I
> can
> > > > > access
> > > > > > > > > *range()* for any state store, and in particular use it for
> > > storing
> > > > > > > data
> > > > > > > > > with a particular foreign key (as per the previous URL).
> > > However, I
> > > > > > > found
> > > > > > > > > out that it required way too many changes to expose the
> > > > > > *prefixSeek()*
> > > > > > > > > functionality while still being able to leverage all the
> nice
> > > Kafka
> > > > > > > > Streams
> > > > > > > > > state management + supplier functionality, so we made a
> > > decision
> > > > > just
> > > > > > > to
> > > > > > > > > stick with *range()* and pull everything else out.
> > > > > > > > >
> > > > > > > > > I guess my question here is, how do you anticipate using
> > > > > > *prefixSeek()*
> > > > > > > > > within the framework of Kafka Streams, or the Processor
> API?
> > > > > > > > >
> > > > > > > > > Adam
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Tue, May 12, 2020 at 2:52 AM Sagar <
> > > sagarmeansoc...@gmail.com>
> > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hi All,
> > > > > > > > > >
> > > > > > > > > > I would like to start a discussion on the KIP that I
> created
> > > > > below
> > > > > > to
> > > > > > > > add
> > > > > > > > > > prefix scan support in State Stores:
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-614%3A+Add+Prefix+Scan+support+for+State+Stores
> > > > > > > > > >
> > > > > > > > > > Thanks!
> > > > > > > > > > Sagar.
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > --
> > > > > > > > -- Guozhang
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
