Thanks Adam, Sagar.

I read your PR as well as the RocksDB reference, and I have a few quick
questions:

1. In your code I saw that you did not explicitly override any RocksDB
configs such as `useFixedLengthPrefixExtractor`. Also, comparing the
`RocksDBPrefixIterator` and `RocksDBRangeIterator` classes, their `seek()`
calls are no different, so consider two calls:

range(bytesFrom, bytesTo) where bytesTo is just `bytesFrom + 1`.
prefix(bytesPrefix)

Shouldn't their performance be exactly the same? Do we need to enable the
prefix extractor in order to get better performance? Note that although we
have a bloom filter today, it is built on the whole key rather than the
prefix (10 bits per key, block-based mode by default).
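
To make the comparison concrete, here is a minimal sketch of the two calls.
It assumes a `TreeMap` ordered by unsigned lexicographic byte comparison as
a stand-in for the store, an exclusive upper bound for the range call, and
hypothetical helper names (`increment`, `prefixScan`, `rangeScan`); it is
not the code from the PR.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Illustration only: a sorted in-memory map stands in for a RocksDB store.
// All class and method names here are hypothetical.
final class PrefixAsRangeSketch {

    // Unsigned lexicographic byte comparison.
    static int compare(final byte[] a, final byte[] b) {
        final int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            final int diff = (a[i] & 0xFF) - (b[i] & 0xFF);
            if (diff != 0) {
                return diff;
            }
        }
        return a.length - b.length;
    }

    static boolean startsWith(final byte[] key, final byte[] prefix) {
        if (key.length < prefix.length) {
            return false;
        }
        for (int i = 0; i < prefix.length; i++) {
            if (key[i] != prefix[i]) {
                return false;
            }
        }
        return true;
    }

    // "bytesFrom + 1": the smallest byte string greater than every key that
    // starts with the prefix, or null if the prefix consists of 0xFF bytes only.
    static byte[] increment(final byte[] prefix) {
        for (int i = prefix.length - 1; i >= 0; i--) {
            if (prefix[i] != (byte) 0xFF) {
                final byte[] bound = Arrays.copyOf(prefix, i + 1);
                bound[i]++;
                return bound;
            }
        }
        return null;
    }

    static byte[] bytes(final String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    static TreeMap<byte[], byte[]> newStore(final String... keys) {
        final TreeMap<byte[], byte[]> store =
            new TreeMap<byte[], byte[]>(PrefixAsRangeSketch::compare);
        for (final String key : keys) {
            store.put(bytes(key), new byte[0]);
        }
        return store;
    }

    // Seek to the prefix, then iterate until a key no longer starts with it.
    static List<String> prefixScan(final TreeMap<byte[], byte[]> store, final byte[] prefix) {
        final List<String> keys = new ArrayList<>();
        for (final byte[] key : store.tailMap(prefix, true).keySet()) {
            if (!startsWith(key, prefix)) {
                break;
            }
            keys.add(new String(key, StandardCharsets.UTF_8));
        }
        return keys;
    }

    // Seek to `from`, then iterate until a key reaches `toExclusive`
    // (a null bound means "scan to the end of the store").
    static List<String> rangeScan(final TreeMap<byte[], byte[]> store,
                                  final byte[] from,
                                  final byte[] toExclusive) {
        final NavigableMap<byte[], byte[]> view = toExclusive == null
            ? store.tailMap(from, true)
            : store.subMap(from, true, toExclusive, false);
        final List<String> keys = new ArrayList<>();
        for (final byte[] key : view.keySet()) {
            keys.add(new String(key, StandardCharsets.UTF_8));
        }
        return keys;
    }

    public static void main(final String[] args) {
        final TreeMap<byte[], byte[]> store = newStore("AA", "AAA0001", "AAA0002", "AAB0001");
        final byte[] prefix = bytes("AAA");
        System.out.println(prefixScan(store, prefix));
        System.out.println(rangeScan(store, prefix, increment(prefix)));
    }
}
```

In this sketch both calls start from the same seek position and visit the
same keys, which is why I would expect them to cost the same unless a
prefix-aware filter is configured.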

2. Also, my understanding is that if the bytes passed to `iterator#seek()`
are shorter than the prefix length, the prefix extractor would have no
effect; only if the bytes passed to `iterator#seek()` are longer would we
save extra IOs. Is that right?
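
As a toy model of that understanding (not RocksDB's actual filter code), the
sketch below uses a `HashSet` of fixed-length prefixes in place of a per-file
prefix filter. The class name and methods are hypothetical, ASCII strings
stand in for byte keys, and I am assuming that a seek target exactly as long
as the prefix also counts as usable.

```java
import java.util.HashSet;
import java.util.Set;

// Toy model only: a set of fixed-length prefixes stands in for a prefix filter.
// ASCII strings are used so that one character corresponds to one byte.
final class FixedPrefixFilterSketch {
    private final int prefixLength;
    private final Set<String> prefixes = new HashSet<>();

    FixedPrefixFilterSketch(final int prefixLength) {
        this.prefixLength = prefixLength;
    }

    // Record the prefix of a stored key; keys shorter than the prefix length
    // have no extractable prefix and are not recorded.
    void add(final String key) {
        if (key.length() >= prefixLength) {
            prefixes.add(key.substring(0, prefixLength));
        }
    }

    // Returns false only when the filter can rule out every stored key for
    // this seek target, i.e. when the read could be skipped.
    boolean mayContain(final String seekTarget) {
        if (seekTarget.length() < prefixLength) {
            // Too short to extract a prefix: the filter cannot help.
            return true;
        }
        return prefixes.contains(seekTarget.substring(0, prefixLength));
    }

    public static void main(final String[] args) {
        final FixedPrefixFilterSketch filter = new FixedPrefixFilterSketch(3);
        filter.add("AAA0001");
        filter.add("AAB0001");
        System.out.println(filter.mayContain("AA"));      // too short, must read
        System.out.println(filter.mayContain("AAC0001")); // prefix absent, can skip
    }
}
```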

3. Assuming that we can indeed get the perf benefits by turning on some
configs (in which case existing stores cannot use them unless we restart
them with the new configs), I looked at the code a bit deeper and here's a
wild thought. I think you have already been there, so maybe you can let me
know what blocks that idea:

a) Add a new

```
KeyValueIterator<Bytes, byte[]> prefixRange(final Bytes prefix,
                                            final Bytes from,
                                            final Bytes to);
```

to the internal interface `Segment`.

b) Add its implementation `prefixRange(final Bytes prefix, final Bytes
from, final Bytes to)`, which relies on an augmented `RocksDBPrefixIterator`:
we first seek by the prefix, then keep dropping entries in `makeNext` until
we hit the `from` bytes, then return values until we hit the `to` bytes,
and stop.

c) `WindowStoreIterator<V> ReadOnlyWindowStore#fetch(K key, Instant from,
Instant to)` then can be implemented as `prefixRange(final Bytes prefix,
final Bytes from, final Bytes to)`.
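
Steps a) to c) could look roughly like the sketch below. It is only an
illustration: a `TreeMap` with unsigned lexicographic ordering stands in for
the segment, the method returns a list of keys instead of a
`KeyValueIterator`, `to` is assumed to be inclusive, and every name other
than `prefixRange` is hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Illustration only: a sorted in-memory map stands in for a RocksDB segment.
final class PrefixRangeSketch {

    // Unsigned lexicographic byte comparison.
    static int compare(final byte[] a, final byte[] b) {
        final int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            final int diff = (a[i] & 0xFF) - (b[i] & 0xFF);
            if (diff != 0) {
                return diff;
            }
        }
        return a.length - b.length;
    }

    static boolean startsWith(final byte[] key, final byte[] prefix) {
        if (key.length < prefix.length) {
            return false;
        }
        for (int i = 0; i < prefix.length; i++) {
            if (key[i] != prefix[i]) {
                return false;
            }
        }
        return true;
    }

    static byte[] bytes(final String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    static TreeMap<byte[], byte[]> newSegment(final String... keys) {
        final TreeMap<byte[], byte[]> segment =
            new TreeMap<byte[], byte[]>(PrefixRangeSketch::compare);
        for (final String key : keys) {
            segment.put(bytes(key), new byte[0]);
        }
        return segment;
    }

    static List<String> prefixRange(final TreeMap<byte[], byte[]> segment,
                                    final byte[] prefix,
                                    final byte[] from,
                                    final byte[] to) {
        final List<String> keys = new ArrayList<>();
        // Seek by the prefix.
        for (final byte[] key : segment.tailMap(prefix, true).keySet()) {
            if (!startsWith(key, prefix)) {
                break;          // left the prefix, nothing more to return
            }
            if (compare(key, from) < 0) {
                continue;       // keep dropping entries until we hit `from`
            }
            if (compare(key, to) > 0) {
                break;          // passed `to`, stop
            }
            keys.add(new String(key, StandardCharsets.UTF_8));
        }
        return keys;
    }

    public static void main(final String[] args) {
        final TreeMap<byte[], byte[]> segment =
            newSegment("AAA0001", "AAA00011", "AAA0002", "AAA0003", "AAB0001");
        // fetch(key = AAA, from = 0002, to = 0003)
        System.out.println(prefixRange(segment, bytes("AAA"), bytes("AAA0002"), bytes("AAA0003")));
    }
}
```

Note that in this sketch a key of a different length that shares the
prefix, such as `AAA00011`, still falls inside the byte range when `from` is
`AAA0001` and `to` is `AAA0002`, so it would need to be filtered out
separately.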


Let me know if that makes sense.


Guozhang



On Tue, May 12, 2020 at 5:02 PM Adam Bellemare <adam.bellem...@gmail.com>
wrote:

> Hi Guozhang
>
> For clarity, the issues I was running into were not about the actual
> *prefixSeek* function itself, but about exposing it to the same level of
> access as the *range* function throughout Kafka Streams. It required a lot
> of changes, and also required that most state stores stub it out since it
> wasn't clear how they would implement it. It was basically an overreaching
> API change that was easily solved (for the specific prefix-scan in FKJ) by
> simply using *range*. So to be clear, the blockers were predominantly
> around correctly handling the API changes, nothing to do with the
> mechanisms of the RocksDB prefix scanning.
>
> As for KAFKA-5285 I'll look into it more to see if I can get a better
> handle on the problem!
>
> Hope this helps clear it up.
>
> Adam
>
>
> On Tue, May 12, 2020 at 7:16 PM Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hello Adam,
> >
> > I'm wondering if you can provide a bit more context on the blockers of
> > using prefixSeek of RocksDB (I saw you have a RocksDBPrefixIterator class
> > but not used anywhere yet)? I'm currently looking at ways to allow some
> > secondary indices with rocksDB following some existing approaches
> > from CockroachDB etc so I'm very curious to learn your experience.
> >
> > 1) Before considering any secondary indices, a quick thought is that for
> > (key, timeFrom, timeTo) queries, we can easily replace the current
> > `range()` impl with a `prefixRange()` impl via a prefix iterator; though
> > for (keyFrom, keyTo, timeFrom, timeTo) it is much more complicated indeed
> > and hence existing `range()` impl may still be used.
> >
> > 2) Another related issue I've been pondering for a while is
> > around KAFKA-5285: with the default lexicographic byte comparator, since
> > the key length varies, the combo (key, window) would have interleaving
> > byte layouts like:
> >
> > AAA0001          (key AAA, timestamp 0001)
> > AAA00011        (key AAA0, timestamp 0011)
> > AAA0002          (key AAA, timestamp 0002)
> >
> > which is challenging for prefix seeks to work efficiently. Although we
> > can override the byte-comparator in JNI, it is very expensive and the
> > cost of JNI overwhelms its benefits. If you've got some ideas around it
> > please lmk as well.
> >
> > Guozhang
> >
> >
> >
> >
> > On Tue, May 12, 2020 at 6:26 AM Adam Bellemare <adam.bellem...@gmail.com>
> > wrote:
> >
> > > Hi Sagar
> > >
> > > I implemented a very similar interface for KIP-213, the foreign-key
> > > joiner. We pulled it out of the final implementation and used RocksDB
> > > range instead. You can see the particular code where we use
> > > RocksDB.range(...) to get the same iterator result.
> > >
> > > https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionProcessorSupplier.java#L95
> > >
> > > We pulled it out because there were numerous awkward acrobatics to
> > > integrate the *prefixSeek()* function into the Kafka Streams code.
> > > Basically, I wanted to be able to access *prefixSeek()* the same way I
> > > can access *range()* for any state store, and in particular use it for
> > > storing data with a particular foreign key (as per the previous URL).
> > > However, I found out that it required way too many changes to expose the
> > > *prefixSeek()* functionality while still being able to leverage all the
> > > nice Kafka Streams state management + supplier functionality, so we made
> > > a decision just to stick with *range()* and pull everything else out.
> > >
> > > I guess my question here is, how do you anticipate using *prefixSeek()*
> > > within the framework of Kafka Streams, or the Processor API?
> > >
> > > Adam
> > >
> > >
> > >
> > > On Tue, May 12, 2020 at 2:52 AM Sagar <sagarmeansoc...@gmail.com> wrote:
> > >
> > > > Hi All,
> > > >
> > > > I would like to start a discussion on the KIP that I created below to
> > > > add prefix scan support in State Stores:
> > > >
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-614%3A+Add+Prefix+Scan+support+for+State+Stores
> > > >
> > > > Thanks!
> > > > Sagar.
> > > >
> > >
> >
> >
> > --
> > -- Guozhang
> >
>


-- 
-- Guozhang
