Woah, this is great, Sagar!

I think this API looks really good. I'm curious if anyone else has
any concern. For my part, I think this will work just fine. People
might face tricky bugs getting their key serde and their prefix
serde "aligned", but I think the API makes it pretty obvious what
has to happen to make this work. As long as the API isn't going
to "trick" anyone by trying to abstract away things that can't be
abstracted, this is the best we can do. In other words, I think
your approach is ideal here.

I also really appreciate that you took the time to do a full POC
with end-to-end tests to show that the proposal is actually
going to work.

A couple of notes as you update the KIP:

1. I think that for "optional" state store features like this, we
should add a default implementation to the interface that
throws UnsupportedOperationException. That way,
any existing store implementations won't fail to compile
on the new version. And any store that just can't support
a prefix scan would simply not override the method.

2. I think you meant "prefixScan", not "prefixSeek", since
we're actually getting an iterator that only returns prefix-
matching keys, as opposed to just seeking to that prefix.

Thanks again for the work you've put into this. I look
forward to reviewing the updated KIP.

Thanks,
-John


On Sun, Jun 28, 2020, at 12:17, Sagar wrote:
> Hi John,
> 
> I took some time out and as we discussed, looked to implement these
> changes. Most of these changes are for demonstrative purposes but I thought
> I will share.
> 
> I added the new prefixSeek method at the KeyValueStore interface level:
> 
> https://github.com/confluentinc/kafka/pull/242/files#diff-5e92747b506c868db3948323478e1b07R74-R83
> 
> As you had pointed out, the prefix type can be different from the key type.
> That's why this method takes 2 parameters. the key type and it's serializer.
> 
> Then I added the implementation of this method in a couple of Stores.
> RocksDBStore:
> 
> https://github.com/confluentinc/kafka/pull/242/commits#diff-046ca566243518c88e007b7499ec9f51R308-R320
> and
> InMemoryKVStore:
> 
> https://github.com/confluentinc/kafka/pull/242/commits#diff-4c685a32e765eab60bcb60097768104eR108-R120
> 
> I modified the older test cases for RocksDBStore. You can find them here:
> 
> https://github.com/confluentinc/kafka/pull/242/commits#diff-051439f56f0d6a12334d7e8cc4f66bf8R304-R415
> 
> 
> I have added a test case where the keys are of type UUID while the prefix
> is of type string. This seems to be working because the code is able to
> pull in UUIDs with the provided prefix, even though their types are
> different.
> 
> To address one of the gaps from my previous implementation, I have also
> added a test case for the end to end flow using the state store supplier.
> you can find it here:
> 
> https://github.com/confluentinc/kafka/pull/242/commits#diff-a94de5b2ec72d09ebac7183c31d7c906R269-R305
> 
> Note that for this to work, i needed to update MeteredKVstore and
> ChangeLoggingKVStore.
> 
> Lastly, barring the 4 stores mentioned above, rest of the implementers of
> KVStore have the prefixSeek override as null. As I said, this is mainly for
> demonstrative purposes and hence done this way.
> If you get the chance, it would be great if you can provide some feedback
> on this.
> 
> Thanks!
> Sagar.
> 
> 
> On Wed, Jun 10, 2020 at 9:21 AM Sagar <sagarmeansoc...@gmail.com> wrote:
> 
> > Hi John,
> >
> > You rightly pointed out, the devil is in the detail :). I will start with
> > the implementation to get a sense.
> >
> > Here are my thoughts on the core challenge that you pointed out. The key
> > value store abstractions that have been exposed via the state store DSL
> > APIs, make it possible for the end user to define generic key types.
> > However, the Serdes are the one which convert those generic keys/values
> > into the format in which the state store stores them- which for all
> > practical purposes are byte-arrays. I think with the prefix type serde, if
> > it converts the prefix to the same internal storage type (byte array) as
> > that of the Keys, then we should be able to do a prefix scan.
> >
> > Regarding other databases, I have worked a bit with Redis which also
> > provides a scan operator using the glob style pattern match(it's more
> > evolved than prefix scan but can be converted easily):
> >
> > https://redis.io/commands/scan#the-match-option
> >
> > Typically Redis works with Binary Safe strings so the prefix key type and
> > the actual keys are of the same type.
> >
> > Thanks!
> > Sagar.
> >
> >
> >
> > On Wed, Jun 10, 2020 at 1:41 AM John Roesler <vvcep...@apache.org> wrote:
> >
> >> Hi Sagar,
> >>
> >> Thanks for the reply. I agree that your UUID example illustrates the
> >> problem I was pointing out.
> >>
> >> Yes, I think that experimenting with the API in the PR is probably the
> >> best way to make progress (as opposed to just thinking in terms of
> >> design on the wiki) because with this kind of thing, the devil is often
> >> in the details.
> >>
> >> To clarify what I meant by that last statement, I see the core challenge
> >> here as deriving from the fact that we have a key/value store with
> >> generically typed keys, with a separate component (the serde) that
> >> turns those typed keys into bytes for storage. In contrast, RocksDB
> >> can easily offer a "prefix scan" operation because they key type is
> >> always just a byte array, so "prefix" is a very natural concept to offer
> >> in the API. Other key/value stores force the keys to always be strings,
> >> which also makes it easy to define a prefix scan.
> >>
> >> My question is whether there are other databases that offer both:
> >> 1. generically typed keys (as opposed to just bytes, just strings, etc)
> >> 2. prefix scans
> >> And, if so, what the API looks like.
> >>
> >> Thanks,
> >> -John
> >>
> >> On Tue, Jun 9, 2020, at 11:51, Sagar wrote:
> >> > Hi John,
> >> >
> >> > Thanks for the response. For starters, for our use case, we tweaked our
> >> > keys etc to avoid prefix scans. So, we are good there.
> >> >
> >> > Regarding the KIP, I see what you mean when you say that the same key
> >> type
> >> > for prefix won't work. For example, continuing with the UUID example
> >> that
> >> > you gave, let's say one of the UUIDs
> >> > is 123e4567-e89b-12d3-a456-426614174000, and with a prefix scan we want
> >> to
> >> > fetch all keys starting with 123. There's already a UUIDSerde so if the
> >> > keys have been stored with that, then using UUIDSerde for prefixes won't
> >> > help- I am not sure if the UUIDSerializer would even work for 123.
> >> >
> >> > So, that indicates that we will need to provide a new prefix key type
> >> > serializer. Having said that, how it will be stitched together and
> >> finally
> >> > exposed using the APIs is something that is up for questioning. This is
> >> > something you have also brought up in the earlier emails. If it
> >> > makes sense, I can modify my PR to go along these lines. Please let me
> >> know
> >> > what you think.
> >> >
> >> > Lastly, I didn't understand this line of yours: *It might help if there
> >> are
> >> > other typed key/value stores to compare APIs with.*
> >> >
> >> > Thanks!
> >> > Sagar.
> >> >
> >> >
> >> > On Thu, Jun 4, 2020 at 6:03 AM John Roesler <vvcep...@apache.org>
> >> wrote:
> >> >
> >> > > Hi Sagar,
> >> > >
> >> > > Thanks for the question, and sorry for muddying the water.
> >> > >
> >> > > I meant the Bytes/byte[] thing as advice for how you all can solve
> >> your
> >> > > problem in the mean time, while we work through this KIP. I don’t
> >> think
> >> > > it’s relevant for the KIP itself.
> >> > >
> >> > > I think the big issue here is what the type of the prefix should be
> >> in the
> >> > > method signature. Using the same type as the key makes sense some
> >> times,
> >> > > but not other times. I’m not sure what the best way around this might
> >> be.
> >> > > It might help if there are other typed key/value stores to compare
> >> APIs
> >> > > with.
> >> > >
> >> > > Thanks,
> >> > > John
> >> > >
> >> > > On Mon, Jun 1, 2020, at 09:58, Sagar wrote:
> >> > > > Hi John,
> >> > > >
> >> > > > Just to add to my previous email(and sorry for the spam), if we
> >> consider
> >> > > > using Bytes/byte[] and manually invoke the serdes, if you could
> >> provide
> >> > > > examples where the same Serde for keys won't work for the prefix
> >> types.
> >> > > As
> >> > > > far as my understanding goes, the prefix seek would depend upon
> >> ordering
> >> > > of
> >> > > > the keys like lexicographic. As long as the binary format is
> >> consistent
> >> > > for
> >> > > > both the keys and the prefixes would it not ensure the ability to
> >> search
> >> > > in
> >> > > > that same ordering space? This is from my limited understanding so
> >> any
> >> > > > concrete examples would be helpful...
> >> > > >
> >> > > > Also, you mentioned about the creation of dummy values to indicate
> >> prefix
> >> > > > values, do you mean this line:
> >> > > >
> >> > > >
> >> > >
> >> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionProcessorSupplier.java#L91
> >> > > > This
> >> > > > is where the prefix key is built and used for searching .
> >> > > >
> >> > > > Thanks!
> >> > > > Sagar.
> >> > > >
> >> > > > On Mon, Jun 1, 2020 at 11:42 AM Sagar <sagarmeansoc...@gmail.com>
> >> wrote:
> >> > > >
> >> > > > > Hi John,
> >> > > > >
> >> > > > > Thank you. I think it makes sense to modify the KIP to add the
> >> > > > > prefixScan() as part of the existing interfaces and add the new
> >> mixin
> >> > > > > behaviour as Rejected alternatives. I am not very aware of other
> >> stores
> >> > > > > apart from keyValueStore so is it fine if I keep it there for now?
> >> > > > >
> >> > > > > Regarding the type definition of types I will try and think about
> >> some
> >> > > > > alternatives and share if I get any.
> >> > > > >
> >> > > > > Thanks!
> >> > > > > Sagar.
> >> > > > >
> >> > > > >
> >> > > > > On Sun, May 31, 2020 at 1:55 AM John Roesler <vvcep...@apache.org
> >> >
> >> > > wrote:
> >> > > > >
> >> > > > >> Hi Sagar,
> >> > > > >>
> >> > > > >> Thanks for the response. Your use case makes sense to me; I
> >> figured it
> >> > > > >> must be something like that.
> >> > > > >>
> >> > > > >> On a pragmatic level, in the near term, you might consider
> >> basically
> >> > > > >> doing the same thing we did in KIP-213. If you swap out the store
> >> > > types for
> >> > > > >> Byte/byte[] and “manually” invoke the serdes in your own logic,
> >> then
> >> > > you
> >> > > > >> can use the same algorithm we did to derive the range scan
> >> boundaries
> >> > > from
> >> > > > >> your desired prefix.
> >> > > > >>
> >> > > > >> For the actual KIP, it seems like we would need significant
> >> design
> >> > > > >> improvements to be able to do any mixins, so I think we should
> >> favor
> >> > > > >> proposing either to just add to the existing interfaces or to
> >> create
> >> > > brand
> >> > > > >> new interfaces, as appropriate, for now. Given that prefix can be
> >> > > converted
> >> > > > >> to a range query at a low level, I think we can probably explore
> >> > > adding
> >> > > > >> prefix to the existing interfaces with a default implementation.
> >> > > > >>
> >> > > > >> It seems like that just leaves the question of how to define the
> >> type
> >> > > of
> >> > > > >> the prefix. To be honest, I don’t have any great ideas here. Are
> >> you
> >> > > able
> >> > > > >> to generate some creative solutions, Sagar?
> >> > > > >>
> >> > > > >> Thanks,
> >> > > > >> John
> >> > > > >>
> >> > > > >> On Tue, May 26, 2020, at 06:42, Sagar wrote:
> >> > > > >> > Hi John,
> >> > > > >> >
> >> > > > >> > Thanks for the detailed reply. I was a bit crammed with work
> >> last
> >> > > week
> >> > > > >> so
> >> > > > >> > couldn't respond earlier so apologies for that.
> >> > > > >> >
> >> > > > >> > First of all, thanks for the context that both you and Adam
> >> have
> >> > > > >> > provided me on the issues faced previously. As I can clearly
> >> see,
> >> > > while
> >> > > > >> I
> >> > > > >> > was able to cut some corners while writing some test cases or
> >> > > > >> benchmarks,
> >> > > > >> > to be able to stitch together a store with prefix scan into an
> >> > > actual
> >> > > > >> > topology needs more work. I am sorry for the half baked tests
> >> that I
> >> > > > >> wrote
> >> > > > >> > without realising and you have rightly put it when you said
> >> these
> >> > > > >> > challenges aren't obvious up front.
> >> > > > >> >
> >> > > > >> > Now, coming back to the other points, I spent some time going
> >> > > through
> >> > > > >> the
> >> > > > >> > KIP-213 and also some of the code snippets that are talked
> >> about in
> >> > > that
> >> > > > >> > KIP. With the detailed explanation that you provided, it is now
> >> > > obvious
> >> > > > >> to
> >> > > > >> > me that keeping a generic type for keys like K won't work oob
> >> and
> >> > > hence
> >> > > > >> a
> >> > > > >> > decision was made to use Bytes as the key type.
> >> > > > >> >
> >> > > > >> > I just had another thought on this though. I looked at the
> >> range
> >> > > > >> function
> >> > > > >> > that was added in the ReadOnlyKeyValueStore. While the Key and
> >> the
> >> > > Value
> >> > > > >> > mentioned in that method is generic, internally almost all
> >> queries
> >> > > end
> >> > > > >> up
> >> > > > >> > querying using Bytes in some or the other form. I looked at
> >> not just
> >> > > > >> > RocksDb Store but other stores like InMemory store or
> >> MemoryLRU and
> >> > > this
> >> > > > >> > seems to be the pattern. I think this stems from the fact that
> >> these
> >> > > > >> stores
> >> > > > >> > while implementing KeyValueStore pass Bytes, byte[] as the K
> >> and V
> >> > > > >> values.
> >> > > > >> > Classes like MeteredKeyValueStore which don't do this, still
> >> use
> >> > > > >> Bytes.wrap
> >> > > > >> > to wrap the passed keys and values and invoke the range method.
> >> > > > >> >
> >> > > > >> > So, the point I am trying to make is, with the same behaviour
> >> - and
> >> > > > >> > ignoring for a moment that it's a separate interface which I am
> >> > > trying
> >> > > > >> to
> >> > > > >> > "mix-in"- the issues with the key types could be resolved. I
> >> may be
> >> > > > >> wrong
> >> > > > >> > though so would like to know your thoughts on this. Infact
> >> > > unknowingly
> >> > > > >> the
> >> > > > >> > interface implementation of PrefixSeekableType in
> >> RockDBStateStore
> >> > > also
> >> > > > >> > passes Bytes and bytes[] as K and V.
> >> > > > >> >
> >> > > > >> > The second part of exposing it via the publically accessible
> >> > > interfaces
> >> > > > >> to
> >> > > > >> > which we downcast while building the topology (like
> >> KeyValueStore),
> >> > > I
> >> > > > >> can
> >> > > > >> > clearly see now that mixing-in the way I tried to won't work.
> >> My
> >> > > > >> intention
> >> > > > >> > all along was not to hamper the flow of those stores which
> >> don't
> >> > > support
> >> > > > >> > prefix scan as yet and hence the separate interface. But, I
> >> agree
> >> > > that
> >> > > > >> for
> >> > > > >> > this to work, it needs to be part of some pre-defined store
> >> types
> >> > > like
> >> > > > >> > KVStore etc. Right now, I don't have an answer to this but
> >> mostly it
> >> > > > >> would
> >> > > > >> > have to be moved there and implemented across all stores(if we
> >> see
> >> > > the
> >> > > > >> > worth in prefix scans :) )
> >> > > > >> >
> >> > > > >> > Regarding the motivation, I am sorry if I wasn't clear. This
> >> > > originated
> >> > > > >> > from one of my own use cases with kafka streams where i needed
> >> to
> >> > > find
> >> > > > >> some
> >> > > > >> > keys based upon certain prefix. Infact it's similar to the
> >> > > > >> > RangeScanCombinedKeyUsage diagram in KIP-213 where the
> >> otherTable
> >> > > tries
> >> > > > >> to
> >> > > > >> > find entries in the state store based upon the FK. I was using
> >> > > > >> > KevValueStore to be precise. I also remember having a slack
> >> > > > >> conversation on
> >> > > > >> > this, and I was told that this isn't supported right now, but
> >> some
> >> > > other
> >> > > > >> > users shared their experiences on how with some hacks they are
> >> able
> >> > > to
> >> > > > >> > perform prefix scans even though their use case fits the bill
> >> for a
> >> > > > >> prefix
> >> > > > >> > scan. That kind of motivated me to take a stab at it.
> >> > > Unfortunately, I
> >> > > > >> have
> >> > > > >> > lost the slack chat because of some cleanup at the slack
> >> channel
> >> > > level.
> >> > > > >> I
> >> > > > >> > will try and update the ambiguous motivation statement in the
> >> near
> >> > > > >> future.
> >> > > > >> >
> >> > > > >> > Lastly, I would like to point out, that your response was not
> >> at all
> >> > > > >> > discouraging. On the contrary it was really insightful and it's
> >> > > always
> >> > > > >> good
> >> > > > >> > to learn/discover new things :)
> >> > > > >> >
> >> > > > >> > Thanks!
> >> > > > >> > Sagar.
> >> > > > >> >
> >> > > > >> > On Fri, May 15, 2020 at 7:37 AM John Roesler <
> >> vvcep...@apache.org>
> >> > > > >> wrote:
> >> > > > >> >
> >> > > > >> > > Hi, Sagar!
> >> > > > >> > >
> >> > > > >> > > Thanks for this KIP. I'm sorry it took me so long to reply.
> >> I'll
> >> > > > >> number my
> >> > > > >> > > points differently to avoid confusion.
> >> > > > >> > >
> >> > > > >> > > I can provide some additional context on the difficulties we
> >> > > > >> previously
> >> > > > >> > > faced in KIP-213 (which you and Adam have already discussed).
> >> > > > >> > >
> >> > > > >> > > J1) In your KIP, you propose the following interface:
> >> > > > >> > >
> >> > > > >> > > public interface PrefixSeekableStore<K, V> {
> >> > > > >> > >     KeyValueIterator<K, V> prefixSeek(K prefix);
> >> > > > >> > > }
> >> > > > >> > >
> >> > > > >> > > This is roughly the same thing that Adam and I were
> >> considering
> >> > > > >> > > before. It has a hidden problem, that it assumes that
> >> prefixes of
> >> > > > >> > > keys in the key space are also in the key space. In other
> >> words,
> >> > > this
> >> > > > >> > > is a store with key type K, and the API assumes that
> >> prefixes are
> >> > > also
> >> > > > >> > > of type K. This is true for some key types, like String or
> >> Bytes,
> >> > > but
> >> > > > >> not
> >> > > > >> > > for others.
> >> > > > >> > >
> >> > > > >> > > For example, if the keys are UUIDs, then no prefix is also a
> >> > > UUID. If
> >> > > > >> the
> >> > > > >> > > key is a complex data type, like Windowed<K> in our own DSL,
> >> then
> >> > > > >> > > we would absolutely want to query all keys with the same
> >> record
> >> > > key
> >> > > > >> > > (the K part), or the same window start time, but in neither
> >> case
> >> > > is
> >> > > > >> the
> >> > > > >> > > prefix actually a Windowed<K>.
> >> > > > >> > >
> >> > > > >> > > You can skirt the issue by defining a third type parameter,
> >> maybe
> >> > > KP,
> >> > > > >> that
> >> > > > >> > > is the "prefix" type, but this would also be awkward for many
> >> > > usages.
> >> > > > >> > >
> >> > > > >> > > J2) There is a related problem with serialization. Whether
> >> > > something
> >> > > > >> > > is a prefix or not depends not on the Java key (K), but on
> >> the
> >> > > binary
> >> > > > >> > > format that is produced when you use a serde on the key.
> >> Whether
> >> > > > >> > > we say that the prefix must also be a K or whether it gets
> >> its own
> >> > > > >> type,
> >> > > > >> > > KP, there are problems.
> >> > > > >> > >
> >> > > > >> > > In the latter case, we must additionally require a second
> >> set of
> >> > > > >> serdes
> >> > > > >> > > for the prefixes, but there's no obvious way to incorporate
> >> this
> >> > > in
> >> > > > >> the
> >> > > > >> > > API, especially not in the DSL.
> >> > > > >> > >
> >> > > > >> > > In either case, for the API to actually work, we need to know
> >> > > ahead
> >> > > > >> > > of time that the Serde will produce a binary key that starts
> >> with
> >> > > the
> >> > > > >> > > part that we wish to use as a prefix. For example, what we
> >> were
> >> > > doing
> >> > > > >> > > briefly in KIP-213 (where we had complex keys, similar to
> >> > > Windowed<K>)
> >> > > > >> > > was to define "dummy" values that indicate that a
> >> Windowed<K> is
> >> > > > >> actually
> >> > > > >> > > just a prefix key, not a real key. Maybe the window start
> >> time
> >> > > would
> >> > > > >> be
> >> > > > >> > > null or the key part would be null. But we also had to
> >> define a
> >> > > serde
> >> > > > >> > > that would very specifically anticipate which component of
> >> the
> >> > > complex
> >> > > > >> > > key would need to be used in a prefix key. Having to bring
> >> all
> >> > > these
> >> > > > >> > > parts together in a reliable, easy-to-debug, fashion gives
> >> me some
> >> > > > >> doubt
> >> > > > >> > > that people would actually be able to use this feature in
> >> > > complicated
> >> > > > >> > > programs without driving themselves crazy.
> >> > > > >> > >
> >> > > > >> > > J3) Thanks so much for including benchmarks and tests!
> >> > > Unfortunately,
> >> > > > >> > > these don't include everything you need to really plug into
> >> the
> >> > > > >> Streams
> >> > > > >> > > API. I think when you push it a little farther, you'll
> >> realize
> >> > > what
> >> > > > >> Adam
> >> > > > >> > > was talking about wrt the interface difficulties.
> >> > > > >> > >
> >> > > > >> > > In your benchmark and tests, you directly construct the
> >> store and
> >> > > then
> >> > > > >> > > use it, but in a real Streams application, you can only
> >> provide
> >> > > your
> >> > > > >> > > implementation in a StoreSupplier, for example via the
> >> > > Materialized
> >> > > > >> > > parameter. Then, to use the store from inside a Processor,
> >> you'd
> >> > > have
> >> > > > >> > > to get it by name from the ProcessorContext, and then cast
> >> it to
> >> > > one
> >> > > > >> of
> >> > > > >> > > the pre-defined store types, KeyValueStore, WindowedStore, or
> >> > > > >> > > SessionStore. It won't work to "mix in" your interface
> >> because the
> >> > > > >> > > processor gets a store that's wrapped in layers that handle
> >> > > > >> serialization,
> >> > > > >> > > change-logging, recording metrics, and caching.
> >> > > > >> > >
> >> > > > >> > > To use the store through IQ, you have to provide a
> >> > > QueriableStoreType
> >> > > > >> > > to KafkaStreams#store, and you get back a similarly wrapped
> >> store.
> >> > > > >> > >
> >> > > > >> > > I think our only choices to add an interface like yours is
> >> either
> >> > > to
> >> > > > >> add
> >> > > > >> > > it to one of the existing store types, like KeyValueStore or
> >> > > > >> > > WindowedStore, or to define a completely new store hierarchy,
> >> > > meaning
> >> > > > >> > > you have to duplicate all the "wrapper" layers in Streams.
> >> > > > >> > >
> >> > > > >> > > I think if you write an "end-to-end" test, where you write a
> >> > > Streams
> >> > > > >> app,
> >> > > > >> > > provide your store, and then use it in a Processor and
> >> through IQ,
> >> > > > >> > > you'll see what I'm talking about.
> >> > > > >> > >
> >> > > > >> > > IIRC, those three points were the ones that ultimately led
> >> us to
> >> > > > >> abandon
> >> > > > >> > > the whole idea last time and just register the stores with
> >> key
> >> > > type
> >> > > > >> Bytes.
> >> > > > >> > > I think some creative solutions may yet be possible, but
> >> it'll
> >> > > take
> >> > > > >> some
> >> > > > >> > > more design work to get there.
> >> > > > >> > >
> >> > > > >> > > Can I ask what your motivation is, exactly, for proposing
> >> this
> >> > > > >> feature?
> >> > > > >> > > The motivation just says "some users may want to do it",
> >> which has
> >> > > > >> > > the advantage that it's impossible to disagree with, but
> >> doesn't
> >> > > > >> provide
> >> > > > >> > > a lot of concrete detail ;)
> >> > > > >> > >
> >> > > > >> > > Specifically, what I'm wondering is whether you wanted to use
> >> > > this as
> >> > > > >> > > part of a KayValue store, which might be a challenge, or
> >> whether
> >> > > you
> >> > > > >> > > wanted to use it for more efficient scans in a
> >> WindowedStore, like
> >> > > > >> > > Guozhang.
> >> > > > >> > >
> >> > > > >> > > Thanks again for the KIP! I hope my response isn't too
> >> > > discouraging;
> >> > > > >> > > I just wanted to convey the challenges we faced last time,
> >> since
> >> > > they
> >> > > > >> > > are all not obvious up front.
> >> > > > >> > >
> >> > > > >> > > Best regards,
> >> > > > >> > > -John
> >> > > > >> > >
> >> > > > >> > >
> >> > > > >> > > On Thu, May 14, 2020, at 16:17, Sophie Blee-Goldman wrote:
> >> > > > >> > > > Whoops, I guess I didn't finish reading the KIP all the
> >> way to
> >> > > the
> >> > > > >> end
> >> > > > >> > > > earlier. Thanks
> >> > > > >> > > > for including the link to the RocksDB PR in the KIP!
> >> > > > >> > > >
> >> > > > >> > > > I have one additional question about the proposal: do you
> >> plan
> >> > > to
> >> > > > >> also
> >> > > > >> > > add
> >> > > > >> > > > this
> >> > > > >> > > > prefix seek API to the dual column family iterators? These
> >> are
> >> > > used
> >> > > > >> by
> >> > > > >> > > > RocksDBTimestampedStore (which extends RocksDBStore), for
> >> > > example
> >> > > > >> the
> >> > > > >> > > > *RocksDBDualCFRangeIterator*
> >> > > > >> > > >
> >> > > > >> > > > Thanks for the KIP!
> >> > > > >> > > >
> >> > > > >> > > > On Thu, May 14, 2020 at 10:50 AM Sagar <
> >> > > sagarmeansoc...@gmail.com>
> >> > > > >> > > wrote:
> >> > > > >> > > >
> >> > > > >> > > > > Hey @Adam,
> >> > > > >> > > > >
> >> > > > >> > > > > Thanks for sharing your experience with using prefix
> >> seek. I
> >> > > did
> >> > > > >> look
> >> > > > >> > > at
> >> > > > >> > > > > your code for RocksDBPrefixIterator, infact I have
> >> repurposed
> >> > > that
> >> > > > >> > > class
> >> > > > >> > > > > itself since it wasn't being used else where. Regarding
> >> how I
> >> > > > >> plan to
> >> > > > >> > > > > expose them through-out the state stores, what I have
> >> tried
> >> > > to do
> >> > > > >> is
> >> > > > >> > > add it
> >> > > > >> > > > > as a separate interface. So, basically, it is not at the
> >> same
> >> > > > >> level as
> >> > > > >> > > the
> >> > > > >> > > > > *range function so to speak. The reason I did that is
> >> > > currently I
> >> > > > >> feel
> >> > > > >> > > not
> >> > > > >> > > > > all state stores are a natural fit for prefix seek. As I
> >> > > > >> mentioned in
> >> > > > >> > > the
> >> > > > >> > > > > KIP as well, the current equivalent to it could be
> >> > > > >> > > BulkLoadingStore(not in
> >> > > > >> > > > > terms of functionality but in terms of how it is also not
> >> > > > >> implemented
> >> > > > >> > > by
> >> > > > >> > > > > all of them). So, that ways I am not needing to stub them
> >> > > across
> >> > > > >> all
> >> > > > >> > > the
> >> > > > >> > > > > state-stores and we can implement it only where needed.
> >> For
> >> > > > >> example,
> >> > > > >> > > in the
> >> > > > >> > > > > PR that I have put for reference in the KIP, you can see
> >> that
> >> > > I
> >> > > > >> have it
> >> > > > >> > > > > implemented only for RocksDB.
> >> > > > >> > > > >
> >> > > > >> > > > > @Guozhang,
> >> > > > >> > > > >
> >> > > > >> > > > > Thanks for the feedback. Those are very interesting
> >> questions
> >> > > and
> >> > > > >> I
> >> > > > >> > > will
> >> > > > >> > > > > try my best to answer based upon whatever limited
> >> > > understanding I
> >> > > > >> have
> >> > > > >> > > > > developed so far :)
> >> > > > >> > > > >
> >> > > > >> > > > > 1) Regarding the usage of useFixedLengthPrefixExtractor,
> >> > > > >> honestly, I
> >> > > > >> > > hadn't
> >> > > > >> > > > > looked at that config. I did look it up after you
> >> pointed it
> >> > > out
> >> > > > >> and
> >> > > > >> > > seems
> >> > > > >> > > > > it's more for hash-based memtables? I may be wrong
> >> though. But
> >> > > > >> what I
> >> > > > >> > > would
> >> > > > >> > > > > say is that, the changes I had made were not exactly
> >> from a
> >> > > > >> correctness
> >> > > > >> > > > > stand point but more from trying to showcase how we can
> >> > > implement
> >> > > > >> these
> >> > > > >> > > > > changes. The idea was that once we see the merit in this
> >> > > approach
> >> > > > >> then
> >> > > > >> > > we
> >> > > > >> > > > > can add some of the tunings( and I would need your team's
> >> > > > >> assistance
> >> > > > >> > > there
> >> > > > >> > > > > :D).
> >> > > > >> > > > >
> >> > > > >> > > > > 2) Regarding the similarity of `RocksDBPrefixIterator`
> >> and
> >> > > > >> > > > > `RocksDBRangeIterator`, yes the implementations look
> >> more or
> >> > > less
> >> > > > >> > > similar.
> >> > > > >> > > > > So, in terms of performance, they might be similar. But
> >> > > > >> semantically,
> >> > > > >> > > they
> >> > > > >> > > > > can solve 2 different use-cases. The range seek is useful
> >> > > when we
> >> > > > >> know
> >> > > > >> > > both
> >> > > > >> > > > > from and to. But if we consider use-cases where we want
> >> to
> >> > > find
> >> > > > >> keys
> >> > > > >> > > with a
> >> > > > >> > > > > certain prefix, but we don't know if what it's start and
> >> end
> >> > > is,
> >> > > > >> then
> >> > > > >> > > > > prefix seek would come in more handy. The point that I am
> >> > > trying
> >> > > > >> to
> >> > > > >> > > make is
> >> > > > >> > > > > that it can extend the scope of state stores from just
> >> point
> >> > > > >> lookups to
> >> > > > >> > > > > somewhat being able to speculative queries where by
> >> users can
> >> > > > >> search
> >> > > > >> > > if a
> >> > > > >> > > > > certain pattern exists. I can vouch for this personally
> >> > > because I
> >> > > > >> > > wanted to
> >> > > > >> > > > > use state stores for one such use case and since this
> >> option
> >> > > > >> wasn't
> >> > > > >> > > there,
> >> > > > >> > > > > I had to do some other things. An equivalent to this
> >> could be
> >> > > SCAN
> >> > > > >> > > operator
> >> > > > >> > > > > in Redis. (Not trying to compare the Redis and state
> >> stores
> >> > > but
> >> > > > >> trying
> >> > > > >> > > to
> >> > > > >> > > > > give some context).
> >> > > > >> > > > >
> >> > > > >> > > > > Regarding the point on bloom filter, I think there are
> >> certain
> >> > > > >> > > > > optimisations that are being talked about in case of
> >> prefix
> >> > > seek
> >> > > > >> here:
> >> > > > >> > > > >
> >> > > > >> > > > >
> >> > > > >> > > > >
> >> > > > >> > >
> >> > > > >>
> >> > >
> >> https://github.com/facebook/rocksdb/wiki/RocksDB-Bloom-Filter#prefix-vs-whole-key
> >> > > > >> > > > > Again
> >> > > > >> > > > > this isn't something that I have explored fully. Also,
> >> on the
> >> > > > >> prefix
> >> > > > >> > > seek
> >> > > > >> > > > > page on RocksDB they mention that there's a prefix
> >> iterating
> >> > > > >> technique
> >> > > > >> > > > > called Prefix Bloom Filter.
> >> > > > >> > > > >
> >> > > > >> > > > > 3) Regarding the question on length of bytes for seek v/s
> >> > > prefix
> >> > > > >> seek,
> >> > > > >> > > I am
> >> > > > >> > > > > not entirely sure about that scenario. What I have
> >> understood
> >> > > is
> >> > > > >> that
> >> > > > >> > > > > at-least for Rocks DB, it is more performant for short
> >> > > iterator
> >> > > > >> queries
> >> > > > >> > > > > that longer ones.
> >> > > > >> > > > >
> >> > > > >> > > > > 4) Regarding the last question on placing it within
> >> Segment,
> >> > > the
> >> > > > >> > > reason I
> >> > > > >> > > > > didn't do that way, is that I thought we shouldn't tie
> >> this
> >> > > > >> feature
> >> > > > >> > > only to
> >> > > > >> > > > > RocksDB. I agree that I got this idea while
> >> looking/reading
> >> > > about
> >> > > > >> > > RocksDB
> >> > > > >> > > > > but if we keep it outside the purview of RocksDB and
> >> keep it
> >> > > as a
> >> > > > >> > > pluggable
> >> > > > >> > > > > entity, then a) it remains generic by not being tied to
> >> any
> >> > > > >> specific
> >> > > > >> > > store
> >> > > > >> > > > > and b) no change is needed at all for any of the other
> >> stores
> >> > > > >> which
> >> > > > >> > > haven't
> >> > > > >> > > > > implemented it.
> >> > > > >> > > > >
> >> > > > >> > > > > I am not sure of any of the above points make sense but
> >> as I
> >> > > said,
> >> > > > >> > > this is
> >> > > > >> > > > > based out of my limited understanding of the codebase.
> >> So,
> >> > > pardon
> >> > > > >> any
> >> > > > >> > > > > incorrect/illogical statements plz!
> >> > > > >> > > > >
> >> > > > >> > > > > @Sophie,
> >> > > > >> > > > >
> >> > > > >> > > > > Thanks for bringing that point up! I have mentioned about
> >> > > that PR
> >> > > > >> in
> >> > > > >> > > the
> >> > > > >> > > > > KIP under a section called Other considerations.
> >> Nonetheless,
> >> > > > >> thanks
> >> > > > >> > > for
> >> > > > >> > > > > pointing it out!
> >> > > > >> > > > >
> >> > > > >> > > > > Thanks!
> >> > > > >> > > > > Sagar.
> >> > > > >> > > > >
> >> > > > >> > > > >
> >> > > > >> > > > > On Thu, May 14, 2020 at 5:17 AM Sophie Blee-Goldman <
> >> > > > >> > > sop...@confluent.io>
> >> > > > >> > > > > wrote:
> >> > > > >> > > > >
> >> > > > >> > > > > > Not to derail this KIP discussion, but to leave a few
> >> notes
> >> > > on
> >> > > > >> some
> >> > > > >> > > of
> >> > > > >> > > > > the
> >> > > > >> > > > > > RocksDB points that have come up:
> >> > > > >> > > > > >
> >> > > > >> > > > > > Someone actually merged some long overdue performance
> >> > > > >> improvements to
> >> > > > >> > > > > > the RocksJava implementation (the PR was opened back in
> >> > > 2017!
> >> > > > >> yikes).
> >> > > > >> > > > > > I haven't looked into the prefix seek API closely
> >> enough to
> >> > > > >> know how
> >> > > > >> > > > > > relevant
> >> > > > >> > > > > > this particular change is, and they are still improving
> >> > > things,
> >> > > > >> but
> >> > > > >> > > it
> >> > > > >> > > > > > gives me some
> >> > > > >> > > > > > faith.
> >> > > > >> > > > > >
> >> > > > >> > > > > > There are some pretty promising results reported on
> >> the PR:
> >> > > > >> > > > > >
> >> > > > >>
> >> https://github.com/facebook/rocksdb/pull/2283#issuecomment-561563037
> >> > > > >> > > > > >
> >> > > > >> > > > > > Regarding the custom comparator, they also recently
> >> merged
> >> > > this
> >> > > > >> > > > > performance
> >> > > > >> > > > > > <https://github.com/facebook/rocksdb/pull/6252>
> >> > > > >> > > > > > improvement <
> >> https://github.com/facebook/rocksdb/pull/6252
> >> > > >.
> >> > > > >> The
> >> > > > >> > > tl;dr
> >> > > > >> > > > > is
> >> > > > >> > > > > > they reduced the slowdown of a custom comparator in
> >> Java
> >> > > > >> > > > > > (relative to the native C++) from ~7x to ~5.2x at best.
> >> > > Which is
> >> > > > >> > > still
> >> > > > >> > > > > not
> >> > > > >> > > > > > great, but it
> >> > > > >> > > > > > would be interesting to run our own benchmarks and see
> >> how
> >> > > this
> >> > > > >> > > stacks
> >> > > > >> > > > > up.
> >> > > > >> > > > > >
> >> > > > >> > > > > > Of course, these are all new changes and as such will
> >> > > require
> >> > > > >> us to
> >> > > > >> > > > > upgrade
> >> > > > >> > > > > > rocks to 6.x which means they have to wait for us to
> >> > > release a
> >> > > > >> 3.0.
> >> > > > >> > > But
> >> > > > >> > > > > > there's
> >> > > > >> > > > > > some talk about 3.0 coming in the next few releases so
> >> > > consider
> >> > > > >> it
> >> > > > >> > > food
> >> > > > >> > > > > for
> >> > > > >> > > > > > not-so-future thought
> >> > > > >> > > > > >
> >> > > > >> > > > > >
> >> > > > >> > > > > > On Tue, May 12, 2020 at 5:02 PM Adam Bellemare <
> >> > > > >> > > adam.bellem...@gmail.com
> >> > > > >> > > > > >
> >> > > > >> > > > > > wrote:
> >> > > > >> > > > > >
> >> > > > >> > > > > > > Hi Guozhang
> >> > > > >> > > > > > >
> >> > > > >> > > > > > > For clarity, the issues I was running into was not
> >> about
> >> > > the
> >> > > > >> actual
> >> > > > >> > > > > > > *prefixSeek* function itself, but about exposing it
> >> to the
> >> > > > >> same
> >> > > > >> > > level
> >> > > > >> > > > > of
> >> > > > >> > > > > > > access as the *range* function throughout Kafka
> >> Streams.
> >> > > It
> >> > > > >> > > required a
> >> > > > >> > > > > > lot
> >> > > > >> > > > > > > of changes, and also required that most state stores
> >> stub
> >> > > it
> >> > > > >> out
> >> > > > >> > > since
> >> > > > >> > > > > it
> >> > > > >> > > > > > > wasn't clear how they would implement it. It was
> >> > > basically an
> >> > > > >> > > > > > overreaching
> >> > > > >> > > > > > > API change that was easily solved (for the specific
> >> > > > >> prefix-scan in
> >> > > > >> > > FKJ)
> >> > > > >> > > > > > by
> >> > > > >> > > > > > > simply using *range*. So to be clear, the blockers
> >> were
> >> > > > >> > > predominantly
> >> > > > >> > > > > > > around correctly handling the API changes, nothing
> >> to do
> >> > > with
> >> > > > >> the
> >> > > > >> > > > > > > mechanisms of the RocksDB prefix scanning.
> >> > > > >> > > > > > >
> >> > > > >> > > > > > > As for KAFKA-5285 I'll look into it more to see if I
> >> can
> >> > > get a
> >> > > > >> > > better
> >> > > > >> > > > > > > handle on the problem!
> >> > > > >> > > > > > >
> >> > > > >> > > > > > > Hope this helps clear it up.
> >> > > > >> > > > > > >
> >> > > > >> > > > > > > Adam
> >> > > > >> > > > > > >
> >> > > > >> > > > > > >
> >> > > > >> > > > > > > On Tue, May 12, 2020 at 7:16 PM Guozhang Wang <
> >> > > > >> wangg...@gmail.com>
> >> > > > >> > > > > > wrote:
> >> > > > >> > > > > > >
> >> > > > >> > > > > > > > Hello Adam,
> >> > > > >> > > > > > > >
> >> > > > >> > > > > > > > I'm wondering if you can provide a bit more
> >> context on
> >> > > the
> >> > > > >> > > blockers
> >> > > > >> > > > > of
> >> > > > >> > > > > > > > using prefixSeek of RocksDB (I saw you have a
> >> > > > >> > > RocksDBPrefixIterator
> >> > > > >> > > > > > class
> >> > > > >> > > > > > > > but not used anywhere yet)? I'm currently looking
> >> at
> >> > > ways to
> >> > > > >> > > allow
> >> > > > >> > > > > some
> >> > > > >> > > > > > > > secondary indices with rocksDB following some
> >> existing
> >> > > > >> approaches
> >> > > > >> > > > > > > > from CockroachDB etc so I'm very curious to learn
> >> your
> >> > > > >> > > experience.
> >> > > > >> > > > > > > >
> >> > > > >> > > > > > > > 1) Before considering any secondary indices, a
> >> quick
> >> > > > >> thought is
> >> > > > >> > > that
> >> > > > >> > > > > > for
> >> > > > >> > > > > > > > (key, timeFrom, timeTo) queries, we can easily
> >> replace
> >> > > the
> >> > > > >> > > current
> >> > > > >> > > > > > > > `range()` impl with a `prefixRange()` impl via a
> >> prefix
> >> > > > >> iterator;
> >> > > > >> > > > > > though
> >> > > > >> > > > > > > > for (keyFrom, keyTo, timeFrom, timeTo) it is much
> >> more
> >> > > > >> > > complicated
> >> > > > >> > > > > > indeed
> >> > > > >> > > > > > > > and hence existing `range()` impl may still be
> >> used.
> >> > > > >> > > > > > > >
> >> > > > >> > > > > > > > 2) Another related issue I've been pondering for a
> >> > > while is
> >> > > > >> > > > > > > > around KAFKA-5285: with the default lexicograpic
> >> byte
> >> > > > >> comparator,
> >> > > > >> > > > > since
> >> > > > >> > > > > > > the
> >> > > > >> > > > > > > > key length varies, the combo (key, window) would
> >> have
> >> > > > >> > > interleaving
> >> > > > >> > > > > byte
> >> > > > >> > > > > > > > layouts like:
> >> > > > >> > > > > > > >
> >> > > > >> > > > > > > > AAA0001          (key AAA, timestamp 0001)
> >> > > > >> > > > > > > > AAA00011        (key AAA0, timestamp 0011)
> >> > > > >> > > > > > > > AAA0002          (key AAA, timestamp 0002)
> >> > > > >> > > > > > > >
> >> > > > >> > > > > > > > which is challenging for prefix seeks to work
> >> > > efficiently.
> >> > > > >> > > Although
> >> > > > >> > > > > we
> >> > > > >> > > > > > > can
> >> > > > >> > > > > > > > overwrite the byte-comparator in JNI it is very
> >> > > expensive
> >> > > > >> and the
> >> > > > >> > > > > cost
> >> > > > >> > > > > > of
> >> > > > >> > > > > > > > JNI overwhelms its benefits. If you've got some
> >> ideas
> >> > > > >> around it
> >> > > > >> > > > > please
> >> > > > >> > > > > > > lmk
> >> > > > >> > > > > > > > as well.
> >> > > > >> > > > > > > >
> >> > > > >> > > > > > > > Guozhang
> >> > > > >> > > > > > > >
> >> > > > >> > > > > > > >
> >> > > > >> > > > > > > >
> >> > > > >> > > > > > > >
> >> > > > >> > > > > > > > On Tue, May 12, 2020 at 6:26 AM Adam Bellemare <
> >> > > > >> > > > > > adam.bellem...@gmail.com
> >> > > > >> > > > > > > >
> >> > > > >> > > > > > > > wrote:
> >> > > > >> > > > > > > >
> >> > > > >> > > > > > > > > Hi Sagar
> >> > > > >> > > > > > > > >
> >> > > > >> > > > > > > > > I implemented a very similar interface for
> >> KIP-213,
> >> > > the
> >> > > > >> > > foreign-key
> >> > > > >> > > > > > > > joiner.
> >> > > > >> > > > > > > > > We pulled it out of the final implementation and
> >> > > instead
> >> > > > >> used
> >> > > > >> > > > > RocksDB
> >> > > > >> > > > > > > > range
> >> > > > >> > > > > > > > > instead. You can see the particular code where
> >> we use
> >> > > > >> > > > > > > RocksDB.range(...)
> >> > > > >> > > > > > > > to
> >> > > > >> > > > > > > > > get the same iterator result.
> >> > > > >> > > > > > > > >
> >> > > > >> > > > > > > > >
> >> > > > >> > > > > > > > >
> >> > > > >> > > > > > > >
> >> > > > >> > > > > > >
> >> > > > >> > > > > >
> >> > > > >> > > > >
> >> > > > >> > >
> >> > > > >>
> >> > >
> >> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionProcessorSupplier.java#L95
> >> > > > >> > > > > > > > >
> >> > > > >> > > > > > > > > We pulled it out because there were numerous
> >> awkward
> >> > > > >> > > acrobatics to
> >> > > > >> > > > > > > > > integrate *prefixSeek()* function into the Kafka
> >> > > Streams
> >> > > > >> code.
> >> > > > >> > > > > > > > Basically, I
> >> > > > >> > > > > > > > > wanted to be able to access *prefixSeek()* the
> >> same
> >> > > way I
> >> > > > >> can
> >> > > > >> > > > > access
> >> > > > >> > > > > > > > > *range()* for any state store, and in particular
> >> use
> >> > > it
> >> > > > >> for
> >> > > > >> > > storing
> >> > > > >> > > > > > > data
> >> > > > >> > > > > > > > > with a particular foreign key (as per the
> >> previous
> >> > > URL).
> >> > > > >> > > However, I
> >> > > > >> > > > > > > found
> >> > > > >> > > > > > > > > out that it required way too many changes to
> >> expose
> >> > > the
> >> > > > >> > > > > > *prefixSeek()*
> >> > > > >> > > > > > > > > functionality while still being able to leverage
> >> all
> >> > > the
> >> > > > >> nice
> >> > > > >> > > Kafka
> >> > > > >> > > > > > > > Streams
> >> > > > >> > > > > > > > > state management + supplier functionality, so we
> >> made
> >> > > a
> >> > > > >> > > decision
> >> > > > >> > > > > just
> >> > > > >> > > > > > > to
> >> > > > >> > > > > > > > > stick with *range()* and pull everything else
> >> out.
> >> > > > >> > > > > > > > >
> >> > > > >> > > > > > > > > I guess my question here is, how do you
> >> anticipate
> >> > > using
> >> > > > >> > > > > > *prefixSeek()*
> >> > > > >> > > > > > > > > within the framework of Kafka Streams, or the
> >> > > Processor
> >> > > > >> API?
> >> > > > >> > > > > > > > >
> >> > > > >> > > > > > > > > Adam
> >> > > > >> > > > > > > > >
> >> > > > >> > > > > > > > >
> >> > > > >> > > > > > > > >
> >> > > > >> > > > > > > > > On Tue, May 12, 2020 at 2:52 AM Sagar <
> >> > > > >> > > sagarmeansoc...@gmail.com>
> >> > > > >> > > > > > > wrote:
> >> > > > >> > > > > > > > >
> >> > > > >> > > > > > > > > > Hi All,
> >> > > > >> > > > > > > > > >
> >> > > > >> > > > > > > > > > I would like to start a discussion on the KIP
> >> that I
> >> > > > >> created
> >> > > > >> > > > > below
> >> > > > >> > > > > > to
> >> > > > >> > > > > > > > add
> >> > > > >> > > > > > > > > > prefix scan support in State Stores:
> >> > > > >> > > > > > > > > >
> >> > > > >> > > > > > > > > >
> >> > > > >> > > > > > > > > >
> >> > > > >> > > > > > > > >
> >> > > > >> > > > > > > >
> >> > > > >> > > > > > >
> >> > > > >> > > > > >
> >> > > > >> > > > >
> >> > > > >> > >
> >> > > > >>
> >> > >
> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-614%3A+Add+Prefix+Scan+support+for+State+Stores
> >> > > > >> > > > > > > > > >
> >> > > > >> > > > > > > > > > Thanks!
> >> > > > >> > > > > > > > > > Sagar.
> >> > > > >> > > > > > > > > >
> >> > > > >> > > > > > > > >
> >> > > > >> > > > > > > >
> >> > > > >> > > > > > > >
> >> > > > >> > > > > > > > --
> >> > > > >> > > > > > > > -- Guozhang
> >> > > > >> > > > > > > >
> >> > > > >> > > > > > >
> >> > > > >> > > > > >
> >> > > > >> > > > >
> >> > > > >> > > >
> >> > > > >> > >
> >> > > > >> >
> >> > > > >>
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> >
>

Reply via email to