Re: Re-key by multiple properties without composite key
I see. You need to ensure that you get _all_ Persons. For this case, I
guess you are right. You would need to first aggregate the folders per
person:

    KTable allPersonFolders =
        folder
            .groupBy((...) -> (folder.customerId, ...))
            .aggregate(...)

And in a second step, do a left join:

    result = personTable.leftJoin(allPersonFolders, ...)

> Reading the topic as a table directly did not work out as that crashed
> the application; apparently reading the topic as a KTable and then
> using that for three independent re-key-operations is not allowed.

Not sure if I can follow. What do you mean by "crashed"?

For tables, there is neither a `selectKey()` nor a `repartition()`, as
explained in my previous reply. However, doing a `table.groupBy(...)`
will set a new key and repartition the data to your needs.

-Matthias

On 2/1/24 1:12 AM, Karsten Stöckmann wrote:
> Thanks so much for taking a look. An FK-table-table join is an inner
> join which implies there would be no Person entities without associated
> Folders. Unfortunately, that's not the case. That led me to an attempt
> of re-keying the Folder topic by each of the three possible foreign keys
> in order to be able to left join Persons against each of the three
> re-keyed KTables to build an eventual Person aggregation containing all
> possible Folders associated in any way.
>
> Reading the topic as a table directly did not work out as that crashed
> the application; apparently reading the topic as a KTable and then
> using that for three independent re-key-operations is not allowed.
>
> Best wishes,
> Karsten
>
> On Thu, Feb 1, 2024 at 02:16, Matthias J. Sax wrote:
> >
> > Thanks for the details. This does make sense.
> >
> > So it seems you can read all topics as tables (ie,
> > builder.table("topic") -- no need to do `builder.stream().toTable()`).
> >
> > And you can use the built-in FK-table-table join, and aggregate the
> > result:
> >
> > KTable result =
> >     folderTable
> >         .join(personTable, (folderId, folder) -> folder.customerId, ...)
> >         .groupBy((...) -> (personId, ...))
> >         .aggregate(...);
> > result.toStream().to("resultTopic");
> >
> > Note the fk-extractor `(folderId, folder) -> folder.customerId` that
> > tells the join to use `customerId` from the `folderTable` to look up
> > the person from personTable.
> >
> > Think of `folderTable` as fact-table and `personTable` as dimension
> > table.
> >
> > KS will take care of everything else under the hood automatically.
> >
> > -Matthias
> >
> > On 1/30/24 11:25 AM, Karsten Stöckmann wrote:
> > > Matthias, thanks for getting back on this. I'll try to illustrate my
> > > intent with an example as I'm not yet fully familiar with Kafka
> > > (Streams) and its idioms...
> > >
> > > Assume classes Person and Folder:
> > >
> > > class Person {
> > >     Long id;
> > >     String firstname;
> > >     String lastname;
> > >     // some content
> > > }
> > >
> > > class Folder {
> > >     Long id;
> > >     String folderNumber;
> > >     // some other content
> > >     Long customerId; // FK, points to Person.id
> > >     Long billingAddressId; // FK, also points to Person.id
> > > }
> > >
> > > Thus both foreign keys of Folder point to Person entities, yet with
> > > different semantics. They're not composite keys but act
> > > independently.
> > >
> > > Now assume I want to build an aggregate Person object containing
> > > Folder.folderNumber of all folders associated with a Person entity,
> > > regardless of whether it acts as a customer or billing address. My
> > > (naive) idea was to build re-keyed KTables by Folder.customerId and
> > > Folder.billingAddressId and then join / aggregate them with the
> > > Person KTable in order to build something like this:
> > >
> > > class AggregatedPerson {
> > >     Long id;
> > >     List folderNumbers; // or even List
> > >     // ...
> > > }
> > >
> > > (The latter is supposed to be written to an output topic in order to
> > > serve as input for Solr or ElasticSearch.)
> > >
> > > Does this even make sense?
> > >
> > >> If you read the topic as a KTable, you cannot repartition because
> > >> it violates the contract. A KTable must be partitioned by its
> > >> primary key, ie, the ID field, and thus the DSL does not offer you
> > >> a repartition option.
> > >
> > > So re-key means repartition? ATM the partition size of all input
> > > topics is 1 as per Kafka UI, as I've specified no extra configuration
> > > for them.
> > >
> > > Best wishes,
> > > Karsten
> > >
> > > On Tue, Jan 30, 2024 at 20:03, Matthias J. Sax wrote:
> > >> > Both fk1 and fk2 point to the PK of another entity (not shown for
> > >> > brevity, of no relevance to the question).
> > >>
> > >> Is this two independent FKs, or one two-column FK?
> > >>
> > >>> Ingesting the topic into a Kafka Streams application, how can I
> > >>> re-key the resulting KTable by both fk1 and fk2?
> > >>
> > >> If you read the topic as a KTable, you cannot repartition because
> > >> it violates the contract. A KTable must be partitioned by its
> > >> primary key, ie, the ID field, and thus the DSL does not offer you
> > >> a repartition option.
> > >>
> > >> You could read the topic as KStream though, and provide a custom
> > >> `StreamPartitioner` for a `repartition()` operation. However, this
> > >> is also "dangerous" because for a KStream it's also assumed that
> > >> it's partitioned by its key, and you might break downstream DSL
> > >> operators with such a violation of the "contract".
> > >>
> > >> Looking into your solution:
> > >>
> > >>> .toTable()
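The two-step approach suggested at the top of this message (first aggregate the folders per person, then left join the persons against that aggregate) can be modeled in plain Java. This is only a sketch of the semantics, with in-memory maps standing in for KTables; it does not use the Kafka Streams API. The `Person` and `Folder` fields follow the classes quoted in the thread, covering the two foreign keys shown there. The names `PersonFolderJoinSketch`, `foldersPerPerson`, `addUnder`, and `leftJoin`, the record form of `AggregatedPerson`, and the sample data are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// In-memory model only: plain maps stand in for KTables; no Kafka Streams API is used.
final class PersonFolderJoinSketch {

    // Fields mirror the Person and Folder classes quoted in the thread.
    record Person(long id, String firstname, String lastname) {}

    record Folder(long id, String folderNumber, Long customerId, Long billingAddressId) {}

    // Hypothetical result type, modeled on AggregatedPerson from the thread.
    record AggregatedPerson(long id, List<String> folderNumbers) {}

    // Step 1: "re-key" each folder by every foreign key and collect folder numbers per person id.
    static Map<Long, List<String>> foldersPerPerson(List<Folder> folders) {
        Map<Long, List<String>> byPerson = new LinkedHashMap<>();
        for (Folder f : folders) {
            addUnder(byPerson, f.customerId(), f.folderNumber());
            addUnder(byPerson, f.billingAddressId(), f.folderNumber());
        }
        return byPerson;
    }

    private static void addUnder(Map<Long, List<String>> byPerson, Long personId, String folderNumber) {
        if (personId == null) {
            return; // unset foreign key: nothing to associate
        }
        List<String> numbers = byPerson.computeIfAbsent(personId, k -> new ArrayList<>());
        if (!numbers.contains(folderNumber)) {
            numbers.add(folderNumber); // skip duplicates when one person fills both roles
        }
    }

    // Step 2: left join, so every person appears, with an empty list if no folder refers to it.
    static List<AggregatedPerson> leftJoin(List<Person> persons, Map<Long, List<String>> byPerson) {
        List<AggregatedPerson> result = new ArrayList<>();
        for (Person p : persons) {
            result.add(new AggregatedPerson(p.id(), byPerson.getOrDefault(p.id(), List.of())));
        }
        return result;
    }

    public static void main(String[] args) {
        // Made-up sample data: person 3 is referenced by no folder.
        List<Person> persons = List.of(
                new Person(1L, "Ada", "A"),
                new Person(2L, "Bob", "B"),
                new Person(3L, "Cy", "C"));
        List<Folder> folders = List.of(
                new Folder(10L, "F-10", 1L, 2L),
                new Folder(11L, "F-11", 1L, 1L));
        leftJoin(persons, foldersPerPerson(folders)).forEach(System.out::println);
    }
}
```

In this model, a person referenced by no folder still appears in the result with an empty list, which is the property an inner join would lose.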
Re: Re-key by multiple properties without composite key
Thanks so much for taking a look. An FK-table-table join is an inner
join which implies there would be no Person entities without associated
Folders. Unfortunately, that's not the case. That led me to an attempt
of re-keying the Folder topic by each of the three possible foreign keys
in order to be able to left join Persons against each of the three
re-keyed KTables to build an eventual Person aggregation containing all
possible Folders associated in any way.

Reading the topic as a table directly did not work out as that crashed
the application; apparently reading the topic as a KTable and then
using that for three independent re-key-operations is not allowed.

Best wishes,
Karsten

On Thu, Feb 1, 2024 at 02:16, Matthias J. Sax wrote:
>
> Thanks for the details. This does make sense.
>
> So it seems you can read all topics as tables (ie,
> builder.table("topic") -- no need to do `builder.stream().toTable()`).
>
> And you can use the built-in FK-table-table join, and aggregate the
> result:
>
> KTable result =
>     folderTable
>         .join(personTable, (folderId, folder) -> folder.customerId, ...)
>         .groupBy((...) -> (personId, ...))
>         .aggregate(...);
> result.toStream().to("resultTopic");
>
> Note the fk-extractor `(folderId, folder) -> folder.customerId` that
> tells the join to use `customerId` from the `folderTable` to look up
> the person from personTable.
>
> Think of `folderTable` as fact-table and `personTable` as dimension
> table.
>
> KS will take care of everything else under the hood automatically.
>
> -Matthias
>
> On 1/30/24 11:25 AM, Karsten Stöckmann wrote:
> > Matthias, thanks for getting back on this. I'll try to illustrate my
> > intent with an example as I'm not yet fully familiar with Kafka
> > (Streams) and its idioms...
> >
> > Assume classes Person and Folder:
> >
> > class Person {
> >     Long id;
> >     String firstname;
> >     String lastname;
> >     // some content
> > }
> >
> > class Folder {
> >     Long id;
> >     String folderNumber;
> >     // some other content
> >     Long customerId; // FK, points to Person.id
> >     Long billingAddressId; // FK, also points to Person.id
> > }
> >
> > Thus both foreign keys of Folder point to Person entities, yet with
> > different semantics. They're not composite keys but act independently.
> >
> > Now assume I want to build an aggregate Person object containing
> > Folder.folderNumber of all folders associated with a Person entity,
> > regardless of whether it acts as a customer or billing address. My
> > (naive) idea was to build re-keyed KTables by Folder.customerId and
> > Folder.billingAddressId and then join / aggregate them with the
> > Person KTable in order to build something like this:
> >
> > class AggregatedPerson {
> >     Long id;
> >     List folderNumbers; // or even List
> >     // ...
> > }
> >
> > (The latter is supposed to be written to an output topic in order to
> > serve as input for Solr or ElasticSearch.)
> >
> > Does this even make sense?
> >
> >> If you read the topic as a KTable, you cannot repartition because it
> >> violates the contract. A KTable must be partitioned by its primary
> >> key, ie, the ID field, and thus the DSL does not offer you a
> >> repartition option.
> >
> > So re-key means repartition? ATM the partition size of all input
> > topics is 1 as per Kafka UI, as I've specified no extra configuration
> > for them.
> >
> > Best wishes,
> > Karsten
> >
> > On Tue, Jan 30, 2024 at 20:03, Matthias J. Sax wrote:
> >> > Both fk1 and fk2 point to the PK of another entity (not shown for
> >> > brevity, of no relevance to the question).
> >>
> >> Is this two independent FKs, or one two-column FK?
> >>
> >>> Ingesting the topic into a Kafka Streams application, how can I
> >>> re-key the resulting KTable by both fk1 and fk2?
> >>
> >> If you read the topic as a KTable, you cannot repartition because it
> >> violates the contract. A KTable must be partitioned by its primary
> >> key, ie, the ID field, and thus the DSL does not offer you a
> >> repartition option.
> >>
> >> You could read the topic as KStream though, and provide a custom
> >> `StreamPartitioner` for a `repartition()` operation. However, this is
> >> also "dangerous" because for a KStream it's also assumed that it's
> >> partitioned by its key, and you might break downstream DSL operators
> >> with such a violation of the "contract".
> >>
> >> Looking into your solution:
> >>
> >>> .toTable()
> >>> .groupBy(
> >>>     (key, value) -> KeyValue.pair(value.fk1(), value),
> >>>     Grouped.with(...))
> >>
> >> This will set fk1 as the key, which seems not to align with your
> >> previous comment that the key should stay the ID? (Same for fk2.)
> >>
> >> Your last step seems to join fk1-fk2 -- is this on purpose? I guess
> >> it's unclear what you are actually trying to do to begin with? It
> >> sounds like it's overall a self-join of the input topic on fk1 and
> >> fk2?
> >>
> >>
> >> -Matthias
> >>
> >> On 1/28/24