Actually... I've got the 1-to-1 variant doing wonders for me. I replaced the #aggregate() with #reduce((k, v) -> v, (k, v) -> null) and things are just lovely. Combining these indices with the various join operations, I am able to build up deeply nested structures, or eh, materialized views, from flat changelog feeds. As the changelog entities change or tombstone, so do the views.
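For anyone following along, here is a rough plain-Java model of why that adder/subtractor pair keeps a 1-to-1 index in sync. It is only a sketch, not Kafka Streams code: the class NameIndexModel and its apply/lookup methods are made up for illustration, items are reduced to an id and a name, and I am assuming the old index entry is retracted before the new one is added.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

// Plain-Java model of a 1-to-1 index; NOT Kafka Streams code.
// All names here (NameIndexModel, apply, lookup) are hypothetical.
final class NameIndexModel {
    // Source table: item id -> item name (stands in for the "items" table).
    private final Map<String, String> items = new HashMap<>();
    // Index: item name -> item id (stands in for "items-by-name").
    private final Map<String, String> byName = new HashMap<>();

    // Same shape as the two reduce() arguments: (aggregate, value) -> new aggregate.
    private final BinaryOperator<String> adder = (agg, v) -> v;
    private final BinaryOperator<String> subtractor = (agg, v) -> null;

    // Apply one changelog record; a null name models a tombstone.
    void apply(String id, String name) {
        // Remember the previous name so the stale index entry can be retracted.
        String oldName = (name == null) ? items.remove(id) : items.put(id, name);
        if (oldName != null) {
            // Assumed order: retract the old index entry first.
            update(oldName, subtractor, id);
        }
        if (name != null) {
            update(name, adder, id);
        }
    }

    // Run adder or subtractor against the index; a null result deletes the key.
    private void update(String key, BinaryOperator<String> op, String value) {
        String result = op.apply(byName.get(key), value);
        if (result == null) {
            byName.remove(key);
        } else {
            byName.put(key, result);
        }
    }

    String lookup(String name) {
        return byName.get(name);
    }

    public static void main(String[] args) {
        NameIndexModel index = new NameIndexModel();
        index.apply("1", "apple");
        index.apply("1", "pear");                   // rename: the index key changes
        System.out.println(index.lookup("apple"));  // null
        System.out.println(index.lookup("pear"));   // 1
        index.apply("1", null);                     // tombstone
        System.out.println(index.lookup("pear"));   // null
    }
}
```

The point of the model is that the source table remembers the previous value, so a rename or a tombstone can still find and clear the old index key. That only holds while each index key maps to a single item, which is the 1-to-1 case.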
The 1-to-many case I still have to play more with. I will update here if I discover anything good.

Thank you.

On Wed, Feb 8, 2017 at 4:14 PM, Dmitry Minkovsky <dminkov...@gmail.com> wrote:

>
> And before we discuss deeper, a follow up question: if you map from
> <k,v> to new_key, is this mapping "unique", or could it be that two
> different k/v-pairs map to the same new_key?
>
> Yes, this has been central in my exploration so far. For some fields the
> field is unique, for others it is not. There's never ambiguity. Unique
> fields are known to be unique (e.g. user email address). Non-unique fields
> are known to be non-unique (e.g. user country).
>
> Right now I am laying out my topology such that uniqueness is enforced
> before indexing. So the issue I am concerned with here specifically is just
> the indexing. Unless, of course, the two can somehow be tied together in
> some elegant manner? But anyway, my question is based on the assumption
> that uniqueness in indexing input is not a concern.
>
> And yes, I've noticed that if you have a unique field the result is a 1-to-1
> key-to-entity mapping, whereas for non-unique fields each index key may
> have a list of entities it maps to. For non-unique fields where an index
> key may map to thousands of entities, it is not practical to maintain them
> in a single aggregation.
>
> Any further guidance would be greatly appreciated. Thank you!
>
> On Wed, Feb 8, 2017 at 3:56 PM, Matthias J. Sax <matth...@confluent.io>
> wrote:
>
>> It's a difficult problem.
>>
>> And before we discuss deeper, a follow up question: if you map from
>> <k,v> to new_key, is this mapping "unique", or could it be that two
>> different k/v-pairs map to the same new_key?
>>
>> If there are overlaps, you end up with a different problem than if there
>> are no overlaps, because you would get "updates" for the new_key from
>> two different places and new_key would actually not be a primary key
>> anymore (while for a "unique" mapping the new_key would still be a
>> primary key).
>>
>> Hope this makes sense. It's a little hard to explain in an email.
>>
>> -Matthias
>>
>> On 2/8/17 10:22 AM, Dmitry Minkovsky wrote:
>> > I have a changelog that I'd like to index by some other key. So,
>> > something like this:
>> >
>> >     class Item {
>> >         byte[] id;
>> >         String name;
>> >     }
>> >
>> >     KStreamBuilder topology = new KStreamBuilder();
>> >
>> >     KTable<byte[], Item> items = topology
>> >         .table("items-changelog", "items");
>> >
>> > To index the Items by name:
>> >
>> >     items
>> >         .toStream()
>> >         .selectKey((k, v) -> v.name)
>> >         .to("items-by-name-changelog");
>> >
>> >     KTable<String, Item> itemsByName = topology
>> >         .table("items-by-name-changelog", "items-by-name");
>> >
>> > However, if the topic "items-changelog" receives a tombstone for some
>> > particular Item (val: null), the key selector indexing function yields
>> > an NPE because that value is null.
>> >
>> > My solution to this problem has been aggregations:
>> >
>> >     KTable<String, Item> itemsByName = items
>> >         .groupBy((k, v) -> new KeyValue<>(v.name, v))
>> >         .aggregate(() -> null,
>> >                    (k, v, a) -> v,
>> >                    (k, v, a) -> null,
>> >                    "items-by-name");
>> >
>> > This works because the grouping/aggregation maintains state on the
>> > incoming topic, and when an item tombstone arrives it can calculate the
>> > key and propagate the tombstone to the index.
>> >
>> > However, this doesn't work flexibly for all cases (I'll omit them for
>> > now) and I find myself drawn to the lower-level Processor API, which is
>> > fine except I'd rather just use the DSL for its expressiveness.
>> >
>> > What is the correct way to do this kind of indexing? I need the index to
>> > update correctly when the key value changes or when a tombstone is
>> > received.
>> >
>> > Thank you,
>> > Dmitry