Actually... I've got the 1-to-1 variant doing wonders for me. I replaced
the #aggregate() with #reduce((k, v) -> v, (k, v) -> null) and things are
just lovely. Combining these indices with the various join operations, I am
able to build up deeply nested structures, or eh, materialized views,
from flat changelog feeds. As the changelog entities change or tombstone,
so do the views.
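
For anyone following along, here is a minimal sketch of what the 1-to-1
index does with updates and tombstones. It is plain Java with no Kafka
dependency, so it only simulates the adder/subtractor behaviour and is not
the actual Streams implementation. The class and method names
(NameIndexSketch, apply, lookup) are made up for illustration, and it
assumes uniqueness of the indexed field is already enforced upstream.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java simulation of a 1-to-1 secondary index maintained from a
// changelog. All names are hypothetical; nothing here is Kafka Streams API.
class NameIndexSketch {
    // Last name seen per primary key. Stands in for the state the grouping
    // keeps, so that a tombstone can be mapped back to its index key.
    private final Map<String, String> namesById = new HashMap<>();
    // The index itself: name -> primary key.
    private final Map<String, String> idsByName = new HashMap<>();

    // Apply one changelog record. A null name is a tombstone for that id.
    void apply(String id, String name) {
        String oldName = namesById.get(id);
        if (oldName != null) {
            // "Subtractor" step: retract the entry under the old index key,
            // but only if it still points at this id.
            idsByName.remove(oldName, id);
        }
        if (name == null) {
            // Tombstone: forget the item entirely.
            namesById.remove(id);
        } else {
            // "Adder" step: record the new value and index it.
            namesById.put(id, name);
            idsByName.put(name, id);
        }
    }

    // Returns the primary key indexed under this name, or null if none.
    String lookup(String name) {
        return idsByName.get(name);
    }

    public static void main(String[] args) {
        NameIndexSketch index = new NameIndexSketch();
        index.apply("item-1", "alice");   // insert
        index.apply("item-1", "bob");     // indexed field changes
        System.out.println(index.lookup("alice"));
        System.out.println(index.lookup("bob"));
        index.apply("item-1", null);      // tombstone
        System.out.println(index.lookup("bob"));
    }
}
```

The point of keeping namesById is the same as in the DSL version: the old
value has to be remembered somewhere, otherwise a tombstone carries no
information about which index key to retract.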

The 1-to-many case I still have to play more with. I will update here if I
discover anything good.

Thank you.

On Wed, Feb 8, 2017 at 4:14 PM, Dmitry Minkovsky <dminkov...@gmail.com>
wrote:

> > And before we discuss deeper, a follow up question: if you map from
> <k,v> to new_key, is this mapping "unique", or could it be that two
> different k/v-pairs map to the same new_key?
>
> Yes, this has been central in my exploration so far. For some fields the
> field is unique, for others it is not. There's never ambiguity. Unique
> fields are known to be unique (e.g. user email address). Non-unique fields
> are known to be non-unique (e.g. user country).
>
> Right now I am laying out my topology such that uniqueness is enforced
> before indexing. So the issue I am concerned with here specifically is just
> the indexing. Unless, of course, the two can somehow be tied together in
> some elegant manner? But anyway, my question is based on that assumption
> that uniqueness in indexing input is not a concern.
>
> And yes, I've noticed that if you have a unique field the result is 1-to-1
> key-to-entity mapping, whereas for non-unique fields each index key may
> have a list of entities it maps to. For non-unique fields where an index
> key may map to thousands of entities, it is not practical maintaining them
> in a single aggregation.
>
> Any further guidance would be greatly appreciated. Thank you!
>
>
>
>
>
> On Wed, Feb 8, 2017 at 3:56 PM, Matthias J. Sax <matth...@confluent.io>
> wrote:
>
>> It's a difficult problem.
>>
>> And before we discuss deeper, a follow up question: if you map from
>> <k,v> to new_key, is this mapping "unique", or could it be that two
>> different k/v-pairs map to the same new_key?
>>
>> If there are overlaps, you end up with a different problem than if there
>> are no overlaps, because you would get updates for the new_key from
>> two different places and new_key would actually not be a primary key
>> anymore (while for a "unique" mapping the new_key would still be a
>> primary key).
>>
>>
>> Hope this makes sense. It's a little hard to explain in an email.
>>
>>
>> -Matthias
>>
>> On 2/8/17 10:22 AM, Dmitry Minkovsky wrote:
>> > I have a changelog that I'd like to index by some other key. So,
>> > something like this:
>> >
>> >         class Item {
>> >             byte[] id;
>> >             String name;
>> >         }
>> >
>> >         KStreamBuilder topology = new KStreamBuilder();
>> >
>> >         KTable<byte[], Item> items = topology
>> >           .table("items-changelog", "items");
>> >
>> > To index the Items by name:
>> >
>> >         items
>> >           .toStream()
>> >           .selectKey((k, v) -> v.name)
>> >           .to("items-by-name-changelog");
>> >
>> >         KTable<String, Item> itemsByName = topology
>> >           .table("items-by-name-changelog", "items-by-name");
>> >
>> >
>> > However, if the topic "items-changelog" receives a tombstone for some
>> > particular Item (val: null), the key selector indexing function yields
>> > an NPE because that value is null.
>> >
>> > My solution to this problem has been aggregations:
>> >
>> >         KTable<String, Item> itemsByName = items
>> >           .groupBy((k, v) -> new KeyValue<>(v.name, v))
>> >           .aggregate(() -> null,
>> >                      (k, v, a) -> v,
>> >                      (k, v, a) -> null,
>> >                      "items-by-name");
>> >
>> > This works because the grouping/aggregation maintains state on the
>> > incoming topic, and when an item tombstone arrives it can calculate the
>> > key and propagate the tombstone to the index.
>> >
>> > However, this doesn't work flexibly for all cases (I'll omit them for
>> > now) and I find myself drawn to the lower-level Processor API, which is
>> > fine except I'd rather just use the DSL for its expressiveness.
>> >
>> > What is the correct way to do this kind of indexing? I need the index to
>> > update correctly when the key value changes or when a tombstone is
>> > received.
>> >
>> >
>> >
>> > Thank you,
>> > Dmitry
>> >
>>
>>
>
