It's a difficult problem.

Before we go deeper, a follow-up question: when you map from
<k,v> to new_key, is this mapping "unique", or can two different
k/v-pairs map to the same new_key?

If there are overlaps, you end up with a different problem than if
there are none: new_key would receive "updates" from two different
places and would no longer be a primary key (whereas for a "unique"
mapping, new_key would still be a primary key).
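
To make the difference concrete, here is a rough plain-Java simulation
(no Kafka dependency). The class, its fields, and the sample data are
all made up for illustration. It only mimics the idea of retracting the
old key and writing the new one, roughly what a subtract/add pair in an
aggregation does, and is not how Kafka Streams is implemented.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: re-keying a changelog by a field of the value.
class RekeyIndexSketch {
    // Source table: id -> name (stands in for the "items" table).
    final Map<String, String> itemsById = new HashMap<>();
    // Derived index: name -> id (stands in for "items-by-name").
    final Map<String, String> idByName = new HashMap<>();

    // Apply one changelog record; a null name is a tombstone for that id.
    void apply(String id, String name) {
        // Remember the previous value so the old index key can be computed.
        String oldName = (name == null) ? itemsById.remove(id)
                                        : itemsById.put(id, name);
        if (oldName != null && !oldName.equals(name)) {
            // "Subtract" step: retract the entry under the old key.
            idByName.remove(oldName);
        }
        if (name != null) {
            // "Add" step: write the entry under the new key.
            idByName.put(name, id);
        }
    }

    public static void main(String[] args) {
        // Unique mapping: every name belongs to at most one id.
        RekeyIndexSketch unique = new RekeyIndexSketch();
        unique.apply("1", "apple");
        unique.apply("1", "pear"); // rename: old index entry is retracted
        unique.apply("1", null);   // tombstone: index entry is removed
        System.out.println("unique index:  " + unique.idByName);

        // Overlapping mapping: two ids share the same name.
        RekeyIndexSketch overlap = new RekeyIndexSketch();
        overlap.apply("1", "apple");
        overlap.apply("2", "apple"); // overwrites the index entry for id 1
        overlap.apply("2", null);    // tombstone for id 2 also drops "apple"
        System.out.println("overlap index: " + overlap.idByName
                + ", source: " + overlap.itemsById);
    }
}
```

In the unique case the index ends up empty exactly when the source
table does. In the overlapping case the index loses "apple" even though
item 1 still exists in the source, which is why new_key can no longer
be treated as a primary key there.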


Hope this makes sense. It's a little hard to explain in an email.


-Matthias

On 2/8/17 10:22 AM, Dmitry Minkovsky wrote:
> I have a changelog that I'd like to index by some other key. So, something
> like this:
> 
>         class Item {
>             byte[] id;
>             String name;
>         }
> 
>         KStreamBuilder topology = new KStreamBuilder();
> 
>         KTable<byte[], Item> items = topology
>           .table("items-changelog", "items");
> 
> To index the Items by name:
> 
>         items
>           .toStream()
>           .selectKey((k, v) -> v.name)
>           .to("items-by-name-changelog");
> 
>         KTable<String, Item> itemsByName = topology
>           .table("items-by-name-changelog", "items-by-name");
> 
> 
> However, if the topic "items-changelog" receives a tombstone for some
> particular Item (val: null), the key selector indexing function yields an
> NPE because that value is null.
> 
> My solution to this problem has been aggregations:
> 
>         KTable<String, Item> itemsByName = items
>           .groupBy((k, v) -> new KeyValue<>(v.name, v))
>           .aggregate(() -> null,
>                      (k, v, a) -> v,
>                      (k, v, a) -> null,
>                      "items-by-name");
> 
> This works because the grouping/aggregation maintains state on the incoming
> topic, and when an item tombstone arrives it can calculate the key and
> propagate the tombstone to the index.
> 
> However, this doesn't work flexibly for all cases (I'll omit them for now)
> and I find myself drawn to the lower-level Processor API which is fine
> except I'd rather just use the DSL for its expressiveness.
> 
> What is the correct way to do this kind of indexing? I need the index to
> update correctly when the key value changes or when a tombstone is received.
> 
> 
> 
> Thank you,
> Dmitry
> 
