It's a difficult problem. Before we go deeper, a follow-up question: if you map from <k,v> to new_key, is this mapping "unique", or could two different k/v pairs map to the same new_key?
If there are overlaps, you end up with a different problem than if there
are none: you would get "updates" for the new_key from two different
places, and new_key would no longer be a primary key (whereas for a
"unique" mapping, new_key would still be a primary key).
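
To make the distinction concrete, here is a minimal sketch in plain Java. It does not use Kafka Streams at all: NameIndexSketch, apply(), lookup() and sourceHasName() are names I made up for illustration, and the two HashMaps only stand in for a source table (id -> name) and a derived index (name -> id) with last-write-wins semantics. Under those assumptions it shows how a delete for one item can wipe the index entry that another item with the same name still needs.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a table re-keyed by new_key (here: the item name).
// Not Kafka Streams code; it only models "last write wins, null deletes".
class NameIndexSketch {
    private final Map<String, String> source = new HashMap<>(); // id -> name
    private final Map<String, String> index = new HashMap<>();  // name -> id

    // Upsert an item; a null name models a tombstone for that id.
    void apply(String id, String name) {
        String oldName = (name == null) ? source.remove(id) : source.put(id, name);
        if (oldName != null && !oldName.equals(name)) {
            index.remove(oldName); // retract the entry under the old name
        }
        if (name != null) {
            index.put(name, id);   // overwrites whatever was indexed under this name
        }
    }

    String lookup(String name) {
        return index.get(name);
    }

    boolean sourceHasName(String name) {
        return source.containsValue(name);
    }

    public static void main(String[] args) {
        // Unique mapping: deleting "a" only affects its own index entry.
        NameIndexSketch unique = new NameIndexSketch();
        unique.apply("a", "apple");
        unique.apply("b", "pear");
        unique.apply("a", null);
        System.out.println("unique, pear -> " + unique.lookup("pear"));

        // Overlapping mapping: "a" and "b" share a name, so they share an index slot.
        NameIndexSketch overlap = new NameIndexSketch();
        overlap.apply("a", "apple");
        overlap.apply("b", "apple"); // silently replaces the entry written for "a"
        overlap.apply("b", null);    // the tombstone for "b" removes the shared slot
        System.out.println("overlap, apple -> " + overlap.lookup("apple"));
        System.out.println("source still has apple: " + overlap.sourceHasName("apple"));
    }
}
```

In the overlapping case the index ends up with no entry for "apple" even though item "a" still exists in the source, which is what I mean by new_key no longer being a primary key.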
Hope this makes sense. It's a little hard to explain in an email.
-Matthias
On 2/8/17 10:22 AM, Dmitry Minkovsky wrote:
> I have a changelog that I'd like to index by some other key. So, something
> like this:
>
> class Item {
>     byte[] id;
>     String name;
> }
>
> KStreamBuilder topology = new KStreamBuilder();
>
> KTable<byte[], Item> items = topology
>     .table("items-changelog", "items");
>
> To index the Items by name:
>
> items
>     .toStream()
>     .selectKey((k, v) -> v.name)
>     .to("items-by-name-changelog");
>
> KTable<String, Item> itemsByName = topology
>     .table("items-by-name-changelog", "items-by-name");
>
>
> However, if the topic "items-changelog" receives a tombstone for some
> particular Item (val: null), the key selector indexing function yields an
> NPE because that value is null.
>
> My solution to this problem has been aggregations:
>
> KTable<String, Item> itemsByName = items
>     .groupBy((k, v) -> new KeyValue<>(v.name, v))
>     .aggregate(() -> null,
>                (k, v, a) -> v,
>                (k, v, a) -> null,
>                "items-by-name");
>
> This works because the grouping/aggregation maintains state on the incoming
> topic, and when an item tombstone arrives it can calculate the key and
> propagate the tombstone to the index.
>
> However, this doesn't work flexibly for all cases (I'll omit them for now),
> and I find myself drawn to the lower-level Processor API, which is fine,
> except I'd rather just use the DSL for its expressiveness.
>
> What is the correct way to do this kind of indexing? I need the index to
> update correctly when the key value changes or when a tombstone is received.
>
>
>
> Thank you,
> Dmitry
>
