This is a difficult problem. Before we go deeper, a follow-up question: if you map from <k,v> to new_key, is this mapping "unique", or could two different k/v pairs map to the same new_key?
If there are overlaps, you end up with a different problem than if there are no overlaps, because you would get updates for the new_key from two different places, and new_key would no longer be a primary key (whereas for a "unique" mapping, new_key would still be a primary key).

Hope this makes sense. It's a little hard to explain in an email.

-Matthias

On 2/8/17 10:22 AM, Dmitry Minkovsky wrote:
> I have a changelog that I'd like to index by some other key. So, something
> like this:
>
>     class Item {
>         byte[] id;
>         String name;
>     }
>
>     KStreamBuilder topology = new KStreamBuilder();
>
>     KTable<byte[], Item> items = topology
>         .table("items-changelog", "items");
>
> To index the Items by name:
>
>     items
>         .toStream()
>         .selectKey((k, v) -> v.name)
>         .to("items-by-name-changelog");
>
>     KTable<String, Item> itemsByName = topology
>         .table("items-by-name-changelog", "items-by-name");
>
> However, if the topic "items-changelog" receives a tombstone for some
> particular Item (val: null), the key selector indexing function yields an
> NPE because that value is null.
>
> My solution to this problem has been aggregations:
>
>     KTable<String, Item> itemsByName = items
>         .groupBy((k, v) -> new KeyValue<>(v.name, v))
>         .aggregate(() -> null,
>                    (k, v, a) -> v,
>                    (k, v, a) -> null,
>                    "items-by-name");
>
> This works because the grouping/aggregation maintains state on the incoming
> topic, and when an item tombstone arrives it can calculate the key and
> propagate the tombstone to the index.
>
> However, this doesn't work flexibly for all cases (I'll omit them for now)
> and I find myself drawn to the lower-level Processor API which is fine
> except I'd rather just use the DSL for its expressiveness.
>
> What is the correct way to do this kind of indexing? I need the index to
> update correctly when the key value changes or when a tombstone is received.
>
> Thank you,
> Dmitry
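
To make the overlap question concrete, here is a minimal sketch in plain Java. It only models the bookkeeping with two in-memory maps; it does not use Kafka Streams and does not claim to show how Kafka Streams works internally. The names ReKeySketch, NameIndex, and apply are hypothetical, and Item.id is assumed to be a String rather than byte[] so that it can be used as a map key.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java simulation only; no Kafka Streams classes are used.
// All names here (ReKeySketch, NameIndex, apply) are hypothetical.
class ReKeySketch {

    // Stand-in for the Item class in the quoted message.
    // Assumption: id is a String here so it can serve as a HashMap key.
    static final class Item {
        final String id;
        final String name;

        Item(String id, String name) {
            this.id = id;
            this.name = name;
        }
    }

    static final class NameIndex {
        // Source table: id -> Item (plays the role of "items").
        final Map<String, Item> source = new HashMap<>();
        // Derived table: name -> Item (plays the role of "items-by-name").
        final Map<String, Item> byName = new HashMap<>();

        // Apply one changelog record; a null value is a tombstone.
        void apply(String id, Item value) {
            Item old = (value == null) ? source.remove(id) : source.put(id, value);
            if (old != null) {
                byName.remove(old.name);       // retract the old index entry
            }
            if (value != null) {
                byName.put(value.name, value); // add the new index entry
            }
        }
    }

    public static void main(String[] args) {
        NameIndex idx = new NameIndex();
        idx.apply("1", new Item("1", "x"));
        idx.apply("2", new Item("2", "x")); // second item maps to the same new_key

        System.out.println("owner of x: " + idx.byName.get("x").id);

        idx.apply("1", null); // tombstone for item 1 only

        System.out.println("source still has item 2: " + idx.source.containsKey("2"));
        System.out.println("index still has x: " + idx.byName.containsKey("x"));
    }
}
```

In this model, the tombstone for item 1 also removes the index entry that item 2 had overwritten, so the index and the source disagree. When the mapping is unique, the same retract-then-add logic keeps the two maps consistent, which is why the answer to the question above matters.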