If the mapping is 1-to-1, then you can get it done. That's good. As you observed yourself, with a non-unique mapping it's much harder (maybe even impossible) to do this.
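Here is a minimal sketch of why the 1-to-1 case is manageable. If the operator remembers the last value it saw for each source key, it can retract the old index entry before writing the new one, and a tombstone then only needs the retraction. This is plain Java with in-memory HashMaps standing in for state stores. It is not the Kafka Streams API. `Item` (with a `String` id instead of `byte[]`), `UniqueNameIndex`, `apply`, and `lookup` are hypothetical names chosen for illustration. The retract/publish steps loosely mirror the subtractor and adder of `KTable#groupBy(...)#aggregate(...)`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for the Item class from the thread; the id is a
// String here (not byte[]) so it can be used directly as a HashMap key.
final class Item {
    final String id;
    final String name;

    Item(String id, String name) {
        this.id = id;
        this.name = name;
    }
}

// Plain-Java model of a unique (1-to-1) secondary index. NOT Kafka Streams
// code; it only models the state a stateful re-keying step has to keep.
final class UniqueNameIndex {
    // Last known value per source key. This is what allows the old index
    // key to be recomputed when a tombstone (null value) arrives.
    private final Map<String, Item> latestById = new HashMap<>();
    // The index itself: name -> item. Only valid if names are unique.
    private final Map<String, Item> byName = new HashMap<>();

    // Applies one changelog record; newValue == null models a tombstone.
    void apply(String id, Item newValue) {
        Item old = newValue == null ? latestById.remove(id) : latestById.put(id, newValue);
        if (old != null) {
            // Retract the entry under the old index key (subtractor-like step).
            byName.remove(old.name);
        }
        if (newValue != null) {
            // Publish the entry under the new index key (adder-like step).
            byName.put(newValue.name, newValue);
        }
    }

    Optional<Item> lookup(String name) {
        return Optional.ofNullable(byName.get(name));
    }
}
```

Feeding it an insert, a rename, and a tombstone for the same id leaves the index empty, with no stale entry under the old name. The design relies on the field being unique. If two ids could share a name, the blind `byName.remove(old.name)` would also delete the other id's entry.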
Also, your KTable#groupBy(...)#aggregate(...) approach is a good solution.

So now I am just wondering what you mean by:

> However, this doesn't work flexibly for all cases (I'll omit them for now)

So let's get down to the actual problem :)

-Matthias

On 2/8/17 1:14 PM, Dmitry Minkovsky wrote:
>> And before we discuss deeper, a follow-up question: if you map from
>> <k,v> to new_key, is this mapping "unique", or could it be that two
>> different k/v-pairs map to the same new_key?
>
> Yes, this has been central in my exploration so far. For some fields the
> field is unique, for others it is not. There's never ambiguity. Unique
> fields are known to be unique (e.g. user email address). Non-unique fields
> are known to be non-unique (e.g. user country).
>
> Right now I am laying out my topology such that uniqueness is enforced
> before indexing. So the issue I am concerned with here specifically is just
> the indexing. Unless, of course, the two can somehow be tied together in
> some elegant manner? But anyway, my question is based on the assumption
> that uniqueness in indexing input is not a concern.
>
> And yes, I've noticed that if you have a unique field the result is a 1-to-1
> key-to-entity mapping, whereas for non-unique fields each index key may
> have a list of entities it maps to. For non-unique fields where an index
> key may map to thousands of entities, it is not practical to maintain them
> in a single aggregation.
>
> Any further guidance would be greatly appreciated. Thank you!
>
> On Wed, Feb 8, 2017 at 3:56 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>
>> It's a difficult problem.
>>
>> And before we discuss deeper, a follow-up question: if you map from
>> <k,v> to new_key, is this mapping "unique", or could it be that two
>> different k/v-pairs map to the same new_key?
>>
>> If there are overlaps, you end up with a different problem than if there
>> are no overlaps, because you would get "updates" for the new_key from
>> two different places and new_key would actually not be a primary key
>> anymore (while for a "unique" mapping the new_key would still be a
>> primary key).
>>
>> Hope this makes sense. It's a little hard to explain in an email.
>>
>> -Matthias
>>
>> On 2/8/17 10:22 AM, Dmitry Minkovsky wrote:
>>> I have a changelog that I'd like to index by some other key. So,
>>> something like this:
>>>
>>>     class Item {
>>>         byte[] id;
>>>         String name;
>>>     }
>>>
>>>     KStreamBuilder topology = new KStreamBuilder();
>>>
>>>     KTable<byte[], Item> items = topology
>>>         .table("items-changelog", "items");
>>>
>>> To index the Items by name:
>>>
>>>     items
>>>         .toStream()
>>>         .selectKey((k, v) -> v.name)
>>>         .to("items-by-name-changelog");
>>>
>>>     KTable<String, Item> itemsByName = topology
>>>         .table("items-by-name-changelog", "items-by-name");
>>>
>>> However, if the topic "items-changelog" receives a tombstone for some
>>> particular Item (val: null), the key selector indexing function throws an
>>> NPE because the value is null.
>>>
>>> My solution to this problem has been aggregations:
>>>
>>>     KTable<String, Item> itemsByName = items
>>>         .groupBy((k, v) -> new KeyValue<>(v.name, v))
>>>         .aggregate(() -> null,
>>>             (k, v, a) -> v,
>>>             (k, v, a) -> null,
>>>             "items-by-name");
>>>
>>> This works because the grouping/aggregation maintains state on the
>>> incoming topic, and when an item tombstone arrives it can calculate the
>>> key and propagate the tombstone to the index.
>>>
>>> However, this doesn't work flexibly for all cases (I'll omit them for
>>> now) and I find myself drawn to the lower-level Processor API, which is
>>> fine, except I'd rather just use the DSL for its expressiveness.
>>>
>>> What is the correct way to do this kind of indexing? I need the index to
>>> update correctly when the key value changes or when a tombstone is
>>> received.
>>>
>>> Thank you,
>>> Dmitry