I have a changelog that I'd like to index by some other key. So, something like this:

    class Item {
        byte[] id;
        String name;
    }

    KStreamBuilder topology = new KStreamBuilder();

    KTable<byte[], Item> items = topology
        .table("items-changelog", "items");

To index the Items by name:

    items
        .toStream()
        .selectKey((k, v) -> v.name)
        .to("items-by-name-changelog");

    KTable<String, Item> itemsByName = topology
        .table("items-by-name-changelog", "items-by-name");

However, if the topic "items-changelog" receives a tombstone for some particular Item (val: null), the key selector indexing function yields an NPE because that value is null.

My solution to this problem has been aggregations:

    KTable<String, Item> itemsByName = items
        .groupBy((k, v) -> new KeyValue<>(v.name, v))
        .aggregate(() -> null,
                   (k, v, a) -> v,
                   (k, v, a) -> null,
                   "items-by-name");

This works because the grouping/aggregation maintains state on the incoming topic, and when an item tombstone arrives it can calculate the key and propagate the tombstone to the index.

However, this doesn't work flexibly for all cases (I'll omit them for now) and I find myself drawn to the lower-level Processor API which is fine except I'd rather just use the DSL for its expressiveness.

What is the correct way to do this kind of indexing? I need the index to update correctly when the key value changes or when a tombstone is received.

Thank you,
Dmitry
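
P.S. To make the semantics I'm after concrete, here is a minimal sketch in plain Java with no Kafka Streams APIs at all. The class `NameIndex` and its methods `apply` and `lookup` are hypothetical names made up for this illustration, and I've assumed a String id (instead of byte[]) so it can be used as a HashMap key. It only models the behaviour: the index has to remember the previous name for each id so that a rename or a tombstone can retract the stale entry.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, Kafka-free model of the index semantics (illustration only).
class NameIndex {
    // Primary view: item id -> current name. This is the state needed to
    // resolve a tombstone, which carries no value to derive the name from.
    private final Map<String, String> nameById = new HashMap<>();
    // Secondary view: name -> item id.
    private final Map<String, String> idByName = new HashMap<>();

    // Apply one changelog record; a null name models a tombstone for that id.
    void apply(String id, String name) {
        String oldName = (name == null) ? nameById.remove(id) : nameById.put(id, name);
        if (oldName != null && !oldName.equals(name)) {
            // Retract the stale entry, but only if it still points at this id.
            idByName.remove(oldName, id);
        }
        if (name != null) {
            idByName.put(name, id);
        }
    }

    // Returns the id indexed under the given name, or null if there is none.
    String lookup(String name) {
        return idByName.get(name);
    }
}
```

Applying ("1", "apple"), then ("1", "pear"), then ("1", null) should leave the index empty: the rename retracts "apple", and the tombstone retracts "pear" even though the record itself has no value.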