Hi, Sagar!

Thanks for this KIP. I'm sorry it took me so long to reply. I'll number my
points differently to avoid confusion.

I can provide some additional context on the difficulties we previously
faced in KIP-213 (which you and Adam have already discussed).

J1) In your KIP, you propose the following interface:

public interface PrefixSeekableStore<K, V> {
    KeyValueIterator<K, V> prefixSeek(K prefix);
}

This is roughly the same thing that Adam and I were considering
before. It has a hidden problem: it assumes that prefixes of
keys in the key space are also in the key space. In other words, this
is a store with key type K, and the API assumes that prefixes are also
of type K. This is true for some key types, like String or Bytes, but not
for others.

For example, if the keys are UUIDs, then no prefix is also a UUID. If the
key is a complex data type, like Windowed<K> in our own DSL, then
we would absolutely want to query all keys with the same record key
(the K part), or the same window start time, but in neither case is the 
prefix actually a Windowed<K>. 

You can skirt the issue by defining a third type parameter, maybe KP, that
is the "prefix" type, but this would also be awkward for many usages.
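
To make the type problem concrete, here is a minimal sketch of the
"third type parameter" variant. Everything in it (PrefixScannable,
WindowedKey, InMemoryWindowedStore) is made up for illustration and is
not part of Streams or of your KIP; it only assumes a complex key made
of a record key plus a window start time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical: a prefix-scan interface with a separate prefix type KP.
interface PrefixScannable<K, KP, V> {
    List<V> prefixScan(KP prefix);
}

// Hypothetical stand-in for a complex key such as Windowed<K>.
final class WindowedKey implements Comparable<WindowedKey> {
    final String key;
    final long windowStart;

    WindowedKey(String key, long windowStart) {
        this.key = key;
        this.windowStart = windowStart;
    }

    // Order by record key first, then by window start time.
    @Override
    public int compareTo(WindowedKey other) {
        int byKey = key.compareTo(other.key);
        return byKey != 0 ? byKey : Long.compare(windowStart, other.windowStart);
    }
}

// The prefix here is a String (the record key), which is not a WindowedKey.
final class InMemoryWindowedStore
        implements PrefixScannable<WindowedKey, String, String> {
    private final TreeMap<WindowedKey, String> data = new TreeMap<>();

    void put(WindowedKey key, String value) {
        data.put(key, value);
    }

    // Naive full scan in key order; returns values whose record key matches.
    @Override
    public List<String> prefixScan(String recordKey) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<WindowedKey, String> e : data.entrySet()) {
            if (e.getKey().key.equals(recordKey)) {
                result.add(e.getValue());
            }
        }
        return result;
    }
}
```

Note that a plain String-keyed store would have to be declared as
PrefixScannable<String, String, V>, and a store that wants to scan by
window start time instead would need yet another KP. That is the kind of
awkwardness I mean.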

J2) There is a related problem with serialization. Whether something
is a prefix or not depends not on the Java key (K), but on the binary
format that is produced when you use a serde on the key. Whether
we say that the prefix must also be a K or whether it gets its own type,
KP, there are problems.

In the latter case, we must additionally require a second set of serdes
for the prefixes, but there's no obvious way to incorporate this in the
API, especially not in the DSL.

In either case, for the API to actually work, we need to know ahead
of time that the Serde will produce a binary key that starts with the
part that we wish to use as a prefix. For example, what we were doing
briefly in KIP-213 (where we had complex keys, similar to Windowed<K>)
was to define "dummy" values that indicate that a Windowed<K> is actually
just a prefix key, not a real key. Maybe the window start time would be
null or the key part would be null. But we also had to define a serde
that would very specifically anticipate which component of the complex
key would need to be used in a prefix key. Having to bring all these
parts together in a reliable, easy-to-debug fashion gives me some doubt
that people would actually be able to use this feature in complicated
programs without driving themselves crazy.
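
As a rough illustration of why the binary format matters, here is a
small sketch. The class and both layouts are hypothetical (they are not
the formats Streams actually uses); the point is only that the same
logical key may or may not start with the bytes of its record key,
depending on what the serde writes first.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class PrefixLayoutDemo {
    // Hypothetical layout A: record key bytes first, then an 8-byte window start.
    static byte[] keyFirst(String key, long windowStart) {
        byte[] k = key.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(k.length + Long.BYTES)
                .put(k)
                .putLong(windowStart)
                .array();
    }

    // Hypothetical layout B: 8-byte window start first, then record key bytes.
    static byte[] timeFirst(String key, long windowStart) {
        byte[] k = key.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(Long.BYTES + k.length)
                .putLong(windowStart)
                .put(k)
                .array();
    }

    // True if 'bytes' begins with exactly the bytes in 'prefix'.
    static boolean startsWith(byte[] bytes, byte[] prefix) {
        if (prefix.length > bytes.length) {
            return false;
        }
        for (int i = 0; i < prefix.length; i++) {
            if (bytes[i] != prefix[i]) {
                return false;
            }
        }
        return true;
    }
}
```

With layout A, the serialized form of ("AAA", 1) starts with the bytes
of "AAA", so a byte-level prefix scan on the record key can work. With
layout B it does not, even though the Java key is identical. Also note
that under layout A, the serialized form of ("AAAB", 1) starts with the
bytes of "AAA" as well, so with variable-length keys a byte prefix can
match more than the one record key you had in mind.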

J3) Thanks so much for including benchmarks and tests! Unfortunately,
these don't include everything you need to really plug into the Streams
API. I think when you push it a little farther, you'll realize what Adam
was talking about wrt the interface difficulties.

In your benchmark and tests, you directly construct the store and then
use it, but in a real Streams application, you can only provide your
implementation in a StoreSupplier, for example via the Materialized
parameter. Then, to use the store from inside a Processor, you'd have
to get it by name from the ProcessorContext, and then cast it to one of
the pre-defined store types, KeyValueStore, WindowStore, or
SessionStore. It won't work to "mix in" your interface because the
processor gets a store that's wrapped in layers that handle serialization,
change-logging, recording metrics, and caching. 
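
Here is a stripped-down sketch of why the "mix in" approach breaks once
a store is wrapped. None of these types are real Streams classes;
SimpleStore stands in for a pre-defined store type, PrefixSeekable for
your interface, and CountingWrapper for one of the wrapper layers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for a pre-defined store type such as KeyValueStore.
interface SimpleStore {
    void put(String key, String value);
    String get(String key);
}

// Hypothetical mix-in, in the spirit of the KIP's PrefixSeekableStore.
interface PrefixSeekable {
    List<String> prefixSeek(String prefix);
}

// A custom store that implements both interfaces.
final class MyPrefixStore implements SimpleStore, PrefixSeekable {
    private final TreeMap<String, String> data = new TreeMap<>();

    @Override
    public void put(String key, String value) {
        data.put(key, value);
    }

    @Override
    public String get(String key) {
        return data.get(key);
    }

    @Override
    public List<String> prefixSeek(String prefix) {
        List<String> values = new ArrayList<>();
        // Start at the first key >= prefix and stop at the first non-match.
        for (Map.Entry<String, String> e : data.tailMap(prefix, true).entrySet()) {
            if (!e.getKey().startsWith(prefix)) {
                break;
            }
            values.add(e.getValue());
        }
        return values;
    }
}

// Hypothetical wrapper layer (think metrics or caching).
// It only knows about SimpleStore, so the mix-in is hidden behind it.
final class CountingWrapper implements SimpleStore {
    private final SimpleStore inner;
    int reads = 0;

    CountingWrapper(SimpleStore inner) {
        this.inner = inner;
    }

    @Override
    public void put(String key, String value) {
        inner.put(key, value);
    }

    @Override
    public String get(String key) {
        reads++;
        return inner.get(key);
    }
}
```

If you wrap a MyPrefixStore in a CountingWrapper, the wrapped object is
no longer an instance of PrefixSeekable, so a cast in the processor
would fail at runtime. Every wrapper layer would have to learn about
the new method and forward it for the cast to succeed.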

To use the store through IQ, you have to provide a QueryableStoreType
to KafkaStreams#store, and you get back a similarly wrapped store.

I think our only choices for adding an interface like yours are either to
add it to one of the existing store types, like KeyValueStore or
WindowStore, or to define a completely new store hierarchy, meaning
you have to duplicate all the "wrapper" layers in Streams.

I think if you write an "end-to-end" test, where you write a Streams app,
provide your store, and then use it in a Processor and through IQ,
you'll see what I'm talking about.

IIRC, those three points were the ones that ultimately led us to abandon
the whole idea last time and just register the stores with key type Bytes.
I think some creative solutions may yet be possible, but it'll take some
more design work to get there.

Can I ask what your motivation is, exactly, for proposing this feature?
The motivation just says "some users may want to do it", which has
the advantage that it's impossible to disagree with, but doesn't provide
a lot of concrete detail ;)

Specifically, what I'm wondering is whether you wanted to use this as
part of a KeyValue store, which might be a challenge, or whether you
wanted to use it for more efficient scans in a WindowStore, like
Guozhang.

Thanks again for the KIP! I hope my response isn't too discouraging;
I just wanted to convey the challenges we faced last time, since none
of them are obvious up front.

Best regards,
-John


On Thu, May 14, 2020, at 16:17, Sophie Blee-Goldman wrote:
> Whoops, I guess I didn't finish reading the KIP all the way to the end
> earlier. Thanks for including the link to the RocksDB PR in the KIP!
> 
> I have one additional question about the proposal: do you plan to also add
> this prefix seek API to the dual column family iterators? These are used by
> RocksDBTimestampedStore (which extends RocksDBStore), for example the
> *RocksDBDualCFRangeIterator*.
> 
> Thanks for the KIP!
> 
> On Thu, May 14, 2020 at 10:50 AM Sagar <sagarmeansoc...@gmail.com> wrote:
> 
> > Hey @Adam,
> >
> > Thanks for sharing your experience with using prefix seek. I did look at
> > your code for RocksDBPrefixIterator; in fact, I have repurposed that class
> > itself since it wasn't being used elsewhere. Regarding how I plan to
> > expose them through-out the state stores, what I have tried to do is add it
> > as a separate interface. So, basically, it is not at the same level as the
> > *range function so to speak. The reason I did that is currently I feel not
> > all state stores are a natural fit for prefix seek. As I mentioned in the
> > KIP as well, the current equivalent to it could be BulkLoadingStore(not in
> > terms of functionality but in terms of how it is also not implemented by
> > all of them). So, that way I don't need to stub them across all the
> > state-stores and we can implement it only where needed. For example, in the
> > PR that I have put for reference in the KIP, you can see that I have it
> > implemented only for RocksDB.
> >
> > @Guozhang,
> >
> > Thanks for the feedback. Those are very interesting questions and I will
> > try my best to answer based upon whatever limited understanding I have
> > developed so far :)
> >
> > 1) Regarding the usage of useFixedLengthPrefixExtractor, honestly, I hadn't
> > looked at that config. I did look it up after you pointed it out and seems
> > it's more for hash-based memtables? I may be wrong though. But what I would
> > say is that, the changes I had made were not exactly from a correctness
> > stand point but more from trying to showcase how we can implement these
> > changes. The idea was that once we see the merit in this approach then we
> > can add some of the tunings( and I would need your team's assistance there
> > :D).
> >
> > 2) Regarding the similarity of `RocksDBPrefixIterator` and
> > `RocksDBRangeIterator`, yes the implementations look more or less similar.
> > So, in terms of performance, they might be similar. But semantically, they
> > can solve 2 different use-cases. The range seek is useful when we know both
> > from and to. But if we consider use-cases where we want to find keys with a
> > certain prefix, but we don't know what its start and end are, then
> > prefix seek would come in more handy. The point that I am trying to make is
> > that it can extend the scope of state stores from just point lookups to
> > being able to run somewhat speculative queries whereby users can search if a
> > certain pattern exists. I can vouch for this personally because I wanted to
> > use state stores for one such use case and since this option wasn't there,
> > I had to do some other things. An equivalent to this could be SCAN operator
> > in Redis. (Not trying to compare the Redis and state stores but trying to
> > give some context).
> >
> > Regarding the point on bloom filter, I think there are certain
> > optimisations that are being talked about in case of prefix seek here:
> >
> >
> > https://github.com/facebook/rocksdb/wiki/RocksDB-Bloom-Filter#prefix-vs-whole-key
> > Again
> > this isn't something that I have explored fully. Also, on the prefix seek
> > page on RocksDB they mention that there's a prefix iterating technique
> > called Prefix Bloom Filter.
> >
> > 3) Regarding the question on length of bytes for seek v/s prefix seek, I am
> > not entirely sure about that scenario. What I have understood is that
> > at least for RocksDB, it is more performant for short iterator queries
> > than longer ones.
> >
> > 4) Regarding the last question on placing it within Segment, the reason I
> > didn't do that way, is that I thought we shouldn't tie this feature only to
> > RocksDB. I agree that I got this idea while looking/reading about RocksDB
> > but if we keep it outside the purview of RocksDB and keep it as a pluggable
> > entity, then a) it remains generic by not being tied to any specific store
> > and b) no change is needed at all for any of the other stores which haven't
> > implemented it.
> >
> > I am not sure if any of the above points make sense but as I said, this is
> > based out of my limited understanding of the codebase. So, pardon any
> > incorrect/illogical statements plz!
> >
> > @Sophie,
> >
> > Thanks for bringing that point up! I have mentioned about that PR in the
> > KIP under a section called Other considerations. Nonetheless, thanks for
> > pointing it out!
> >
> > Thanks!
> > Sagar.
> >
> >
> > On Thu, May 14, 2020 at 5:17 AM Sophie Blee-Goldman <sop...@confluent.io>
> > wrote:
> >
> > > Not to derail this KIP discussion, but to leave a few notes on some of
> > > the RocksDB points that have come up:
> > >
> > > Someone actually merged some long overdue performance improvements to
> > > the RocksJava implementation (the PR was opened back in 2017! yikes).
> > > I haven't looked into the prefix seek API closely enough to know how
> > > relevant this particular change is, and they are still improving things,
> > > but it gives me some faith.
> > >
> > > There are some pretty promising results reported on the PR:
> > > https://github.com/facebook/rocksdb/pull/2283#issuecomment-561563037
> > >
> > > Regarding the custom comparator, they also recently merged this
> > > performance improvement <https://github.com/facebook/rocksdb/pull/6252>.
> > > The tl;dr is they reduced the slowdown of a custom comparator in Java
> > > (relative to the native C++) from ~7x to ~5.2x at best. Which is still
> > > not great, but it would be interesting to run our own benchmarks and see
> > > how this stacks up.
> > >
> > > Of course, these are all new changes and as such will require us to
> > > upgrade rocks to 6.x which means they have to wait for us to release a
> > > 3.0. But there's some talk about 3.0 coming in the next few releases so
> > > consider it food for not-so-future thought
> > >
> > >
> > > On Tue, May 12, 2020 at 5:02 PM Adam Bellemare <adam.bellem...@gmail.com
> > >
> > > wrote:
> > >
> > > > Hi Guozhang
> > > >
> > > > For clarity, the issues I was running into were not about the actual
> > > > *prefixSeek* function itself, but about exposing it to the same level
> > > > of access as the *range* function throughout Kafka Streams. It required
> > > > a lot of changes, and also required that most state stores stub it out
> > > > since it wasn't clear how they would implement it. It was basically an
> > > > overreaching API change that was easily solved (for the specific
> > > > prefix-scan in FKJ) by simply using *range*. So to be clear, the
> > > > blockers were predominantly around correctly handling the API changes,
> > > > nothing to do with the mechanisms of the RocksDB prefix scanning.
> > > >
> > > > As for KAFKA-5285 I'll look into it more to see if I can get a better
> > > > handle on the problem!
> > > >
> > > > Hope this helps clear it up.
> > > >
> > > > Adam
> > > >
> > > >
> > > > On Tue, May 12, 2020 at 7:16 PM Guozhang Wang <wangg...@gmail.com>
> > > wrote:
> > > >
> > > > > Hello Adam,
> > > > >
> > > > > I'm wondering if you can provide a bit more context on the blockers
> > of
> > > > > using prefixSeek of RocksDB (I saw you have a RocksDBPrefixIterator
> > > class
> > > > > but not used anywhere yet)? I'm currently looking at ways to allow
> > some
> > > > > secondary indices with rocksDB following some existing approaches
> > > > > from CockroachDB etc so I'm very curious to learn your experience.
> > > > >
> > > > > 1) Before considering any secondary indices, a quick thought is that
> > > for
> > > > > (key, timeFrom, timeTo) queries, we can easily replace the current
> > > > > `range()` impl with a `prefixRange()` impl via a prefix iterator;
> > > though
> > > > > for (keyFrom, keyTo, timeFrom, timeTo) it is much more complicated
> > > indeed
> > > > > and hence existing `range()` impl may still be used.
> > > > >
> > > > > 2) Another related issue I've been pondering for a while is
> > > > > around KAFKA-5285: with the default lexicographic byte comparator,
> > since
> > > > the
> > > > > key length varies, the combo (key, window) would have interleaving
> > byte
> > > > > layouts like:
> > > > >
> > > > > AAA0001          (key AAA, timestamp 0001)
> > > > > AAA00011        (key AAA0, timestamp 0011)
> > > > > AAA0002          (key AAA, timestamp 0002)
> > > > >
> > > > > which is challenging for prefix seeks to work efficiently. Although
> > we
> > > > can
> > > > > overwrite the byte-comparator in JNI it is very expensive and the
> > cost
> > > of
> > > > > JNI overwhelms its benefits. If you've got some ideas around it
> > please
> > > > lmk
> > > > > as well.
> > > > >
> > > > > Guozhang
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > On Tue, May 12, 2020 at 6:26 AM Adam Bellemare <
> > > adam.bellem...@gmail.com
> > > > >
> > > > > wrote:
> > > > >
> > > > > > Hi Sagar
> > > > > >
> > > > > > I implemented a very similar interface for KIP-213, the foreign-key
> > > > > > joiner. We pulled it out of the final implementation and used
> > > > > > RocksDB range instead. You can see the particular code where we use
> > > > > > RocksDB.range(...) to get the same iterator result.
> > > > > >
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> > https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionProcessorSupplier.java#L95
> > > > > >
> > > > > > We pulled it out because there were numerous awkward acrobatics to
> > > > > > integrate *prefixSeek()* function into the Kafka Streams code.
> > > > > Basically, I
> > > > > > wanted to be able to access *prefixSeek()* the same way I can
> > access
> > > > > > *range()* for any state store, and in particular use it for storing
> > > > data
> > > > > > with a particular foreign key (as per the previous URL). However, I
> > > > found
> > > > > > out that it required way too many changes to expose the
> > > *prefixSeek()*
> > > > > > functionality while still being able to leverage all the nice Kafka
> > > > > Streams
> > > > > > state management + supplier functionality, so we made a decision
> > just
> > > > to
> > > > > > stick with *range()* and pull everything else out.
> > > > > >
> > > > > > I guess my question here is, how do you anticipate using
> > > *prefixSeek()*
> > > > > > within the framework of Kafka Streams, or the Processor API?
> > > > > >
> > > > > > Adam
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Tue, May 12, 2020 at 2:52 AM Sagar <sagarmeansoc...@gmail.com>
> > > > wrote:
> > > > > >
> > > > > > > Hi All,
> > > > > > >
> > > > > > > I would like to start a discussion on the KIP that I created
> > below
> > > to
> > > > > add
> > > > > > > prefix scan support in State Stores:
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-614%3A+Add+Prefix+Scan+support+for+State+Stores
> > > > > > >
> > > > > > > Thanks!
> > > > > > > Sagar.
> > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > >
> >
>
