I have a changelog that I'd like to index by some other key. So, something
like this:

        class Item {
            byte[] id;
            String name;
        }

        KStreamBuilder topology = new KStreamBuilder();

        KTable<byte[], Item> items = topology
          .table("items-changelog", "items");

To index the Items by name:

        items
          .toStream()
          .selectKey((k, v) -> v.name)
          .to("items-by-name-changelog");

        KTable<String, Item> itemsByName = topology
          .table("items-by-name-changelog", "items-by-name");


However, if the topic "items-changelog" receives a tombstone for some
particular Item (val: null), the key selector indexing function yields an
NPE because that value is null.

My solution to this problem has been aggregations:

        KTable<String, Item> itemsByName = items
          .groupBy((k, v) -> new KeyValue<>(v.name, v))
          .aggregate(() -> null,
                     (k, v, a) -> v,
                     (k, v, a) -> null,
                     "items-by-name");

This works because the grouping/aggregation maintains state on the incoming
topic, and when an item tombstone arrives it can calculate the key and
propagate the tombstone to the index.

However, this doesn't work flexibly for all cases (I'll omit them for now)
and I find myself drawn to the lower-level Processor API which is fine
except I'd rather just use the DSL for its expressiveness.

What is the correct way to do this kind of indexing? I need the index to
update correctly when the key value changes or when a tombstone is received.



Thank you,
Dmitry

Reply via email to