I don't mind you being a bit picky. I think it is a great discussion and it
helps me too. For example I clearly see now that the problem of aggregation
still needs to be solved for this use case.
Please see my answers below.

I used an example of OrderEvents to OrderItems relationship as 1:1 just to
demonstrate that even in tht simple case the existing table-table join on
FK will not work. However the use case I have in general may have 1:1, 1:0,
or 1:n relations. One complex business entity I had to deal with called
"transportation waybill" that has 25 child tables. Number of child records
in each child table could be 0:n for each record in the main waybill table.
When an event is generated for a certain waybill then a "complete" waybill
needs to be assembled from the subset of child tables. The subset of child
tables for waybill data assembly depends on the event type (any event type
has waybillId). There is also some additional filtering mapping and
enrichment that needs to be done in a real use case that is not relevant to
what we discuss. As you can see the use case is very complex and this is
why I wanted to distill it to very simple terms that are relevant to this
discussion.

Now I am switching back to the simple example of OrderEvent with OrderItems.
Please note that OrderEvent is a derived message. It is derived by joining
the actual event message that has orderId as its key with the Order message
that also has OrderId as its key. Because joining these two messages is
trivial I did not include this part and stated that we are sharing from
the  Order Event message right away.
So to summarize: We need to join each OrderEvent message (OrderId is key)
with 0 or 1 or many orderItems messages (OrderItem is the key and orderID
is one of the message fields).

Now, let's consider your solution:
1. We do not need to aggregate orderEvents around the key since we need to
provide an output for each orderEvent (each orderEvent needs to be joined
with an aggregate of OrderItems related to this orderEvent). So we can skip
this step.
2. Because OrderItems are multiple distinct records for each OrderId we can
not rekey them with OrderId PK to the table, uness we do some sort of
aggregation for them. So let's say we rekey orderItems with orderId and
aggregate each record field into an array. I think we also need to
co-partition with OrderEvents.
3. Now we can do stream-table join the orderEvents stream with the
OrderItemsAggregated table using the OrderId key that is common for both.

So the conclusion is that your solution will work with some tweaking
(basically aggregating on OrderItems instead of on events).
While this solution will work it has several issues as follows:

   - This solution was considered when in KIP-213 for the existing
   table-table FK join. There is a discussion on disadvantages of using this
   approach in the article related to KIP-213 and I think the same
   disadvantages will apply to this KIP. Please see here:
   
