Re: [DISCUSS] KIP-955: Add stream-table join on foreign key

2024-03-08 Thread Matthias J. Sax

Igor,

I did drop the ball on this discussion. Sorry about this. Too many 
things are happening at the same time.




> Also I do not think that emitting multiple records from the stream-table FK
> join is a 'weird' behaviour because this is exactly how the standard SQL
> behaves and many stream processing tools try to mimic SQL at the DSL layer
> - for example Spark Structured Streaming or Hadoop.


While the KS DSL is also partially inspired by SQL, we never intended to
mimic SQL. KS always focused on eventing, and a KStream models an
event-stream. If you have a single event, you can enrich it with
auxiliary data. But a 1:n stream-table join would mean that an event is
"duplicated", which is semantically questionable, because the thing the
event models only happened once, not twice?


Of course, we also have flatMap() in case a "fat record" that actually
contains multiple sub-events comes in, and we want to split out the
sub-events. (I'll circle back to this below.)




IMHO, SQL is not about eventing, and thus provides other operations than
the KS DSL. If we wanted to mimic SQL, we would not have a KStream
to begin with, but would model everything as a KTable. Additionally,
our KTables are limited compared to SQL tables, because we always
require a PK, something SQL does not require. So we would also need to
extend KTables if we wanted to mimic SQL (but we don't intend to). I
also want to point out that a KStream is NOT a key-less SQL table; it
models events, something that is not in the scope of the relational
model. (Of course, you can create a key-less table and put events into
it. But SQL and the relational model do not really understand that these
are events -- they're just rows in a table in the SQL world, and it's up to
the application to interpret and treat the events correctly.)


The point of having a KStream is to extend the semantics and scope of
the DSL, and not to bind ourselves to KTables and SQL semantics. -- This
is also the reason why a stream-table join is stream-side driven; table-side
updates don't trigger a join but only update the table, because the
left-hand side is stateless. And already-emitted results are events, and
thus immutable and cannot be updated any longer. -- A stream-table join,
by its very nature, is not a SQL join (SQL joins are "symmetric", if you
wish, while a stream-table join is inherently "asymmetric").
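
A toy in-memory model can make this asymmetry concrete. The sketch below is plain Java, not the Kafka Streams API, and the class and method names are hypothetical. Table updates only overwrite the stored row, while each stream event looks up the current row and emits at most one result (inner-join semantics), so an already-emitted result is never revised.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy in-memory model of stream-table join semantics.
// Hypothetical names; this is NOT the Kafka Streams API.
final class StreamTableJoinModel {
    private final Map<String, String> table = new HashMap<>();
    private final List<String> output = new ArrayList<>();

    // Table side: an update only changes the stored row and emits nothing.
    void onTableUpdate(String key, String value) {
        if (value == null) {
            table.remove(key); // tombstone: delete the row
        } else {
            table.put(key, value);
        }
    }

    // Stream side: each event looks up the current row and emits at most
    // one result (inner join: no matching row means no output).
    void onStreamEvent(String key, String event) {
        String row = table.get(key);
        if (row != null) {
            output.add(event + "+" + row);
        }
    }

    List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        StreamTableJoinModel join = new StreamTableJoinModel();
        join.onStreamEvent("order1", "e1"); // no row yet: nothing emitted
        join.onTableUpdate("order1", "v1"); // table update: nothing emitted
        join.onStreamEvent("order1", "e2"); // emits e2+v1
        join.onTableUpdate("order1", "v2"); // e2+v1 is not revised
        join.onStreamEvent("order1", "e3"); // emits e3+v2
        System.out.println(join.output());  // prints [e2+v1, e3+v2]
    }
}
```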




> For example we could stream all orderItems for completedOrders as
> separate messages (result of FK join) and then could count them based on
> time window and on orderItemId to update the current inventory of these
> items in the warehouse.


Not sure if I understand this example. What is the nature of `orderItem`
and `completedOrders`? It seems `orderItem` would belong to exactly one
order. And `completedOrders` would be the "order table" used to see if an
order is completed or not, and thus it's still a regular stream-table join?
Why would an `orderItem` belong to more than one `order`?



Overall, I still don't think I fully understand the use case in which
you would *not* want to aggregate, but would want/need more than one
result from the join. To me, the event is not duplicated (ie, it did not
happen twice), and in contrast to a flatMap(), we don't split out
sub-events in a join (or do we)?


I am also wondering about a stream-stream join. Those are used to correlate
events that happen "close to each other", which is an n:m join. Not sure if
this would help for your use case? I assume you did consider it and
ruled it out, but as I still don't fully understand your use case, I
might not fully understand why.
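
For comparison, a windowed stream-stream join can be sketched the same way. This is a toy model in plain Java with hypothetical names (not the Kafka Streams `JoinWindows` API); it assumes a symmetric time window and shows that n left and m right events that are close in time can produce up to n * m results.

```java
import java.util.ArrayList;
import java.util.List;

// Toy in-memory model of a windowed stream-stream join.
// Hypothetical names; this is NOT the Kafka Streams JoinWindows API.
final class StreamStreamJoinModel {
    record Event(String key, String value, long timestampMs) {}

    // Pairs every left event with every right event that has the same key and
    // a timestamp at most windowMs apart (symmetric window, an assumption).
    static List<String> join(List<Event> left, List<Event> right, long windowMs) {
        List<String> results = new ArrayList<>();
        for (Event l : left) {
            for (Event r : right) {
                boolean sameKey = l.key().equals(r.key());
                boolean closeInTime = Math.abs(l.timestampMs() - r.timestampMs()) <= windowMs;
                if (sameKey && closeInTime) {
                    results.add(l.value() + "+" + r.value());
                }
            }
        }
        return results;
    }

    public static void main(String[] args) {
        List<Event> left = List.of(new Event("k", "a1", 0), new Event("k", "a2", 1000));
        List<Event> right = List.of(new Event("k", "b1", 500), new Event("k", "b2", 900),
                new Event("k", "b3", 5000)); // b3 is outside the window of both left events
        System.out.println(join(left, right, 1000)); // prints [a1+b1, a1+b2, a2+b1, a2+b2]
    }
}
```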



At some point you mentioned that you don't want table updates to trigger
the join: going back to your "orderItems" and "completedOrders" example
from above, this actually does not align. If you want to keep inventory
up-to-date, you actually want to emit all orderItems when an order
completes so you can re-stock. Thus, orderItems would need to be modeled
as a table, and you want a table-table join.


This example might actually indicate (not sure if my interpretation is
correct) that you might actually want a table, but the issue with a
KTable would be that the table grows without bound (*). And for this
reason, you try to fall back to a KStream. However, to me, that would not
be the correct approach, because you change _semantics_. If that's the
case, the better KIP would be to add a TTL to KTable -- something that was
asked for many times, and I think it would be beneficial to add. In the
end, you might not want KStream semantics, but just want to avoid
unbounded growth of your KTable?
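
To illustrate what a TTL on a table could mean, here is a toy model in plain Java. It is hypothetical: the class and methods are made up (as the thread notes, KTable has no such TTL today), and the model simply drops any row that has not been updated within the TTL whenever `purgeExpired` runs. How a real KTable TTL would be triggered (eg, by stream time or wall-clock time) is left open.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Toy in-memory model of a table with a time-to-live (TTL).
// Hypothetical names; NOT an existing KTable feature or Kafka Streams API.
final class TtlTableModel {
    private final long ttlMs;
    private final Map<String, String> values = new HashMap<>();
    private final Map<String, Long> lastUpdateMs = new HashMap<>();

    TtlTableModel(long ttlMs) {
        this.ttlMs = ttlMs;
    }

    void put(String key, String value, long nowMs) {
        values.put(key, value);
        lastUpdateMs.put(key, nowMs);
    }

    // Removes every row that has not been updated within the last ttlMs.
    void purgeExpired(long nowMs) {
        Iterator<Map.Entry<String, Long>> it = lastUpdateMs.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMs - entry.getValue() > ttlMs) {
                values.remove(entry.getKey());
                it.remove();
            }
        }
    }

    String get(String key) {
        return values.get(key);
    }

    int size() {
        return values.size();
    }

    public static void main(String[] args) {
        TtlTableModel orders = new TtlTableModel(1000);
        orders.put("order1", "completed", 0);
        orders.put("order2", "open", 800);
        orders.purgeExpired(1500); // order1 is 1500 ms old, order2 only 700 ms
        System.out.println(orders.size() + " " + orders.get("order1") + " " + orders.get("order2"));
        // prints 1 null open
    }
}
```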


(*) For such an "orderItem" and "orderTable" case, if you modeled
this with SQL tables, you would have the same issue. You always get new
orders, and you need a way to eventually purge completed orders you are
no longer interested in?



Putting my lack of use-case understanding aside, I have a technical
question: You propose to extract the


2023-08-11 Thread Igor Fomenko
Matthias,

I think that I clouded this discussion a bit with the possible 'fat'
message requirement for the one specific use case that I worked on.
Therefore I would like to take a step back and to focus just on the actual
KIP-955 that only proposes to create a stream-table join on foreign key.
This is regardless if any aggregation (like 'fat' message) is required
afterwards  or not.

Also I do not think that emitting multiple records from the stream-table FK
join is a 'weird' behaviour because this is exactly how the standard SQL
behaves and many stream processing tools try to mimic SQL at the DSL layer
- for example Spark Structured Streaming or Hadoop. The normal behaviour
of SQL is to first select the records with the join as the recordset, and
only then, as the next step, to aggregate or otherwise transform the
results if required. This sequence is much more flexible and efficient
compared to aggregating everything just in case some records will be
selected by the join.
We can imagine many use cases for joining on FK without subsequent
aggregation, or with an aggregation on something other than the stream message
key. For example we could stream all orderItems for completedOrders as
separate messages (result of FK join) and then could count them based on
time window and on orderItemId to update the current inventory of these
items in the warehouse.
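
Assuming the FK join emits one record per (completed order, order item) pair, the windowed count described above might look like the following toy model. It is plain Java with hypothetical names, not the Kafka Streams windowing API, and it uses tumbling windows keyed by window start and orderItemId.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy in-memory model: count FK-join results per tumbling window and orderItemId.
// Hypothetical names; NOT the Kafka Streams windowing API.
final class WindowedItemCountModel {
    // One join result per (completed order, order item) pair.
    record JoinResult(String orderId, String orderItemId, long timestampMs) {}

    static Map<String, Long> countPerWindow(List<JoinResult> results, long windowSizeMs) {
        Map<String, Long> counts = new TreeMap<>();
        for (JoinResult r : results) {
            // Start of the tumbling window that contains this timestamp.
            long windowStart = r.timestampMs() - Math.floorMod(r.timestampMs(), windowSizeMs);
            counts.merge(windowStart + "/" + r.orderItemId(), 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<JoinResult> results = List.of(
                new JoinResult("order1", "itemA", 100),
                new JoinResult("order1", "itemB", 200),
                new JoinResult("order2", "itemA", 900),
                new JoinResult("order3", "itemA", 1100));
        System.out.println(countPerWindow(results, 1000)); // prints {0/itemA=2, 0/itemB=1, 1000/itemA=1}
    }
}
```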

Consequently, I think that for the Streams DSL to be consistent, it should
provide the capability to join on FK first, and then optionally aggregate
with a separate operation only if required.
Creating a 'fat' message is an edge case, and, as you rightfully noticed, I
think it cannot be done in a separate subsequent operation. So maybe for
that case we can have a separate FK aggregationJoin, as you are proposing?
However, I think that we should not call the 'normal' FK join a
'flatMapJoin', because flat map means we are splitting something, when in
fact we just join without an aggregation.

To summarize, I would compare the two potential solutions for joining a
stream and a table on FK as follows:

   - Solution proposed in KIP-955: Join the stream to the table on FK without
     any aggregation. Aggregate as the next step if required (this would be
     a separate KIP, like an aggregateJoin on FK).
      - Pros:
         - Better performance, because no aggregation with a subsequent
           split is performed.
         - Logical separation of join and aggregate. This KIP only proposes
           to add the FK join. Aggregation could be added in a separate KIP.
         - There could be different types of aggregation or other data
           transformations added to the pipeline after the FK join, not just
           creating a "fat" message.
      - Cons:
         - The solution requires a new DSL API.
   - Alternative solution discussed in the email below: Use the existing DSL
     to rekey the right table first into a new table with the FK as the PK,
     and aggregate the records with the same PK into arrays. Then join the
     stream to the table on the PK (common message key).
      - Pros:
         - Uses the existing DSL; no new DSL needs to be implemented for the
           cases when a 'fat' message is required at the end and scalability
           is not an important consideration.
      - Cons:
         - This solution does not solve the more general problem of a
           stream-table FK join when no aggregation is required, or when a
           different type of aggregation is required (for example on the
           child table key).
         - The solution will not scale well when there is a high number of
           updates on the right table. In the real-life use case from my
           email below, there are several hundred updates to each right-hand
           table of the join for each event in the left-hand stream. The
           overall number of updates to these tables in Kafka is several
           thousand per second at peak hours. For this reason, even if "fat"
           messages are required, the performance will be better with an
           aggregateJoin based on the KIP-955 design, where messages are
           aggregated per stream event and not per table update.

Disclaimer: I did not test both solutions side by side for performance. For
now I am just using design observations for performance/scalability
projections.

Any additions to pros/cons? Any other solution alternatives?

Regards,

Igor



On Thu, Aug 10, 2023 at 7:58 PM Matthias J. Sax  wrote:



2023-08-10 Thread Matthias J. Sax
Thanks. It seems we are now on the same page regarding the requirements?
That's good progress!




> This solution was considered in KIP-213 for the existing
> table-table FK join. There is a discussion on disadvantages of using this
> approach in the article related to KIP-213 and I think the same
> disadvantages will apply to this KIP.


I am not sure. The difference (to me) is that for KIP-213, if we
aggregate the right input table, we would need to split the "fat" result
record to get back the individual result records we want to have. But for
your case, it seems you want to have a single "fat" result record in the
end, so the aggregation is not a workaround but a requirement anyway? If
we go with KIP-955, your use case requires an aggregation (right?),
because for each input stream record, you want one result record (not
multiple?).




> I see FK join as the common operation in data manipulation so it would
> be nice to have a shortcut for it and not to try to design it from existing
> functionality all the time.


Well, yes and no? In the end, a stream-table join is an _enrichment_
join, ie, for each left input stream event, we emit one (or none, if it
does not match for an inner join) result record. A stream-FK-table join
would imply that we emit multiple result records, which is (at least to
me) somewhat weird behavior, because it's kinda a "flatMap" as a join
side-effect. (Or we add in an aggregation, and again, have a single
composed operator with "weird" semantics.) It does not seem semantically
clean to me to do it this way.




> Consider the real use case I discussed at the
> beginning when a business entity has 25 children


Not sure if I fully understand? Are you saying a single stream record
would join with 25 table rows? And that's why you think you cannot
aggregate those 25 rows, because such a "fat row" would be too large? If
this is the case (and I am correct in my understanding that your use
case needs an aggregation step anyway), then this issue does not go away,
because you build a single "fat" result record containing all these 25
rows as the final result anyway.




> This solution similarly to mine is "muddying the water" by providing a
> hybrid outcome join + aggregate. At least with my proposal we could
> potentially control it with the flag, or maybe create some special
> aggregate that could be chained after (don't know how to do it yet :-))


Why would it muddy the waters if you combine multiple operators? If you
apply an aggregation and a join operator, both operators provide
well-known and clean semantics? To me, "muddying the waters" means to
have a single operator that does "too much" at once (and adding a config
makes it even worse IMHO, as it actually merges even more things
into a single operator).


From my POV, a good DSL is a toolset of operators, each doing one
(well-defined) thing, and you can combine them to do complex stuff.
Building "fat uber" operators is the opposite of that IMHO.


I am still on the fence about whether KIP-955 proposes a well-defined
operator or not, because it seems it's either a flatMap+join or
join+aggregation -- in both cases, I am wondering why we would want to
combine them into a single operator?


To me, there are two good arguments for adding a new operator:

 (1) It's not possible to combine existing operators to semantically 
express the same at all.


 (2) Adding the operator provides significant performance improvements 
compared to combining a set of existing operators.


Do we think either of these cases applies?


Let's call a stream-FK-join that emits multiple result records the
"flatMapJoin", and the stream-FK-join that emits a single "fat" result
record the "aggregationJoin".


If my understanding is correct, and you need an aggregation anyway,
adding a flatMapJoin that needs an additional aggregation downstream does
not work anyway, because the aggregation cannot know when to start a new
aggregation... Assume there are two input events, both with orderId1; the
first joins to two table rows, emitting two flatMapJoin result records,
and the second joins to three table rows, emitting three flatMapJoin
records. How would the downstream aggregation know to put records 1+2
and records 3+4+5 together to get back to the original two input records?
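
The regrouping problem can be reproduced with a small toy model. The sketch below is plain Java with hypothetical names (neither operator exists in Kafka Streams): between the two events for order1 the table gains a third row, so the flatMapJoin emits 2 + 3 records under the same key, and a downstream aggregation by key merges them into a single list, while the aggregationJoin emits exactly one result per event.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy in-memory model of the hypothetical "flatMapJoin" vs "aggregationJoin".
// NOT a Kafka Streams API.
final class FkJoinBoundaryModel {
    record Item(String itemId, String orderId) {}
    record Output(String key, String value) {}

    // flatMapJoin: one output record per matching table row, all keyed by orderId.
    static List<Output> flatMapJoin(String orderId, List<Item> table) {
        List<Output> out = new ArrayList<>();
        for (Item item : table) {
            if (item.orderId().equals(orderId)) {
                out.add(new Output(orderId, item.itemId()));
            }
        }
        return out;
    }

    // aggregationJoin: exactly one output record per input event.
    static Output aggregationJoin(String orderId, List<Item> table) {
        List<String> ids = new ArrayList<>();
        for (Output o : flatMapJoin(orderId, table)) {
            ids.add(o.value());
        }
        return new Output(orderId, String.join(",", ids));
    }

    // Downstream aggregation by key: it only sees the key, so it cannot tell
    // where one input event's results end and the next event's begin.
    static Map<String, List<String>> aggregateByKey(List<Output> records) {
        Map<String, List<String>> agg = new LinkedHashMap<>();
        for (Output o : records) {
            agg.computeIfAbsent(o.key(), k -> new ArrayList<>()).add(o.value());
        }
        return agg;
    }

    public static void main(String[] args) {
        List<Item> before = List.of(new Item("i1", "order1"), new Item("i2", "order1"));
        List<Item> after = List.of(new Item("i1", "order1"), new Item("i2", "order1"),
                new Item("i3", "order1")); // table updated between the two events
        List<Output> flat = new ArrayList<>(flatMapJoin("order1", before)); // event 1: 2 records
        flat.addAll(flatMapJoin("order1", after));                          // event 2: 3 records
        System.out.println(aggregateByKey(flat)); // prints {order1=[i1, i2, i1, i2, i3]}
        System.out.println(aggregationJoin("order1", before).value()
                + " | " + aggregationJoin("order1", after).value()); // prints i1,i2 | i1,i2,i3
    }
}
```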


If flatMapJoin does not work, and we go with aggregationJoin, I would
still argue that it's the same as doing an upstream table aggregation
plus a regular stream-table join, and I don't see a big perf difference
between the two either. In both cases the table input is
repartitioned, and we also build a fat record in both cases. The
difference is that we store the fat record in the table for the explicit
aggregation, but we avoid an expensive range scan... But the record size
will be there in any case, so I am not sure what we gain by not storing
the fat record in the table if we cannot get rid of the fat record
anyway?
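
Under simplifying assumptions (a static in-memory table, no repartitioning, no state stores, no update timing), the equivalence can be sketched as follows. This is a hypothetical toy model in plain Java: `joinByScan` stands for an aggregationJoin that scans the table by FK per event, while `groupByOrderId` plus `joinByLookup` stand for an upstream table aggregation followed by a regular PK lookup. Both build the same fat record; an empty list stands for "no match".

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy in-memory model comparing a per-event FK scan with an upstream
// group-by-FK table plus a PK lookup. Hypothetical names; NOT a Kafka API.
final class PreAggregationModel {
    record Item(String itemId, String orderId) {}

    // aggregationJoin style: scan the table for the event's orderId.
    static List<String> joinByScan(String orderId, List<Item> items) {
        List<String> ids = new ArrayList<>();
        for (Item item : items) {
            if (item.orderId().equals(orderId)) {
                ids.add(item.itemId());
            }
        }
        return ids;
    }

    // Upstream aggregation: rekey by orderId and collect the "fat" record once.
    static Map<String, List<String>> groupByOrderId(List<Item> items) {
        Map<String, List<String>> byOrder = new LinkedHashMap<>();
        for (Item item : items) {
            byOrder.computeIfAbsent(item.orderId(), k -> new ArrayList<>()).add(item.itemId());
        }
        return byOrder;
    }

    // Regular stream-table join: a single PK lookup per event.
    static List<String> joinByLookup(String orderId, Map<String, List<String>> byOrder) {
        return byOrder.getOrDefault(orderId, List.of());
    }

    public static void main(String[] args) {
        List<Item> items = List.of(new Item("i1", "order1"), new Item("i2", "order2"),
                new Item("i3", "order1"));
        Map<String, List<String>> byOrder = groupByOrderId(items);
        for (String orderId : List.of("order1", "order2", "order3")) {
            // Both approaches yield the same fat record: [i1, i3], [i2], []
            System.out.println(orderId + " " + joinByScan(orderId, items)
                    + " " + joinByLookup(orderId, byOrder));
        }
    }
}
```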





-Matthias



On 8/10/23 12:09 PM, Igor Fomenko wrote:



2023-08-10 Thread Igor Fomenko
I don't mind you being a bit picky. I think it is a great discussion and it
helps me too. For example I clearly see now that the problem of aggregation
still needs to be solved for this use case.
Please see my answers below.

I used an example of the OrderEvents to OrderItems relationship as 1:1 just to
demonstrate that even in that simple case the existing table-table join on
FK will not work. However, the use case I have in general may have 1:1, 1:0,
or 1:n relations. One complex business entity I had to deal with is called a
"transportation waybill" and has 25 child tables. The number of child records
in each child table could be 0:n for each record in the main waybill table.
When an event is generated for a certain waybill, a "complete" waybill
needs to be assembled from a subset of the child tables. The subset of child
tables for waybill data assembly depends on the event type (every event type
has a waybillId). There is also some additional filtering, mapping, and
enrichment that needs to be done in the real use case, which is not relevant
to what we discuss. As you can see, the use case is very complex, and this is
why I wanted to distill it to very simple terms that are relevant to this
discussion.

Now I am switching back to the simple example of OrderEvent with OrderItems.
Please note that OrderEvent is a derived message. It is derived by joining
the actual event message that has orderId as its key with the Order message
that also has OrderId as its key. Because joining these two messages is
trivial, I did not include this part and stated that we are starting from
the Order Event message right away.
So to summarize: We need to join each OrderEvent message (OrderId is the key)
with 0, 1, or many orderItems messages (OrderItem is the key and orderID
is one of the message fields).

Now, let's consider your solution:
1. We do not need to aggregate orderEvents by key, since we need to
provide an output for each orderEvent (each orderEvent needs to be joined
with an aggregate of the OrderItems related to this orderEvent). So we can
skip this step.
2. Because there are multiple distinct OrderItems records for each OrderId,
we cannot rekey them into a table with OrderId as the PK unless we do some
sort of aggregation on them. So let's say we rekey orderItems by orderId and
aggregate each record field into an array. I think we also need to
co-partition them with OrderEvents.
3. Now we can do a stream-table join of the orderEvents stream with the
OrderItemsAggregated table using the OrderId key that is common to both.

So the conclusion is that your solution will work with some tweaking
(basically aggregating on OrderItems instead of on events).
While this solution will work, it has several issues, as follows:

   - This solution was considered in KIP-213 for the existing table-table
     FK join. There is a discussion of the disadvantages of using this
     approach in the article related to KIP-213, and I think the same
     disadvantages will apply to this KIP. Please see here:
     https://www.confluent.io/blog/data-enrichment-with-kafka-streams-foreign-key-joins/#workaround
   - I see FK join as a common operation in data manipulation, so it would
     be nice to have a shortcut for it instead of trying to design it from
     existing functionality all the time. Consider the real use case I
     discussed at the beginning, where a business entity has 25 children.
   - This solution, similarly to mine, is "muddying the water" by providing
     a hybrid join + aggregate outcome. At least with my proposal we could
     potentially control it with a flag, or maybe create some special
     aggregate that could be chained after (don't know how to do it yet :-))

Any thoughts?

Regards,

Igor

On Wed, Aug 9, 2023 at 7:19 PM Matthias J. Sax  wrote:



2023-08-09 Thread Matthias J. Sax
Thanks for the details. And sorry for being a little bit picky. My goal 
is to really understand the use-case and the need for this KIP. It's a 
massive change and I just want to ensure we don't add (complex) things 
unnecessarily.



So you have a stream of "orderEvents" with key=orderId. You cannot
represent them as a KTable, because `orderId` is not a PK, but just an
identifier indicating that a message belongs to a certain order. This part
I understand.


You also have a KTable "orderItems", with orderId as a value-field.




> Relationship between parent and child messages is 1:1


If I understand correctly, you want to join on orderId. If the join is 
1:1, it means that there is only a single table-record for each unique 
orderId. Thus, orderId could be the PK of the table. If that's correct, 
you could use orderId as the key of "orderItems" and do a regular 
stream-table join. -- Or am I missing something?





> and to send it only once to the target system as one ‘complete order’
> message for each new ‘order event’ message.


This sounds like an aggregation to me, not a join? It seems that an order
consists of multiple "orderEvent" messages, and you would want to 
aggregate them based on orderId (plus add some more order detail 
information from the table)? Only after all "orderEvent" messages are 
received and the order is "completed" you want to send a result 
downstream (that is fine and would be a filter in the DSL to drop 
incomplete results).





> Maybe there could be a flag to stream-table foreign key join that would
> indicate if we want this join to aggregate children or not?


Wouldn't this muddy the waters between a join and an aggregation and imply
that it's a "weird" hybrid operator, and we would also need to change 
the `join()` method to accept an additional `Aggregator` function?




From what I understand so far (correct me if I am wrong), you could do 
what you want to do as follows:


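import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

// Sketch: OrderEvent, Order, OrderItem and EnrichedOrder (and their helper
// methods) are placeholder types; matching default serdes are assumed.
StreamsBuilder builder = new StreamsBuilder();

// accumulate all orderEvents per `orderId`
// cf last step to avoid unbounded growth of the result KTable
KStream<String, OrderEvent> orderEventStream = builder.stream("orderEventTopic");
// you might want to disable caching in the next step
// (eg, via Materialized#withCachingDisabled())
KTable<String, Order> orderEvents = orderEventStream
    .groupByKey()
    .aggregate(
        Order::new,
        // returning null for the cleanup message deletes the entry (see below)
        (orderId, event, order) -> event.isDeleteMarker() ? null : order.add(event));

// rekey your orderItems to use `orderId` as PK for the table
KStream<String, OrderItem> orderItemStream = builder.stream("orderItemTopic");
KTable<String, OrderItem> orderItems = orderItemStream
    .map((itemId, item) -> KeyValue.pair(item.getOrderId(), item))
    .toTable();

// do the join
KStream<String, EnrichedOrder> enrichedOrders = orderEvents.toStream()
    .join(orderItems, (order, item) -> new EnrichedOrder(order, item));

// drop incomplete orders
KStream<String, EnrichedOrder> completedOrders =
    enrichedOrders.filter((orderId, order) -> order.isCompleted());

// publish result
completedOrders.to("resultTopic");

// additional cleanup: feed a special "delete order" message back in
completedOrders
    .mapValues((orderId, order) -> OrderEvent.deleteMarker(orderId))
    .to("orderEventTopic");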



The last step is required to have a "cleanup" message to purge state 
from the `orderEvents` KTable that was computed via the aggregation. If 
such a cleanup message is processed by the `aggregate` step, you would 
return `null` as aggregation result to drop the record for the 
corresponding orderId that was completed, to avoid unbounded growth of 
the KTable. (There are other ways to do the same cleanup; it's just one 
example how it could be done.)
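
The cleanup idea can be sketched with a toy model in plain Java. The class, the `DELETE_ORDER` marker value, and the map-based store are hypothetical, not Kafka Streams APIs; the model only mirrors the rule described above, namely that returning `null` from the aggregator drops the entry for that orderId, which keeps the state from growing without bound.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy in-memory model of the cleanup pattern. Hypothetical names; NOT the
// Kafka Streams API. The aggregator returns null for a special marker event,
// and a null aggregate deletes the orderId's entry from the store.
final class CleanupAggregationModel {
    static final String DELETE_MARKER = "DELETE_ORDER"; // hypothetical marker value
    private final Map<String, List<String>> store = new HashMap<>();

    static List<String> aggregate(List<String> current, String event) {
        if (DELETE_MARKER.equals(event)) {
            return null; // signal: drop this orderId's state
        }
        List<String> updated = new ArrayList<>(current);
        updated.add(event);
        return updated;
    }

    void process(String orderId, String event) {
        List<String> current = store.getOrDefault(orderId, List.of()); // empty initial aggregate
        List<String> updated = aggregate(current, event);
        if (updated == null) {
            store.remove(orderId);
        } else {
            store.put(orderId, updated);
        }
    }

    List<String> get(String orderId) {
        return store.get(orderId);
    }

    int size() {
        return store.size();
    }

    public static void main(String[] args) {
        CleanupAggregationModel agg = new CleanupAggregationModel();
        agg.process("order1", "created");
        agg.process("order1", "completed");
        agg.process("order2", "created");
        System.out.println(agg.get("order1")); // prints [created, completed]
        agg.process("order1", DELETE_MARKER);  // cleanup message fed back in
        System.out.println(agg.get("order1") + " " + agg.size()); // prints null 1
    }
}
```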



If I got it wrong, can you explain what part I messed up?



-Matthias




On 8/7/23 10:15 AM, Igor Fomenko wrote:



2023-08-07 Thread Igor Fomenko
Hi Matthias,




Thanks for your comments.



I would like to clarify the use case a little more to show why the existing
table-table foreign key join will not work for the use case I am trying to
address.

Let’s consider a very simple use case with the parent messages in one
Kafka topic (‘order event’ messages that also contain some key order info)
and the child messages in another topic (‘order items’ messages with
additional info for the order). The relationship between parent and child
messages is 1:1. Also, the ‘order items’ message has OrderID as one of its
fields (foreign key).



The requirement is to combine info of the parent ‘order event’ message with
the child ‘order items’ message using the foreign key, and to send it only
once to the target system as one ‘complete order’ message for each new
‘order event’ message.

Please note that the new messages which are related to order items (create,
update, delete) should not trigger the resulting ‘complete order’ message.



From the above requirements we can state the following:

1. Order events are unique and never updated or deleted; they can only
be replayed if we need to recover the event stream. For our order example, I
would use OrderID as the event key, but if we use a KTable to represent
events, then events with the same OrderID will overwrite each other. This
may or may not cause some issues, but using a stream to model them seems to
be the more correct approach, at least from a performance point of view.

2. We do not want updates from the “order items” table on the right
side of the join to generate an output, since only events should be the
trigger for output messages in our scenario. This is aligned with the
stream-table join behavior rather than the table-table join, where updates
come from both sides.

3. A stream-table join will give us a resulting stream, which is more aligned
with our output requirements than the table that would be the result of a
table-table join.



Requirement #2 above is the most important one, and it cannot be achieved
with the existing table-table join on foreign key.



I also stated that the foreign key table in the table-table join is on the
‘wrong’ side for our order management use case. By this I just meant that
in the stream-table join I am proposing, the foreign key table needs to be
on the right side, while in the existing table-table join it is on the left.
This is, however, irrelevant, since we cannot use a table-table join anyway,
for reason #2 above.



You made a good point about the aggregation of child messages for the more
complex use case of a 1:n relation between parent and children. Initially, I
was thinking that aggregation would just be a separate operation that could
be added after we performed a foreign key join. Now I realize that it will
not be possible to do it afterwards.

Maybe there could be a flag to stream-table foreign key join that would
indicate if we want this join to aggregate children or not?



What do you think?

Regards,



Igor


On Fri, Aug 4, 2023 at 10:01 PM Matthias J. Sax  wrote:



2023-08-04 Thread Matthias J. Sax
Thanks a lot for providing more background. It's getting much clearer to
me now.


A couple of follow-up questions:



> It is not possible to use table-table join in this case because triggering
> events are supplied separately from the actual data entity that needs to be
> "assembled" and these events could only be presented as KStream due to
> their nature.


Not sure if I understand this part? Why can't those events be
represented as a KTable? You say "could only be presented as KStream due
to their nature" -- what do you mean by this?


In the end, my understanding is the following (using the example from the
KIP):


For the shipments <-> orders and order-details <-> orders joins, shipments
and order-details are the fact tables, which is the "reverse" of what you
want? Using the existing FK join, it would mean you get two enriched tables
that you cannot join to each other any further (because we don't support
n:m joins): in the end, shipmentId+orderDetailId would be the PK of such
an n:m join?


If that's correct (just for the purpose of making sure I understand
correctly), if we added an n:m join, you could join shipment <->
order-details first, and use an FK join to enrich the result with orders.
-- In addition, you could also do an FK join to the events if you represent
events as a table (this relates to my question from above, why events
cannot be represented as a KTable).



As for the KIP itself, I am still wondering about details: if we get an
event in, and we do a lookup into the "FK table" and find multiple matches,
would we emit multiple results? This would kinda defeat the purpose of
re-assembling everything into a single entity? (And it might require an
additional aggregation downstream to put the entity together.) -- Or
would we join the single event with all found table rows, and emit a
single "enriched" event?



Thus, I am actually wondering if you could not pre-process both the 
shipment and order-details tables via `groupBy(orderId)` and assemble a 
list (or similar) of all shipments (or order-details) per order? If you 
do this pre-processing, you can do a PK-PK (1:1) join with the orders 
table, and also do a stream-table join to enrich your events with the 
full order information?
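A minimal plain-Java sketch of this pre-processing idea, only to make the suggestion concrete. It is a model, not Kafka Streams code: in-memory maps stand in for the KTables, the table contents (childId -> orderId) are hypothetical, and `groupByOrderId` approximates what `groupBy(orderId)` plus an aggregation into a list would maintain (in Kafka Streams, roughly `KTable#groupBy` followed by `KGroupedTable#aggregate` with an adder and a subtractor). Once the children are pre-aggregated per order, enriching an event needs only PK lookups, so each event yields exactly one output.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of "pre-aggregate per orderId, then do 1:1 PK lookups".
// Maps stand in for KTables; all names and data shapes are hypothetical.
final class PreAggregateThenLookup {

    // Child table: childId -> orderId (the FK lives in the value).
    // Result: orderId -> list of childIds, i.e. the child table re-keyed
    // by orderId with all matching rows collected into a single value.
    static Map<String, List<String>> groupByOrderId(Map<String, String> childTable) {
        Map<String, List<String>> byOrder = new TreeMap<>();
        for (Map.Entry<String, String> row : childTable.entrySet()) {
            byOrder.computeIfAbsent(row.getValue(), k -> new ArrayList<>()).add(row.getKey());
        }
        return byOrder;
    }

    // Enriches one event (identified by its orderId) with the order row and the
    // pre-aggregated children. Every lookup is by PK, so one event produces
    // exactly one enriched output, even if the order has many children.
    static String enrich(String eventOrderId,
                         Map<String, String> orders,
                         Map<String, List<String>> shipmentsByOrder,
                         Map<String, List<String>> detailsByOrder) {
        String order = orders.get(eventOrderId);
        List<String> shipments = shipmentsByOrder.getOrDefault(eventOrderId, List.of());
        List<String> details = detailsByOrder.getOrDefault(eventOrderId, List.of());
        return eventOrderId + " order=" + order + " shipments=" + shipments + " details=" + details;
    }
}
```

In this model the 1:n fan-out is handled once, while maintaining the per-order lists, rather than at join time for every event.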




-Matthias

On 7/26/23 7:13 AM, Igor Fomenko wrote:


Re: [DISCUSS] KIP-955: Add stream-table join on foreign key

2023-07-26 Thread Igor Fomenko
Hello Matthias,

Thank you for this response. It provides the context for a good discussion
related to the need for this new interface.

The use case I have in mind is not really a stream enrichment, which usually
implies that the event has a primary key to some external info, and that
external info can simply be looked up in some other data source.

The pattern this KIP proposes is more akin to the data entity assembly
pattern from the persistence layer, so it is not purely an integration
pattern but rather a pattern that enables an event stream from the
persistence layer of a data source application. The main driver here is
the ability to stream a data entity of any complexity (complexity in terms
of the relational model) from an application database to data consumers.
The technical precondition here is, of course, that the data has already
been extracted from the relational database with something like Change Data
Capture (CDC) and placed into Kafka topics. Also, due to CDC limitations,
each database table in the entity's relational data model is extracted to a
separate Kafka topic.

So, to answer your first question: in the very common use case, the entity
that needs to be "assembled" from Kafka topics has 1:n relations, where 1
corresponds to the triggering event enriched with the data from the main
(or parent) table of the data entity (for example, the purchase order
completion event + order data from the order table), and n corresponds to
the many children that need to be joined with the order table to produce
the full data entity (for example, multiple line items of the purchase
order need to be added from the line items child table).

It is not possible to use a table-table join in this case because the
triggering events are supplied separately from the actual data entity that
needs to be "assembled", and these events could only be presented as a
KStream due to their nature. Also, currently the FK table in a table-table
join is on the "wrong" side of the join.
It is possible to use the existing stream-table join only to get data from
the parent entity table (order table), because the event-to-order relation
is 1:1. After that, the "children" tables of the order need to be added to
complete the entity assembly; these children are related 1:n via a foreign
key field in each child table (the order ID).

This use case is typically implemented with some sort of ESB (like
Mulesoft), where the ESB receives an event and then uses a JDBC adapter to
issue an SQL query with a left join on the foreign key for the child tables.
The ESB then loops through the returned record set to assemble the full data
entity. However, in many cases, for various architectural reasons, there is a
desire to remove JDBC queries from the data source and replace them with CDC
streaming data to Kafka. In that case, assembling data entities from Kafka
topics instead of JDBC would be beneficial.
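A minimal plain-Java model of that ESB assembly loop, assuming a hypothetical `orders LEFT JOIN line_items` query filtered by the event's order ID; the row and entity types are illustrative and not taken from Mulesoft or any JDBC API. The left join repeats the parent columns on every child row (or returns one row with null child columns when there are no children), and the loop folds those rows into one entity.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of ESB-style entity assembly from a flat left-join result.
// All types and names here are hypothetical.
final class EntityAssembly {

    // One flat row of a hypothetical "orders LEFT JOIN line_items" result set.
    static final class JoinedRow {
        final String orderId;
        final String orderData;
        final String lineItemId; // null when the order has no line items

        JoinedRow(String orderId, String orderData, String lineItemId) {
            this.orderId = orderId;
            this.orderData = orderData;
            this.lineItemId = lineItemId;
        }
    }

    // The assembled entity: the parent order plus all of its children.
    static final class Order {
        final String orderId;
        final String orderData;
        final List<String> lineItems = new ArrayList<>();

        Order(String orderId, String orderData) {
            this.orderId = orderId;
            this.orderData = orderData;
        }

        @Override
        public String toString() {
            return orderId + " " + orderData + " " + lineItems;
        }
    }

    // Folds the rows for ONE order into a single entity: the parent columns
    // are taken from the first row, and each non-null child is appended.
    static Order assemble(List<JoinedRow> rows) {
        if (rows.isEmpty()) {
            throw new IllegalArgumentException("no rows for the order");
        }
        JoinedRow first = rows.get(0);
        Order order = new Order(first.orderId, first.orderData);
        for (JoinedRow row : rows) {
            if (row.lineItemId != null) {
                order.lineItems.add(row.lineItemId);
            }
        }
        return order;
    }
}
```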

Please let me know what you think.

Regards,

Igor

On Tue, Jul 25, 2023 at 5:53 PM Matthias J. Sax  wrote:

> On 7/24/23 11:36 AM, Igor Fomenko wrote:
> > Hello developers of the Kafka Streams,
> >
> > I would like to start a discussion on KIP-955: Add stream-table join on
> > foreign key
> > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-955%3A+Add+stream-table+join+on+foreign+key>
> > This KIP proposes a new API to join a KStream with a KTable based on a
> > foreign key relation.
> > This KIP was inspired by one of my former projects to integrate RDBMS
> > databases with data consumers using Change Data Capture and Kafka.
> > If we had the capability in Kafka Streams to join a KStream with a KTable
> > on a foreign key, this would simplify our implementation significantly.

Re: [DISCUSS] KIP-955: Add stream-table join on foreign key

2023-07-25 Thread Matthias J. Sax

Igor,

thanks for the KIP. Interesting proposal. I am wondering a little bit 
about the use-case and semantics, and whether it's really required to add 
what you propose? Please correct me if I am wrong.


In the end, a stream-table join is a "stream enrichment" (via a table 
lookup). Thus, it's inherently a 1:1 join (in contrast to an FK 
table-table join, which is an n:1 join).


If this assumption is correct, and you have data for which the table 
side join attribute is in the value, you could actually repartition the 
table data using the join attribute as the PK of the table.
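A minimal plain-Java sketch of that repartitioning idea, assuming in-memory maps stand in for KTables and that the join attribute can be extracted from each value. The `rekey` helper and the data are hypothetical and not part of the Kafka Streams API. The sketch also shows why this only works for a 1:1 relation: if two rows share the same attribute, one of them is overwritten.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Plain-Java model of re-keying a table by a join attribute stored in its
// values. Maps stand in for KTables; the names are hypothetical.
final class RekeyByAttribute {

    // Builds a new table whose PK is the extracted attribute, so a stream
    // record carrying that attribute can do a plain PK lookup. This is lossless
    // only if the attribute is unique per row (the 1:1 case); if two rows share
    // the same attribute, the later row overwrites the earlier one.
    static <K, V, A> Map<A, V> rekey(Map<K, V> table, Function<V, A> attributeExtractor) {
        Map<A, V> rekeyed = new LinkedHashMap<>();
        for (V value : table.values()) {
            rekeyed.put(attributeExtractor.apply(value), value);
        }
        return rekeyed;
    }
}
```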


If my assumption is incorrect, and you say you want to have a 1:n join 
(note that I intentionally reversed from n:1 to 1:n), I would rather 
object, because it seems to violate the idea of "enriching" a stream, which 
means that each input record produces one output record, not multiple?


Also note: for an FK table-table join, we use the foreignKeyExtractor to 
get the join attribute from the left input table (which corresponds to 
the KStream in your KIP; i.e., it's an n:1 join), while you propose to 
apply the foreignKeyExtractor to the KTable (which is the right 
input, and thus it would be a 1:n join).
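To make the two directions concrete, a minimal plain-Java model in which maps stand in for the inputs and all names are hypothetical. In the existing API, the extractor corresponds to the `foreignKeyExtractor` argument of `KTable#join(KTable, Function, ValueJoiner)` and is applied to the left table's values; the second method models the proposed direction, where the extractor is applied to the right-hand table's values, so a single left record can fan out to many results.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Plain-Java model contrasting where a foreign-key extractor is applied.
// Maps stand in for the two join inputs; names are hypothetical.
final class ExtractorSide {

    // Existing FK table-table join direction (n:1): the extractor runs on LEFT
    // values, and each left row looks up at most one right row by the right PK.
    static <KL, VL, KR, VR> Map<KL, String> extractorOnLeft(
            Map<KL, VL> left, Function<VL, KR> fkExtractor, Map<KR, VR> right) {
        Map<KL, String> out = new LinkedHashMap<>();
        for (Map.Entry<KL, VL> row : left.entrySet()) {
            VR match = right.get(fkExtractor.apply(row.getValue()));
            if (match != null) {
                out.put(row.getKey(), row.getValue() + "+" + match);
            }
        }
        return out;
    }

    // Proposed direction (1:n): the extractor runs on RIGHT values, so a single
    // left record can match any number of right rows (here via a full scan,
    // purely for illustration).
    static <KL, VL, VR> List<String> extractorOnRight(
            KL leftKey, VL leftValue, Map<?, VR> right, Function<VR, KL> fkExtractor) {
        List<String> out = new ArrayList<>();
        for (VR value : right.values()) {
            if (leftKey.equals(fkExtractor.apply(value))) {
                out.add(leftValue + "+" + value);
            }
        }
        return out;
    }
}
```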


Maybe you can clarify the use case a little bit. In the current KIP 
description I only see the 1:1 join case, which would mean we might not 
need such a feature?



-Matthias


On 7/24/23 11:36 AM, Igor Fomenko wrote:

Hello Kafka Streams developers,

I would like to start a discussion on KIP-955: Add stream-table join on
foreign key

This KIP proposes a new API to join a KStream with a KTable based on a
foreign key relation.
This KIP was inspired by one of my former projects to integrate RDBMS
databases with data consumers using Change Data Capture and Kafka.
If we had the capability in Kafka Streams to join a KStream with a KTable
on a foreign key, this would simplify our implementation significantly.

Looking forward to your feedback and discussion.

Regards,

Igor