Re: Kafka Streams: How to best maintain changelog indices using the DSL?
If the mapping is 1-to-1, then you can get it done. That's good. As you
observed yourself, with a non-unique mapping it's much harder (maybe even
impossible) to get this. Your KTable#groupBy(...)#aggregate(...) approach
is a good solution, too. So now I am just wondering what you mean by:

> However, this doesn't work flexibly for all cases (I'll omit them for now)

Let's get down to the actual problem :)

-Matthias
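For the non-unique case the thread leaves open, the usual DSL shape is to aggregate each index key into a collection of ids. The maintenance semantics can be sketched without Kafka, using HashMaps to stand in for the two tables (String ids instead of byte[] for brevity; as Dmitry notes, a single-collection aggregation like this gets impractical when one index key maps to thousands of entities):

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class OneToManyIndex {
    // Primary table: user id -> country (a non-unique field).
    static final Map<String, String> users = new HashMap<>();
    // Index: country -> set of user ids.
    static final Map<String, Set<String>> byCountry = new HashMap<>();

    // Adder/subtractor pair, as in groupBy(...).aggregate(...):
    // remove the id from the old key's set, add it to the new key's set.
    // A null country is a tombstone.
    static void upsert(String id, String country) {
        String old = (country == null) ? users.remove(id) : users.put(id, country);
        if (old != null) {
            Set<String> ids = byCountry.get(old);
            ids.remove(id);                                // subtractor
            if (ids.isEmpty()) byCountry.remove(old);
        }
        if (country != null) {
            byCountry.computeIfAbsent(country, c -> new HashSet<>()).add(id); // adder
        }
    }

    public static void main(String[] args) {
        upsert("u1", "US");
        upsert("u2", "US");
        upsert("u1", "DE");                                // move u1: US set shrinks
        System.out.println(byCountry.get("US"));           // [u2]
        System.out.println(byCountry.get("DE"));           // [u1]
        upsert("u2", null);                                // tombstone
        System.out.println(byCountry.containsKey("US"));   // false
    }
}
```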
Re: Kafka Streams: How to best maintain changelog indices using the DSL?
Actually... I've got the 1-to-1 variant doing wonders for me. I replaced
the #aggregate() with #reduce((k, v) -> v, (k, v) -> null) and things are
just lovely. Combining these indices with the various join operations, I
am able to build up deeply nested structures, or eh, materialized views,
from flat changelog feeds. As the changelog entities change or tombstone,
so do the views.

The 1-to-many case I still have to play with more. I will update here if
I discover anything good.

Thank you.
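The semantics of that groupBy/reduce index can be illustrated without Kafka: the grouped table retains the old value, so a re-key or tombstone triggers the subtractor ((k, v) -> null) on the old index key before the adder writes the new entry. A minimal HashMap simulation of that behavior (String ids instead of byte[] for brevity; the method below is illustrative, not Streams API):

```java
import java.util.HashMap;
import java.util.Map;

public class OneToOneIndex {
    // Primary table: item id -> name.
    static final Map<String, String> items = new HashMap<>();
    // Index: name -> item id. Names assumed unique, as in the thread.
    static final Map<String, String> byName = new HashMap<>();

    // Mimics groupBy((k, v) -> new KeyValue<>(v.name, v)) plus
    // reduce(adder, subtractor): subtract the old grouping key, then
    // add the new one. A null name is a tombstone.
    static void upsert(String id, String name) {
        String oldName = (name == null) ? items.remove(id) : items.put(id, name);
        if (oldName != null) {
            byName.remove(oldName);          // subtractor: (k, v) -> null
        }
        if (name != null) {
            byName.put(name, id);            // adder: (k, v) -> v
        }
    }

    public static void main(String[] args) {
        upsert("1", "widget");
        upsert("1", "gadget");               // rename: old entry is subtracted
        System.out.println(byName.containsKey("widget")); // false
        System.out.println(byName.get("gadget"));         // 1
        upsert("1", null);                   // tombstone propagates to the index
        System.out.println(byName.isEmpty());             // true
    }
}
```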
Re: Kafka Streams: How to best maintain changelog indices using the DSL?
> And before we discuss deeper, a follow up question: if you map from a
> k/v-pair to new_key, is this mapping "unique", or could it be that two
> different k/v-pairs map to the same new_key?

Yes, this has been central in my exploration so far. For some fields the
mapping is unique, for others it is not. There's never ambiguity: unique
fields are known to be unique (e.g. user email address), and non-unique
fields are known to be non-unique (e.g. user country).

Right now I am laying out my topology such that uniqueness is enforced
before indexing, so the issue I am concerned with here is specifically the
indexing. Unless, of course, the two can somehow be tied together in some
elegant manner? In any case, my question assumes that uniqueness of the
indexing input is not a concern.

And yes, I've noticed that a unique field gives a 1-to-1 key-to-entity
mapping, whereas for non-unique fields each index key may map to a list of
entities. For non-unique fields where an index key may map to thousands of
entities, it is not practical to maintain them in a single aggregation.

Any further guidance would be greatly appreciated. Thank you!
Re: Kafka Streams: How to best maintain changelog indices using the DSL?
It's a difficult problem.

And before we discuss deeper, a follow up question: if you map from a
k/v-pair to new_key, is this mapping "unique", or could it be that two
different k/v-pairs map to the same new_key?

If there are overlaps, you end up with a different problem than if there
are none, because new_key would receive updates from two different places
and would actually no longer be a primary key (while with a "unique"
mapping, new_key would still be a primary key).

Hope this makes sense. It's a little hard to explain in an email.

-Matthias
Kafka Streams: How to best maintain changelog indices using the DSL?
I have a changelog that I'd like to index by some other key. So, something
like this:

    class Item {
        byte[] id;
        String name;
    }

    KStreamBuilder topology = new KStreamBuilder();

    KTable<byte[], Item> items = topology
        .table("items-changelog", "items");

To index the Items by name:

    items
        .toStream()
        .selectKey((k, v) -> v.name)
        .to("items-by-name-changelog");

    KTable<String, Item> itemsByName = topology
        .table("items-by-name-changelog", "items-by-name");

However, if the topic "items-changelog" receives a tombstone for some
particular Item (value: null), the key selector throws an NPE because the
value is null.

My solution to this problem has been aggregations:

    KTable<String, Item> itemsByName = items
        .groupBy((k, v) -> new KeyValue<>(v.name, v))
        .aggregate(() -> null,
                   (k, v, a) -> v,
                   (k, v, a) -> null,
                   "items-by-name");

This works because the grouping/aggregation maintains state on the incoming
topic, so when an item tombstone arrives it can compute the index key and
propagate the tombstone to the index.

However, this doesn't work flexibly for all cases (I'll omit them for now),
and I find myself drawn to the lower-level Processor API, which is fine,
except I'd rather use the DSL for its expressiveness.

What is the correct way to do this kind of indexing? I need the index to
update correctly when the key value changes or when a tombstone is
received.

Thank you,
Dmitry
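The tombstone NPE is not the only problem with the plain selectKey re-keying: on a rename, the new index entry is written but nothing ever deletes the old one. A stand-alone sketch of that stale-entry failure, simulating the two tables with HashMaps (no Kafka involved; String ids instead of byte[] for brevity):

```java
import java.util.HashMap;
import java.util.Map;

public class NaiveIndex {
    public static void main(String[] args) {
        // Primary "table": item id -> name.
        Map<String, String> items = new HashMap<>();
        // Naive index: name -> item id, maintained by blindly re-keying
        // each update, the way selectKey()/to() would.
        Map<String, String> byName = new HashMap<>();

        // Upsert item "1" with name "widget".
        items.put("1", "widget");
        byName.put("widget", "1");

        // Rename item "1" to "gadget": the new entry is written...
        items.put("1", "gadget");
        byName.put("gadget", "1");

        // ...but the old entry is never deleted: the index now holds a
        // stale "widget" -> "1" mapping alongside "gadget" -> "1".
        System.out.println(byName.containsKey("widget")); // stale entry survives
        System.out.println(byName.get("gadget"));
    }
}
```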