https://www.confluent.io/blog/data-enrichment-with-kafka-streams-foreign-key-joins/#workaround

   - I see FK join as the common operation in data manipulation so it would
   be nice to have a shortcut for it and not to try to design it from existing
   functionality all the time. Consider the real use case I discussed at the
   beginning when a business entity has 25 children
   - This solution similarly to mine is "mudding the water" by providing a
   hybrid outcome join + aggregate. At list with my proposal we could
   potentially control it with the flag, or maybe create some special
   aggregate that could be chained after (don't know how to do it yet :-))

Any thoughts?

Regards,

Igor

On Wed, Aug 9, 2023 at 7:19 PM Matthias J. Sax <mj...@apache.org> wrote:

> Thanks for the details. And sorry for being a little bit picky. My goal
> is to really understand the use-case and the need for this KIP. It's a
> massive change and I just want to ensure we don't add (complex) things
> unnecessarily.
>
>
> So you have a streams of "orderEvents" with key=orderId. You cannot
> represent them as a KTable, because `orderId` is not a PK, but just an
> identify that a message belongs to a certain order. This part I understand.
>
> You also have a KTable "orderItems", with orderId as a value-field.
>
>
>
> >  Relationship between parent and child messages is 1:1
>
> If I understand correctly, you want to join on orderId. If the join is
> 1:1, it means that there is only a single table-record for each unique
> orderId. Thus, orderId could be the PK of the table. If that's correct,
> you could use orderId as the key of "orderItems" and do a regular
> stream-table join. -- Or do I miss something?
>
>
>
> > and to send it only once to the target system as one ‘complete order >
> message for each new ‘order event’ message.
>
> This sound like an aggregation to me, not a join? It seems that an order
> consists of multiple "orderEvent" messages, and you would want to
> aggregate them based on orderId (plus add some more order detail
> information from the table)? Only after all "orderEvent" messages are
> received and the order is "completed" you want to send a result
> downstream (that is fine and would be a filter in the DSL to drop
> incomplete results).
>
>
>
> > Maybe there could be a flag to stream-table foreign key join that would
> > indicate if we want this join to aggregate children or not?
>
> Wouldn't this mud the waters between a join and an aggregation and imply
> that it's a "weird" hybrid operator, and we would also need to change
> the `join()` method to accept an additional `Aggregator` function?
>
>
>
>  From what I understand so far (correct me if I am wrong), you could do
> what you want to do as follows:
>
> // accumulate all orderEvents per `orderId`
> // cf last step to avoid unbounded growth of the result KTable
> KStream orderEventStream = builder.stream("orderEventTopic")
> // you might want to disable caching in the next step
> KTable orderEvents = orderEventStream.groupByKey().aggregate(...);
>
> // rekey you orderItems to use `orderId` as PK for the table
> KStream orderItemStream = builder.stream("orderItemTopic");
> KTable orderItems = orderItemStream.map(/*put orderId as key */).toTable();
>
> // do the join
> KStream enrichedOrders = orderEvents.toStream().join(orderItems);
>
> // drop incomplete orders
> KStreame completedOrderds = enrichedOrders.filter(o -> o.isCompleted());
>
> // publish result
> completedOrderds.to("resultTopic");
>
> // additional cleanup
> completedOrderds.map(/*craft a special "delete order
> message"*/).to("orderEventTopic");
>
>
> The last step is required to have a "cleanup" message to purge state
> from the `orderEvents` KTable that was computed via the aggregation. If
> such a cleanup message is processed by the `aggregate` step, you would
> return `null` as aggregation result to drop the record for the
> corresponding orderId that was completed, to avoid unbounded growth of
> the KTable. (There are other ways to do the same cleanup; it's just one
> example how it could be done.)
>
>
> If I got it wrong, can you explain what part I messed up?
>
>
>
> -Matthias
>
>
>
>
> On 8/7/23 10:15 AM, Igor Fomenko wrote:
> > Hi Matthias,
> >
> > Hi Matthias,
> >
> >
> >
> > Thanks for your comments.
> >
> >
> >
> > I would like to clarify the use case a little more to show why existing
> > table-table foreign key join will not work for the use case I am trying
> to
> > address.
> >
> > Let’s consider the very simple use case with the parent messages in one
> > Kafka topic (‘order event’ messages that also contain some key order
> info)
> > and the child messages in another topic (‘order items’ messages with an
> > additional info for the order). Relationship between parent and child
> > messages is 1:1. Also ‘order items’ message has OrderID as one of its
> > fields (foreign key).
> >
> >
> >
> > The requirement is to combine info of the parent ‘order event’ message
> with
> > child ‘order items’ message using foreign key and to send it only once to
> > the target system as one ‘complete order’ message for each new ‘order
> > event’ message.
> >
> > Please note that the new messages which are related to order items
> (create,
> > update, delete) should not trigger the resulting ‘complete order’
> message).
> >
> >
> >
> >  From the above requirements we can state the following:
> >
> > 1.     Order events are unique and never updated or deleted; they can
> only
> > be replayed if we need to recover the event stream. For our order
> example I
> > would use OrderID as an event key but if we use the KTable to represent
> > events then events with the same OrderID will overwrite each other. This
> > may or may not cause some issues but using the stream to model seems to
> be
> > a more correct approach from at least performance point of view.
> >
> > 2.     We do not want updates from the “order items” table on the right
> > side of the join to generate an output since only events should be the
> > trigger for output messages in our scenario. This is aligned with the
> > stream-table join behavior rather than table-table join when updates are
> > coming from both sides
> >
> > 3.     Stream-table join will give us resulting stream which is more
> align
> > with our output requirements than the table that would be result of
> > table-table join
> >
> >
> >
> > Requirement #2 above is the most important one and it can not be achieved
> > with existing table-table join on foreign key.
> >
> >
> >
> > I also stated that the foreign key table in table-table join is on the
> > ‘wrong’ side for our order management use case. By this I just meant that
> > in stream-table join I am proposing the foreign key table needs to be on
> > the right side and on the existing table-table join it is on the left.
> This
> > is however is irrelevant since we can not use table-table join anyway for
> > the reason #2 above.
> >
> >
> >
> > You made a good point about aggregation of child messages for a more
> > complex use case of 1:n relation between parent and children. Initially I
> > was thinking that aggregation will be just a separate operation that
> could
> > be added after we performed a foreign key join. Now I realize that it
> will
> > not be possible to do it after.
> >
> > Maybe there could be a flag to stream-table foreign key join that would
> > indicate if we want this join to aggregate children or not?
> >
> >
> >
> > What do you think?
> >
> > Regards,
> >
> >
> >
> > Igor
> >
> >
> > On Fri, Aug 4, 2023 at 10:01 PM Matthias J. Sax <mj...@apache.org>
> wrote:
> >
> >> Thanks a lot for providing more background. It's getting much clear to
> >> me now.
> >>
> >> Couple of follow up questions:
> >>
> >>
> >>> It is not possible to use table-table join in this case because
> >> triggering
> >>> events are supplied separately from the actual data entity that needs
> to
> >> be
> >>> "assembled" and these events could only be presented as KStream due to
> >>> their nature.
> >>
> >> Not sure if I understand this part? Why can't those events not
> >> represented as a KTable. You say "could only be presented as KStream due
> >> to their nature" -- what do you mean by this?
> >>
> >> In the end, my understanding is the following (using the example for the
> >> KIP):
> >>
> >> For the shipments <-> orders and order-details <-> orders join, shipment
> >> and order-details are the fact table, what is "reverse" to what you
> >> want? Using existing FK join, it would mean you get two enriched tables,
> >> that you cannot join to each other any further (because we don't support
> >> n:m join): in the end, shipmentId+orderDetailId would be the PK of such
> >> a n:m join?
> >>
> >> If that's correct, (just for the purpose to make sure I understand
> >> correctly), if we would add an n:m join, you could join shipment <->
> >> order-details first, and use a FK join to enrich the result with orders.
> >> -- In addition, you could also do a FK join to event if you represent
> >> events as a table (this relates to my question from above, why events
> >> cannot be represented as a KTable).
> >>
> >>
> >> A the KIP itself, I am still wondering about details: if we get an event
> >> in, and we do a lookup into the "FK table" and find multiple matches,
> >> would we emit multiple results? This would kinda defeat the purpose to
> >> re-assemble everything into a single entity? (And it might require an
> >> additional aggregation downstream to put the entity together.) -- Or
> >> would we join the singe event, with all found table rows, and emit a
> >> single "enriched" event?
> >>
> >>
> >> Thus, I am actually wondering, if you would not pre-process both
> >> shipment and order-details table, via `groupBy(orderId)` and assemble a
> >> list (or similar) of alls shipments (or order-details) per order? If you
> >> do this pre-processing, you can do a PK-PK (1:1) join with the orders
> >> table, and also do a stream-table join to enrich your events will the
> >> full order information?
> >>
> >>
> >>
> >> -Matthias
> >>
> >> On 7/26/23 7:13 AM, Igor Fomenko wrote:
> >>> Hello Matthias,
> >>>
> >>> Thank you for this response. It provides the context for a good
> >> discussion
> >>> related to the need for this new interface.
> >>>
> >>> The use case I have in mind is not really a stream enrichment which
> >> usually
> >>> implies that the event has a primary key to some external info and that
> >>> external info could be just looked up in some other data source.
> >>>
> >>> The pattern this KIP proposes is more akin to the data entity assembly
> >>> pattern from the persistence layer so it is not purely integration
> >> pattern
> >>> but rather a pattern that enables an event stream from persistence
> layer
> >> of
> >>> a data source application. The main driver here is the ability to
> stream
> >> a
> >>> data entity of any complexity (complexity in terms of the relational
> >> model)
> >>> from an application database to some data consumers. The technical
> >>> precondition here is of course that data is already extracted from the
> >>> relational database with something like Change Data Capture (CDC) and
> >>> placed to Kafka topics. Also due to CDC limitations, each database
> table
> >>> that is related to the entity relational data model is extracted to the
> >>> separate Kafka topic.
> >>>
> >>> So to answer you first question the entity that needs to be "assembled"
> >>> from Kafka topics in the very common use case has 1:n relations where 1
> >>> corresponds to the triggering event enriched with the data from the
> main
> >>> (or parent) table of the data entity (for example completion of the
> >>> purchase order event + order data from the order table) and n
> corresponds
> >>> to the many children that needs to be joined with the order table to
> have
> >>> the full data entity (for example multiple line items of the purchase
> >> order
> >>> needs to be added from the line items child table).
> >>>
> >>> It is not possible to use table-table join in this case because
> >> triggering
> >>> events are supplied separately from the actual data entity that needs
> to
> >> be
> >>> "assembled" and these events could only be presented as KStream due to
> >>> their nature. Also currently the FK table in table-table join is on the
> >>> "wrong" side of the join.
> >>> It is possible to use existing stream-table join only to get data from
> >> the
> >>> parent entity table (order table) because the event to order is 1:1.
> >> After
> >>> that it is required to add "children" tables of the order to complete
> >>> entity assembly, these childered are related as 1:n with foreign key
> >> fields
> >>> in each child table (which is order ID).
> >>>
> >>> This use case is typically implemented with some sort of ESB (like
> >>> Mulesoft) where ESB receives an event and then uses JDBC adapter to
> issue
> >>> SQL query with left join on foreign key for child tables. ESB then
> loops
> >>> through the returned record set to assemble the full data entity.
> However
> >>> in many cases for various architecture reasons there is a desire to
> >> remove
> >>> JDBC queries from the data source and replace it with CDC streaming
> data
> >> to
> >>> Kafka. So in that case assembling data entities from Kafka topics
> instead
> >>> of JDBC would be beneficial.
> >>>
> >>> Please let me know what you think.
> >>>
> >>> Regards,
> >>>
> >>> Igor
> >>>
> >>> On Tue, Jul 25, 2023 at 5:53 PM Matthias J. Sax <mj...@apache.org>
> >> wrote:
> >>>
> >>>> Igor,
> >>>>
> >>>> thanks for the KIP. Interesting proposal. I am wondering a little bit
> >>>> about the use-case and semantics, and if it's really required to add
> >>>> what you propose? Please correct me if I am wrong.
> >>>>
> >>>> In the end, a stream-table join is a "stream enrichment" (via a table
> >>>> lookup). Thus, it's inherently a 1:1 join (in contrast to a FK
> >>>> table-table join which is a n:1 join).
> >>>>
> >>>> If this assumption is correct, and you have data for which the table
> >>>> side join attribute is in the value, you could actually repartition
> the
> >>>> table data using the join attribute as the PK of the table.
> >>>>
> >>>> If my assumption is incorrect, and you say you want to have a 1:n join
> >>>> (note that I intentionally reversed from n:1 to 1:n), I would rather
> >>>> object, because it seems to violate the idea to "enrich" a stream,
> what
> >>>> means that each input record produced an output record, not multiple?
> >>>>
> >>>> Also note: for a FK table-table join, we use the forgeinKeyExtractor
> to
> >>>> get the join attribute from the left input table (which corresponds to
> >>>> the KStream in your KIP; ie, it's a n:1 join), while you propose to
> use
> >>>> the foreignKeyExtractor to be applied to the KTable (which is the
> right
> >>>> input, and thus it would be a 1:n join).
> >>>>
> >>>> Maybe you can clarify the use case a little bit. For the current KIP
> >>>> description I only see the 1:1 join case, what would mean we might not
> >>>> need such a feature?
> >>>>
> >>>>
> >>>> -Matthias
> >>>>
> >>>>
> >>>> On 7/24/23 11:36 AM, Igor Fomenko wrote:
> >>>>> Hello developers of the Kafka Streams,
> >>>>>
> >>>>> I would like to start discussion on KIP-955: Add stream-table join on
> >>>>> foreign key
> >>>>> <
> >>>>
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-955%3A+Add+stream-table+join+on+foreign+key
> >>>>>
> >>>>> This KIP proposes the new API to join KStrem with KTable based on
> >> foreign
> >>>>> key relation.
> >>>>> Ths KIP was inspired by one of my former projects to integrate RDBMS
> >>>>> databases with data consumers using Change Data Capture and Kafka.
> >>>>> If we had the capability in Kafka Stream to join KStream with KTable
> on
> >>>>> foreign key this would simplify our implementation significantly.
> >>>>>
> >>>>> Looking forward to your feedback and discussion.
> >>>>>
> >>>>> Regards,
> >>>>>
> >>>>> Igor
> >>>>>
> >>>>
> >>>
> >>
> >
>

Reply via email to