Hi Guozhang

For clarity, the issue I was running into was not with the actual
*prefixSeek* function itself, but with exposing it at the same level of
access as the *range* function throughout Kafka Streams. It required a lot
of changes, and it also forced most state stores to stub the method out,
since it wasn't clear how they would implement it. It was essentially an
overreaching API change that was easily avoided (for the specific
prefix scan in FKJ) by simply using *range*. So to be clear, the blockers
were predominantly around correctly handling the API changes, not the
mechanics of RocksDB prefix scanning.
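For anyone following along, the range-based workaround boils down to
computing an upper bound for the prefix. A rough Java sketch of the idea
(the names here are mine, not the actual Streams code):

```java
import java.util.Arrays;

public class PrefixRangeSketch {
    // Smallest byte string that sorts after every key starting with
    // `prefix` under unsigned lexicographic order, or null when the
    // prefix is all 0xFF bytes (then you scan to the end of the store).
    static byte[] prefixEnd(byte[] prefix) {
        for (int i = prefix.length - 1; i >= 0; i--) {
            if (prefix[i] != (byte) 0xFF) {
                byte[] end = Arrays.copyOf(prefix, i + 1);
                end[i]++;
                return end;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // "foo" -> "fop": every key prefixed with "foo" sorts below "fop",
        // so a prefix scan becomes range(prefix, prefixEnd(prefix)).
        // Note the bound is meant to be exclusive; a range() that treats
        // `to` inclusively may return a final key exactly equal to the
        // bound, which the caller would need to skip.
        byte[] end = prefixEnd("foo".getBytes());
        System.out.println(new String(end)); // prints fop
    }
}
```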

As for KAFKA-5285, I'll look into it more to see if I can get a better
handle on the problem!
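To make the interleaving concrete for anyone skimming the thread: under a
plain unsigned lexicographic byte comparator, variable-length keys with
appended timestamps shuffle together, exactly as in the example quoted
below. A length-prefixed layout is one conceivable workaround (purely a
hypothetical sketch here, not the actual Streams schema or a proposal):

```java
import java.util.Arrays;
import java.util.Comparator;

public class KeyLayoutSketch {
    // RocksDB's default comparator is unsigned lexicographic over raw bytes.
    static final Comparator<byte[]> LEX = Arrays::compareUnsigned;

    // Naive layout from the quoted example: key bytes directly followed
    // by the timestamp digits.
    static byte[] naive(String key, String ts) {
        return (key + ts).getBytes();
    }

    // Hypothetical workaround: prepend a one-byte key length so entries
    // for keys of different lengths can no longer interleave. (Trade-off:
    // the store is now ordered by key length first, then key bytes.)
    static byte[] lengthPrefixed(String key, String ts) {
        byte[] k = key.getBytes(), t = ts.getBytes();
        byte[] out = new byte[1 + k.length + t.length];
        out[0] = (byte) k.length;
        System.arraycopy(k, 0, out, 1, k.length);
        System.arraycopy(t, 0, out, 1 + k.length, t.length);
        return out;
    }

    public static void main(String[] args) {
        byte[][] rows = {
            naive("AAA", "0001"), naive("AAA0", "0011"), naive("AAA", "0002")
        };
        Arrays.sort(rows, LEX);
        // The key-AAA0 entry lands between the two key-AAA entries, so a
        // prefix seek on "AAA" cannot stop at the first non-matching record.
        for (byte[] r : rows) System.out.println(new String(r));
        // prints AAA0001, AAA00011, AAA0002
    }
}
```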

Hope this helps clear it up.

Adam


On Tue, May 12, 2020 at 7:16 PM Guozhang Wang <wangg...@gmail.com> wrote:

> Hello Adam,
>
> I'm wondering if you can provide a bit more context on the blockers of
> using prefixSeek of RocksDB (I saw you have a RocksDBPrefixIterator class
> but not used anywhere yet)? I'm currently looking at ways to allow some
> secondary indices with rocksDB following some existing approaches
> from CockroachDB etc so I'm very curious to learn your experience.
>
> 1) Before considering any secondary indices, a quick thought is that for
> (key, timeFrom, timeTo) queries, we can easily replace the current
> `range()` impl with a `prefixRange()` impl via a prefix iterator; though
> for (keyFrom, keyTo, timeFrom, timeTo) it is much more complicated indeed
> and hence existing `range()` impl may still be used.
>
> 2) Another related issue I've been pondering for a while is
> around KAFKA-5285: with the default lexicographic byte comparator, since the
> key length varies, the combo (key, window) would have interleaving byte
> layouts like:
>
> AAA0001     (key AAA,  timestamp 0001)
> AAA00011    (key AAA0, timestamp 0011)
> AAA0002     (key AAA,  timestamp 0002)
>
> which is challenging for prefix seeks to work efficiently. Although we can
> overwrite the byte-comparator in JNI it is very expensive and the cost of
> JNI overwhelms its benefits. If you've got some ideas around it please lmk
> as well.
>
> Guozhang
>
>
>
>
> On Tue, May 12, 2020 at 6:26 AM Adam Bellemare <adam.bellem...@gmail.com>
> wrote:
>
> > Hi Sagar
> >
> > I implemented a very similar interface for KIP-213, the foreign-key
> > joiner.
> > We pulled it out of the final implementation and used RocksDB range
> > instead. You can see the particular code where we use RocksDB.range(...)
> > to get the same iterator result.
> >
> >
> >
> > https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionProcessorSupplier.java#L95
> >
> > We pulled it out because there were numerous awkward acrobatics involved
> > in integrating the *prefixSeek()* function into the Kafka Streams code.
> > Basically, I wanted to be able to access *prefixSeek()* the same way I
> > can access
> > *range()* for any state store, and in particular use it for storing data
> > with a particular foreign key (as per the previous URL). However, I found
> > out that it required way too many changes to expose the *prefixSeek()*
> > functionality while still being able to leverage all the nice Kafka Streams
> > state management + supplier functionality, so we made a decision just to
> > stick with *range()* and pull everything else out.
> >
> > I guess my question here is, how do you anticipate using *prefixSeek()*
> > within the framework of Kafka Streams, or the Processor API?
> >
> > Adam
> >
> >
> >
> > On Tue, May 12, 2020 at 2:52 AM Sagar <sagarmeansoc...@gmail.com> wrote:
> >
> > > Hi All,
> > >
> > > I would like to start a discussion on the KIP that I created below to
> > > add prefix scan support in State Stores:
> > >
> > >
> > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-614%3A+Add+Prefix+Scan+support+for+State+Stores
> > >
> > > Thanks!
> > > Sagar.
> > >
> >
>
>
> --
> -- Guozhang
>
