Hi John,

Thanks for the response. For starters, for our use case, we tweaked our
keys etc to avoid prefix scans. So, we are good there.

Regarding the KIP, I see what you mean when you say that the same key type
for prefix won't work. For example, continuing with the UUID example that
you gave, let's say one of the UUIDs
is 123e4567-e89b-12d3-a456-426614174000, and with a prefix scan we want to
fetch all keys starting with 123. There's already a UUIDSerde so if the
keys have been stored with that, then using UUIDSerde for prefixes won't
help- I am not sure if the UUIDSerializer would even work for 123.

So, that indicates that we will need to provide a new prefix key type
serializer. Having said that, how it will be stitched together and finally
exposed using the APIs is something that is up for questioning. This is
something you have also brought up in the earlier emails. If it
makes sense, I can modify my PR to go along these lines. Please let me know
what you think.

Lastly, I didn't understand this line of yours: *It might help if there are
other typed key/value stores to compare APIs with.*

Thanks!
Sagar.


On Thu, Jun 4, 2020 at 6:03 AM John Roesler <vvcep...@apache.org> wrote:

> Hi Sagar,
>
> Thanks for the question, and sorry for muddying the water.
>
> I meant the Bytes/byte[] thing as advice for how you all can solve your
> problem in the mean time, while we work through this KIP. I don’t think
> it’s relevant for the KIP itself.
>
> I think the big issue here is what the type of the prefix should be in the
> method signature. Using the same type as the key makes sense some times,
> but not other times. I’m not sure what the best way around this might be.
> It might help if there are other typed key/value stores to compare APIs
> with.
>
> Thanks,
> John
>
> On Mon, Jun 1, 2020, at 09:58, Sagar wrote:
> > Hi John,
> >
> > Just to add to my previous email(and sorry for the spam), if we consider
> > using Bytes/byte[] and manually invoke the serdes, if you could provide
> > examples where the same Serde for keys won't work for the prefix types.
> As
> > far as my understanding goes, the prefix seek would depend upon ordering
> of
> > the keys like lexicographic. As long as the binary format is consistent
> for
> > both the keys and the prefixes would it not ensure the ability to search
> in
> > that same ordering space? This is from my limited understanding so any
> > concrete examples would be helpful...
> >
> > Also, you mentioned about the creation of dummy values to indicate prefix
> > values, do you mean this line:
> >
> >
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionProcessorSupplier.java#L91
> > This
> > is where the prefix key is built and used for searching .
> >
> > Thanks!
> > Sagar.
> >
> > On Mon, Jun 1, 2020 at 11:42 AM Sagar <sagarmeansoc...@gmail.com> wrote:
> >
> > > Hi John,
> > >
> > > Thank you. I think it makes sense to modify the KIP to add the
> > > prefixScan() as part of the existing interfaces and add the new mixin
> > > behaviour as Rejected alternatives. I am not very aware of other stores
> > > apart from keyValueStore so is it fine if I keep it there for now?
> > >
> > > Regarding the type definition of types I will try and think about some
> > > alternatives and share if I get any.
> > >
> > > Thanks!
> > > Sagar.
> > >
> > >
> > > On Sun, May 31, 2020 at 1:55 AM John Roesler <vvcep...@apache.org>
> wrote:
> > >
> > >> Hi Sagar,
> > >>
> > >> Thanks for the response. Your use case makes sense to me; I figured it
> > >> must be something like that.
> > >>
> > >> On a pragmatic level, in the near term, you might consider basically
> > >> doing the same thing we did in KIP-213. If you swap out the store
> types for
> > >> Byte/byte[] and “manually” invoke the serdes in your own logic, then
> you
> > >> can use the same algorithm we did to derive the range scan boundaries
> from
> > >> your desired prefix.
> > >>
> > >> For the actual KIP, it seems like we would need significant design
> > >> improvements to be able to do any mixins, so I think we should favor
> > >> proposing either to just add to the existing interfaces or to create
> brand
> > >> new interfaces, as appropriate, for now. Given that prefix can be
> converted
> > >> to a range query at a low level, I think we can probably explore
> adding
> > >> prefix to the existing interfaces with a default implementation.
> > >>
> > >> It seems like that just leaves the question of how to define the type
> of
> > >> the prefix. To be honest, I don’t have any great ideas here. Are you
> able
> > >> to generate some creative solutions, Sagar?
> > >>
> > >> Thanks,
> > >> John
> > >>
> > >> On Tue, May 26, 2020, at 06:42, Sagar wrote:
> > >> > Hi John,
> > >> >
> > >> > Thanks for the detailed reply. I was a bit crammed with work last
> week
> > >> so
> > >> > couldn't respond earlier so apologies for that.
> > >> >
> > >> > First of all, thanks for the context that both you and Adam have
> > >> > provided me on the issues faced previously. As I can clearly see,
> while
> > >> I
> > >> > was able to cut some corners while writing some test cases or
> > >> benchmarks,
> > >> > to be able to stitch together a store with prefix scan into an
> actual
> > >> > topology needs more work. I am sorry for the half baked tests that I
> > >> wrote
> > >> > without realising and you have rightly put it when you said these
> > >> > challenges aren't obvious up front.
> > >> >
> > >> > Now, coming back to the other points, I spent some time going
> through
> > >> the
> > >> > KIP-213 and also some of the code snippets that are talked about in
> that
> > >> > KIP. With the detailed explanation that you provided, it is now
> obvious
> > >> to
> > >> > me that keeping a generic type for keys like K won't work oob and
> hence
> > >> a
> > >> > decision was made to use Bytes as the key type.
> > >> >
> > >> > I just had another thought on this though. I looked at the range
> > >> function
> > >> > that was added in the ReadOnlyKeyValueStore. While the Key and the
> Value
> > >> > mentioned in that method is generic, internally almost all queries
> end
> > >> up
> > >> > querying using Bytes in some or the other form. I looked at not just
> > >> > RocksDb Store but other stores like InMemory store or MemoryLRU and
> this
> > >> > seems to be the pattern. I think this stems from the fact that these
> > >> stores
> > >> > while implementing KeyValueStore pass Bytes, byte[] as the K and V
> > >> values.
> > >> > Classes like MeteredKeyValueStore which don't do this, still use
> > >> Bytes.wrap
> > >> > to wrap the passed keys and values and invoke the range method.
> > >> >
> > >> > So, the point I am trying to make is, with the same behaviour - and
> > >> > ignoring for a moment that it's a separate interface which I am
> trying
> > >> to
> > >> > "mix-in"- the issues with the key types could be resolved. I may be
> > >> wrong
> > >> > though so would like to know your thoughts on this. Infact
> unknowingly
> > >> the
> > >> > interface implementation of PrefixSeekableType in RockDBStateStore
> also
> > >> > passes Bytes and bytes[] as K and V.
> > >> >
> > >> > The second part of exposing it via the publically accessible
> interfaces
> > >> to
> > >> > which we downcast while building the topology (like KeyValueStore),
> I
> > >> can
> > >> > clearly see now that mixing-in the way I tried to won't work. My
> > >> intention
> > >> > all along was not to hamper the flow of those stores which don't
> support
> > >> > prefix scan as yet and hence the separate interface. But, I agree
> that
> > >> for
> > >> > this to work, it needs to be part of some pre-defined store types
> like
> > >> > KVStore etc. Right now, I don't have an answer to this but mostly it
> > >> would
> > >> > have to be moved there and implemented across all stores(if we see
> the
> > >> > worth in prefix scans :) )
> > >> >
> > >> > Regarding the motivation, I am sorry if I wasn't clear. This
> originated
> > >> > from one of my own use cases with kafka streams where i needed to
> find
> > >> some
> > >> > keys based upon certain prefix. Infact it's similar to the
> > >> > RangeScanCombinedKeyUsage diagram in KIP-213 where the otherTable
> tries
> > >> to
> > >> > find entries in the state store based upon the FK. I was using
> > >> > KevValueStore to be precise. I also remember having a slack
> > >> conversation on
> > >> > this, and I was told that this isn't supported right now, but some
> other
> > >> > users shared their experiences on how with some hacks they are able
> to
> > >> > perform prefix scans even though their use case fits the bill for a
> > >> prefix
> > >> > scan. That kind of motivated me to take a stab at it.
> Unfortunately, I
> > >> have
> > >> > lost the slack chat because of some cleanup at the slack channel
> level.
> > >> I
> > >> > will try and update the ambiguous motivation statement in the near
> > >> future.
> > >> >
> > >> > Lastly, I would like to point out, that your response was not at all
> > >> > discouraging. On the contrary it was really insightful and it's
> always
> > >> good
> > >> > to learn/discover new things :)
> > >> >
> > >> > Thanks!
> > >> > Sagar.
> > >> >
> > >> > On Fri, May 15, 2020 at 7:37 AM John Roesler <vvcep...@apache.org>
> > >> wrote:
> > >> >
> > >> > > Hi, Sagar!
> > >> > >
> > >> > > Thanks for this KIP. I'm sorry it took me so long to reply. I'll
> > >> number my
> > >> > > points differently to avoid confusion.
> > >> > >
> > >> > > I can provide some additional context on the difficulties we
> > >> previously
> > >> > > faced in KIP-213 (which you and Adam have already discussed).
> > >> > >
> > >> > > J1) In your KIP, you propose the following interface:
> > >> > >
> > >> > > public interface PrefixSeekableStore<K, V> {
> > >> > >     KeyValueIterator<K, V> prefixSeek(K prefix);
> > >> > > }
> > >> > >
> > >> > > This is roughly the same thing that Adam and I were considering
> > >> > > before. It has a hidden problem, that it assumes that prefixes of
> > >> > > keys in the key space are also in the key space. In other words,
> this
> > >> > > is a store with key type K, and the API assumes that prefixes are
> also
> > >> > > of type K. This is true for some key types, like String or Bytes,
> but
> > >> not
> > >> > > for others.
> > >> > >
> > >> > > For example, if the keys are UUIDs, then no prefix is also a
> UUID. If
> > >> the
> > >> > > key is a complex data type, like Windowed<K> in our own DSL, then
> > >> > > we would absolutely want to query all keys with the same record
> key
> > >> > > (the K part), or the same window start time, but in neither case
> is
> > >> the
> > >> > > prefix actually a Windowed<K>.
> > >> > >
> > >> > > You can skirt the issue by defining a third type parameter, maybe
> KP,
> > >> that
> > >> > > is the "prefix" type, but this would also be awkward for many
> usages.
> > >> > >
> > >> > > J2) There is a related problem with serialization. Whether
> something
> > >> > > is a prefix or not depends not on the Java key (K), but on the
> binary
> > >> > > format that is produced when you use a serde on the key. Whether
> > >> > > we say that the prefix must also be a K or whether it gets its own
> > >> type,
> > >> > > KP, there are problems.
> > >> > >
> > >> > > In the latter case, we must additionally require a second set of
> > >> serdes
> > >> > > for the prefixes, but there's no obvious way to incorporate this
> in
> > >> the
> > >> > > API, especially not in the DSL.
> > >> > >
> > >> > > In either case, for the API to actually work, we need to know
> ahead
> > >> > > of time that the Serde will produce a binary key that starts with
> the
> > >> > > part that we wish to use as a prefix. For example, what we were
> doing
> > >> > > briefly in KIP-213 (where we had complex keys, similar to
> Windowed<K>)
> > >> > > was to define "dummy" values that indicate that a Windowed<K> is
> > >> actually
> > >> > > just a prefix key, not a real key. Maybe the window start time
> would
> > >> be
> > >> > > null or the key part would be null. But we also had to define a
> serde
> > >> > > that would very specifically anticipate which component of the
> complex
> > >> > > key would need to be used in a prefix key. Having to bring all
> these
> > >> > > parts together in a reliable, easy-to-debug, fashion gives me some
> > >> doubt
> > >> > > that people would actually be able to use this feature in
> complicated
> > >> > > programs without driving themselves crazy.
> > >> > >
> > >> > > J3) Thanks so much for including benchmarks and tests!
> Unfortunately,
> > >> > > these don't include everything you need to really plug into the
> > >> Streams
> > >> > > API. I think when you push it a little farther, you'll realize
> what
> > >> Adam
> > >> > > was talking about wrt the interface difficulties.
> > >> > >
> > >> > > In your benchmark and tests, you directly construct the store and
> then
> > >> > > use it, but in a real Streams application, you can only provide
> your
> > >> > > implementation in a StoreSupplier, for example via the
> Materialized
> > >> > > parameter. Then, to use the store from inside a Processor, you'd
> have
> > >> > > to get it by name from the ProcessorContext, and then cast it to
> one
> > >> of
> > >> > > the pre-defined store types, KeyValueStore, WindowedStore, or
> > >> > > SessionStore. It won't work to "mix in" your interface because the
> > >> > > processor gets a store that's wrapped in layers that handle
> > >> serialization,
> > >> > > change-logging, recording metrics, and caching.
> > >> > >
> > >> > > To use the store through IQ, you have to provide a
> QueriableStoreType
> > >> > > to KafkaStreams#store, and you get back a similarly wrapped store.
> > >> > >
> > >> > > I think our only choices to add an interface like yours is either
> to
> > >> add
> > >> > > it to one of the existing store types, like KeyValueStore or
> > >> > > WindowedStore, or to define a completely new store hierarchy,
> meaning
> > >> > > you have to duplicate all the "wrapper" layers in Streams.
> > >> > >
> > >> > > I think if you write an "end-to-end" test, where you write a
> Streams
> > >> app,
> > >> > > provide your store, and then use it in a Processor and through IQ,
> > >> > > you'll see what I'm talking about.
> > >> > >
> > >> > > IIRC, those three points were the ones that ultimately led us to
> > >> abandon
> > >> > > the whole idea last time and just register the stores with key
> type
> > >> Bytes.
> > >> > > I think some creative solutions may yet be possible, but it'll
> take
> > >> some
> > >> > > more design work to get there.
> > >> > >
> > >> > > Can I ask what your motivation is, exactly, for proposing this
> > >> feature?
> > >> > > The motivation just says "some users may want to do it", which has
> > >> > > the advantage that it's impossible to disagree with, but doesn't
> > >> provide
> > >> > > a lot of concrete detail ;)
> > >> > >
> > >> > > Specifically, what I'm wondering is whether you wanted to use
> this as
> > >> > > part of a KayValue store, which might be a challenge, or whether
> you
> > >> > > wanted to use it for more efficient scans in a WindowedStore, like
> > >> > > Guozhang.
> > >> > >
> > >> > > Thanks again for the KIP! I hope my response isn't too
> discouraging;
> > >> > > I just wanted to convey the challenges we faced last time, since
> they
> > >> > > are all not obvious up front.
> > >> > >
> > >> > > Best regards,
> > >> > > -John
> > >> > >
> > >> > >
> > >> > > On Thu, May 14, 2020, at 16:17, Sophie Blee-Goldman wrote:
> > >> > > > Whoops, I guess I didn't finish reading the KIP all the way to
> the
> > >> end
> > >> > > > earlier. Thanks
> > >> > > > for including the link to the RocksDB PR in the KIP!
> > >> > > >
> > >> > > > I have one additional question about the proposal: do you plan
> to
> > >> also
> > >> > > add
> > >> > > > this
> > >> > > > prefix seek API to the dual column family iterators? These are
> used
> > >> by
> > >> > > > RocksDBTimestampedStore (which extends RocksDBStore), for
> example
> > >> the
> > >> > > > *RocksDBDualCFRangeIterator*
> > >> > > >
> > >> > > > Thanks for the KIP!
> > >> > > >
> > >> > > > On Thu, May 14, 2020 at 10:50 AM Sagar <
> sagarmeansoc...@gmail.com>
> > >> > > wrote:
> > >> > > >
> > >> > > > > Hey @Adam,
> > >> > > > >
> > >> > > > > Thanks for sharing your experience with using prefix seek. I
> did
> > >> look
> > >> > > at
> > >> > > > > your code for RocksDBPrefixIterator, infact I have repurposed
> that
> > >> > > class
> > >> > > > > itself since it wasn't being used else where. Regarding how I
> > >> plan to
> > >> > > > > expose them through-out the state stores, what I have tried
> to do
> > >> is
> > >> > > add it
> > >> > > > > as a separate interface. So, basically, it is not at the same
> > >> level as
> > >> > > the
> > >> > > > > *range function so to speak. The reason I did that is
> currently I
> > >> feel
> > >> > > not
> > >> > > > > all state stores are a natural fit for prefix seek. As I
> > >> mentioned in
> > >> > > the
> > >> > > > > KIP as well, the current equivalent to it could be
> > >> > > BulkLoadingStore(not in
> > >> > > > > terms of functionality but in terms of how it is also not
> > >> implemented
> > >> > > by
> > >> > > > > all of them). So, that ways I am not needing to stub them
> across
> > >> all
> > >> > > the
> > >> > > > > state-stores and we can implement it only where needed. For
> > >> example,
> > >> > > in the
> > >> > > > > PR that I have put for reference in the KIP, you can see that
> I
> > >> have it
> > >> > > > > implemented only for RocksDB.
> > >> > > > >
> > >> > > > > @Guozhang,
> > >> > > > >
> > >> > > > > Thanks for the feedback. Those are very interesting questions
> and
> > >> I
> > >> > > will
> > >> > > > > try my best to answer based upon whatever limited
> understanding I
> > >> have
> > >> > > > > developed so far :)
> > >> > > > >
> > >> > > > > 1) Regarding the usage of useFixedLengthPrefixExtractor,
> > >> honestly, I
> > >> > > hadn't
> > >> > > > > looked at that config. I did look it up after you pointed it
> out
> > >> and
> > >> > > seems
> > >> > > > > it's more for hash-based memtables? I may be wrong though. But
> > >> what I
> > >> > > would
> > >> > > > > say is that, the changes I had made were not exactly from a
> > >> correctness
> > >> > > > > stand point but more from trying to showcase how we can
> implement
> > >> these
> > >> > > > > changes. The idea was that once we see the merit in this
> approach
> > >> then
> > >> > > we
> > >> > > > > can add some of the tunings( and I would need your team's
> > >> assistance
> > >> > > there
> > >> > > > > :D).
> > >> > > > >
> > >> > > > > 2) Regarding the similarity of `RocksDBPrefixIterator` and
> > >> > > > > `RocksDBRangeIterator`, yes the implementations look more or
> less
> > >> > > similar.
> > >> > > > > So, in terms of performance, they might be similar. But
> > >> semantically,
> > >> > > they
> > >> > > > > can solve 2 different use-cases. The range seek is useful
> when we
> > >> know
> > >> > > both
> > >> > > > > from and to. But if we consider use-cases where we want to
> find
> > >> keys
> > >> > > with a
> > >> > > > > certain prefix, but we don't know if what it's start and end
> is,
> > >> then
> > >> > > > > prefix seek would come in more handy. The point that I am
> trying
> > >> to
> > >> > > make is
> > >> > > > > that it can extend the scope of state stores from just point
> > >> lookups to
> > >> > > > > somewhat being able to speculative queries where by users can
> > >> search
> > >> > > if a
> > >> > > > > certain pattern exists. I can vouch for this personally
> because I
> > >> > > wanted to
> > >> > > > > use state stores for one such use case and since this option
> > >> wasn't
> > >> > > there,
> > >> > > > > I had to do some other things. An equivalent to this could be
> SCAN
> > >> > > operator
> > >> > > > > in Redis. (Not trying to compare the Redis and state stores
> but
> > >> trying
> > >> > > to
> > >> > > > > give some context).
> > >> > > > >
> > >> > > > > Regarding the point on bloom filter, I think there are certain
> > >> > > > > optimisations that are being talked about in case of prefix
> seek
> > >> here:
> > >> > > > >
> > >> > > > >
> > >> > > > >
> > >> > >
> > >>
> https://github.com/facebook/rocksdb/wiki/RocksDB-Bloom-Filter#prefix-vs-whole-key
> > >> > > > > Again
> > >> > > > > this isn't something that I have explored fully. Also, on the
> > >> prefix
> > >> > > seek
> > >> > > > > page on RocksDB they mention that there's a prefix iterating
> > >> technique
> > >> > > > > called Prefix Bloom Filter.
> > >> > > > >
> > >> > > > > 3) Regarding the question on length of bytes for seek v/s
> prefix
> > >> seek,
> > >> > > I am
> > >> > > > > not entirely sure about that scenario. What I have understood
> is
> > >> that
> > >> > > > > at-least for Rocks DB, it is more performant for short
> iterator
> > >> queries
> > >> > > > > that longer ones.
> > >> > > > >
> > >> > > > > 4) Regarding the last question on placing it within Segment,
> the
> > >> > > reason I
> > >> > > > > didn't do that way, is that I thought we shouldn't tie this
> > >> feature
> > >> > > only to
> > >> > > > > RocksDB. I agree that I got this idea while looking/reading
> about
> > >> > > RocksDB
> > >> > > > > but if we keep it outside the purview of RocksDB and keep it
> as a
> > >> > > pluggable
> > >> > > > > entity, then a) it remains generic by not being tied to any
> > >> specific
> > >> > > store
> > >> > > > > and b) no change is needed at all for any of the other stores
> > >> which
> > >> > > haven't
> > >> > > > > implemented it.
> > >> > > > >
> > >> > > > > I am not sure of any of the above points make sense but as I
> said,
> > >> > > this is
> > >> > > > > based out of my limited understanding of the codebase. So,
> pardon
> > >> any
> > >> > > > > incorrect/illogical statements plz!
> > >> > > > >
> > >> > > > > @Sophie,
> > >> > > > >
> > >> > > > > Thanks for bringing that point up! I have mentioned about
> that PR
> > >> in
> > >> > > the
> > >> > > > > KIP under a section called Other considerations. Nonetheless,
> > >> thanks
> > >> > > for
> > >> > > > > pointing it out!
> > >> > > > >
> > >> > > > > Thanks!
> > >> > > > > Sagar.
> > >> > > > >
> > >> > > > >
> > >> > > > > On Thu, May 14, 2020 at 5:17 AM Sophie Blee-Goldman <
> > >> > > sop...@confluent.io>
> > >> > > > > wrote:
> > >> > > > >
> > >> > > > > > Not to derail this KIP discussion, but to leave a few notes
> on
> > >> some
> > >> > > of
> > >> > > > > the
> > >> > > > > > RocksDB points that have come up:
> > >> > > > > >
> > >> > > > > > Someone actually merged some long overdue performance
> > >> improvements to
> > >> > > > > > the RocksJava implementation (the PR was opened back in
> 2017!
> > >> yikes).
> > >> > > > > > I haven't looked into the prefix seek API closely enough to
> > >> know how
> > >> > > > > > relevant
> > >> > > > > > this particular change is, and they are still improving
> things,
> > >> but
> > >> > > it
> > >> > > > > > gives me some
> > >> > > > > > faith.
> > >> > > > > >
> > >> > > > > > There are some pretty promising results reported on the PR:
> > >> > > > > >
> > >> https://github.com/facebook/rocksdb/pull/2283#issuecomment-561563037
> > >> > > > > >
> > >> > > > > > Regarding the custom comparator, they also recently merged
> this
> > >> > > > > performance
> > >> > > > > > <https://github.com/facebook/rocksdb/pull/6252>
> > >> > > > > > improvement <https://github.com/facebook/rocksdb/pull/6252
> >.
> > >> The
> > >> > > tl;dr
> > >> > > > > is
> > >> > > > > > they reduced the slowdown of a custom comparator in Java
> > >> > > > > > (relative to the native C++) from ~7x to ~5.2x at best.
> Which is
> > >> > > still
> > >> > > > > not
> > >> > > > > > great, but it
> > >> > > > > > would be interesting to run our own benchmarks and see how
> this
> > >> > > stacks
> > >> > > > > up.
> > >> > > > > >
> > >> > > > > > Of course, these are all new changes and as such will
> require
> > >> us to
> > >> > > > > upgrade
> > >> > > > > > rocks to 6.x which means they have to wait for us to
> release a
> > >> 3.0.
> > >> > > But
> > >> > > > > > there's
> > >> > > > > > some talk about 3.0 coming in the next few releases so
> consider
> > >> it
> > >> > > food
> > >> > > > > for
> > >> > > > > > not-so-future thought
> > >> > > > > >
> > >> > > > > >
> > >> > > > > > On Tue, May 12, 2020 at 5:02 PM Adam Bellemare <
> > >> > > adam.bellem...@gmail.com
> > >> > > > > >
> > >> > > > > > wrote:
> > >> > > > > >
> > >> > > > > > > Hi Guozhang
> > >> > > > > > >
> > >> > > > > > > For clarity, the issues I was running into was not about
> the
> > >> actual
> > >> > > > > > > *prefixSeek* function itself, but about exposing it to the
> > >> same
> > >> > > level
> > >> > > > > of
> > >> > > > > > > access as the *range* function throughout Kafka Streams.
> It
> > >> > > required a
> > >> > > > > > lot
> > >> > > > > > > of changes, and also required that most state stores stub
> it
> > >> out
> > >> > > since
> > >> > > > > it
> > >> > > > > > > wasn't clear how they would implement it. It was
> basically an
> > >> > > > > > overreaching
> > >> > > > > > > API change that was easily solved (for the specific
> > >> prefix-scan in
> > >> > > FKJ)
> > >> > > > > > by
> > >> > > > > > > simply using *range*. So to be clear, the blockers were
> > >> > > predominantly
> > >> > > > > > > around correctly handling the API changes, nothing to do
> with
> > >> the
> > >> > > > > > > mechanisms of the RocksDB prefix scanning.
> > >> > > > > > >
> > >> > > > > > > As for KAFKA-5285 I'll look into it more to see if I can
> get a
> > >> > > better
> > >> > > > > > > handle on the problem!
> > >> > > > > > >
> > >> > > > > > > Hope this helps clear it up.
> > >> > > > > > >
> > >> > > > > > > Adam
> > >> > > > > > >
> > >> > > > > > >
> > >> > > > > > > On Tue, May 12, 2020 at 7:16 PM Guozhang Wang <
> > >> wangg...@gmail.com>
> > >> > > > > > wrote:
> > >> > > > > > >
> > >> > > > > > > > Hello Adam,
> > >> > > > > > > >
> > >> > > > > > > > I'm wondering if you can provide a bit more context on
> the
> > >> > > blockers
> > >> > > > > of
> > >> > > > > > > > using prefixSeek of RocksDB (I saw you have a
> > >> > > RocksDBPrefixIterator
> > >> > > > > > class
> > >> > > > > > > > but not used anywhere yet)? I'm currently looking at
> ways to
> > >> > > allow
> > >> > > > > some
> > >> > > > > > > > secondary indices with rocksDB following some existing
> > >> approaches
> > >> > > > > > > > from CockroachDB etc so I'm very curious to learn your
> > >> > > experience.
> > >> > > > > > > >
> > >> > > > > > > > 1) Before considering any secondary indices, a quick
> > >> thought is
> > >> > > that
> > >> > > > > > for
> > >> > > > > > > > (key, timeFrom, timeTo) queries, we can easily replace
> the
> > >> > > current
> > >> > > > > > > > `range()` impl with a `prefixRange()` impl via a prefix
> > >> iterator;
> > >> > > > > > though
> > >> > > > > > > > for (keyFrom, keyTo, timeFrom, timeTo) it is much more
> > >> > > complicated
> > >> > > > > > indeed
> > >> > > > > > > > and hence existing `range()` impl may still be used.
> > >> > > > > > > >
> > >> > > > > > > > 2) Another related issue I've been pondering for a
> while is
> > >> > > > > > > > around KAFKA-5285: with the default lexicograpic byte
> > >> comparator,
> > >> > > > > since
> > >> > > > > > > the
> > >> > > > > > > > key length varies, the combo (key, window) would have
> > >> > > interleaving
> > >> > > > > byte
> > >> > > > > > > > layouts like:
> > >> > > > > > > >
> > >> > > > > > > > AAA0001          (key AAA, timestamp 0001)
> > >> > > > > > > > AAA00011        (key AAA0, timestamp 0011)
> > >> > > > > > > > AAA0002          (key AAA, timestamp 0002)
> > >> > > > > > > >
> > >> > > > > > > > which is challenging for prefix seeks to work
> efficiently.
> > >> > > Although
> > >> > > > > we
> > >> > > > > > > can
> > >> > > > > > > > overwrite the byte-comparator in JNI it is very
> expensive
> > >> and the
> > >> > > > > cost
> > >> > > > > > of
> > >> > > > > > > > JNI overwhelms its benefits. If you've got some ideas
> > >> around it
> > >> > > > > please
> > >> > > > > > > lmk
> > >> > > > > > > > as well.
> > >> > > > > > > >
> > >> > > > > > > > Guozhang
> > >> > > > > > > >
> > >> > > > > > > >
> > >> > > > > > > >
> > >> > > > > > > >
> > >> > > > > > > > On Tue, May 12, 2020 at 6:26 AM Adam Bellemare <
> > >> > > > > > adam.bellem...@gmail.com
> > >> > > > > > > >
> > >> > > > > > > > wrote:
> > >> > > > > > > >
> > >> > > > > > > > > Hi Sagar
> > >> > > > > > > > >
> > >> > > > > > > > > I implemented a very similar interface for KIP-213,
> the
> > >> > > foreign-key
> > >> > > > > > > > joiner.
> > >> > > > > > > > > We pulled it out of the final implementation and
> instead
> > >> used
> > >> > > > > RocksDB
> > >> > > > > > > > range
> > >> > > > > > > > > instead. You can see the particular code where we use
> > >> > > > > > > RocksDB.range(...)
> > >> > > > > > > > to
> > >> > > > > > > > > get the same iterator result.
> > >> > > > > > > > >
> > >> > > > > > > > >
> > >> > > > > > > > >
> > >> > > > > > > >
> > >> > > > > > >
> > >> > > > > >
> > >> > > > >
> > >> > >
> > >>
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionProcessorSupplier.java#L95
> > >> > > > > > > > >
> > >> > > > > > > > > We pulled it out because there were numerous awkward
> > >> > > acrobatics to
> > >> > > > > > > > > integrate *prefixSeek()* function into the Kafka
> Streams
> > >> code.
> > >> > > > > > > > Basically, I
> > >> > > > > > > > > wanted to be able to access *prefixSeek()* the same
> way I
> > >> can
> > >> > > > > access
> > >> > > > > > > > > *range()* for any state store, and in particular use
> it
> > >> for
> > >> > > storing
> > >> > > > > > > data
> > >> > > > > > > > > with a particular foreign key (as per the previous
> URL).
> > >> > > However, I
> > >> > > > > > > found
> > >> > > > > > > > > out that it required way too many changes to expose
> the
> > >> > > > > > *prefixSeek()*
> > >> > > > > > > > > functionality while still being able to leverage all
> the
> > >> nice
> > >> > > Kafka
> > >> > > > > > > > Streams
> > >> > > > > > > > > state management + supplier functionality, so we made
> a
> > >> > > decision
> > >> > > > > just
> > >> > > > > > > to
> > >> > > > > > > > > stick with *range()* and pull everything else out.
> > >> > > > > > > > >
> > >> > > > > > > > > I guess my question here is, how do you anticipate
> using
> > >> > > > > > *prefixSeek()*
> > >> > > > > > > > > within the framework of Kafka Streams, or the
> Processor
> > >> API?
> > >> > > > > > > > >
> > >> > > > > > > > > Adam
> > >> > > > > > > > >
> > >> > > > > > > > >
> > >> > > > > > > > >
> > >> > > > > > > > > On Tue, May 12, 2020 at 2:52 AM Sagar <
> > >> > > sagarmeansoc...@gmail.com>
> > >> > > > > > > wrote:
> > >> > > > > > > > >
> > >> > > > > > > > > > Hi All,
> > >> > > > > > > > > >
> > >> > > > > > > > > > I would like to start a discussion on the KIP that I
> > >> created
> > >> > > > > below
> > >> > > > > > to
> > >> > > > > > > > add
> > >> > > > > > > > > > prefix scan support in State Stores:
> > >> > > > > > > > > >
> > >> > > > > > > > > >
> > >> > > > > > > > > >
> > >> > > > > > > > >
> > >> > > > > > > >
> > >> > > > > > >
> > >> > > > > >
> > >> > > > >
> > >> > >
> > >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-614%3A+Add+Prefix+Scan+support+for+State+Stores
> > >> > > > > > > > > >
> > >> > > > > > > > > > Thanks!
> > >> > > > > > > > > > Sagar.
> > >> > > > > > > > > >
> > >> > > > > > > > >
> > >> > > > > > > >
> > >> > > > > > > >
> > >> > > > > > > > --
> > >> > > > > > > > -- Guozhang
> > >> > > > > > > >
> > >> > > > > > >
> > >> > > > > >
> > >> > > > >
> > >> > > >
> > >> > >
> > >> >
> > >>
> > >
> >
>

Reply via email to