Re: [DISCUSSION] KIP-965: Support disaster recovery between clusters by MirrorMaker

2023-08-14 Thread Igor Fomenko
I agree that synchronizing ACLs between clusters is a very useful feature
for DR scenarios.
In fact, I would prefer this to be a default setting since almost every
prod implementation requires a DR cluster.

There are some scenarios where replication between clusters is done for
other reasons (cluster migration, for example), but in my opinion this
is less common than the DR scenario.

Igor


On Tue, Aug 8, 2023 at 9:33 AM Ryanne Dolan  wrote:

> hudeqi, I'd call the configuration property something that describes what
> it does rather than its intended use-case.
>
> Ryanne
>
> On Tue, Aug 8, 2023, 4:46 AM hudeqi <16120...@bjtu.edu.cn> wrote:
>
> > Hi, all. I want to submit a kip, and hope get some review and good
> > suggestions. the kip is here:
> https://cwiki.apache.org/confluence/x/k5KzDw
> >
> > Motivation:
> >
> >
> > When mirroring ACLs, MirrorMaker downgrades "allow ALL" ACLs to "allow
> > READ". The rationale is to prevent other clients from producing to remote
> > topics, as mentioned in KIP-382: MirrorMaker 2.0.
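> >
> > For context, here is a minimal sketch of that downgrade rule (an
> > illustration only, not MirrorMaker's actual code), using the
> > org.apache.kafka.common.acl classes:
> >
> > // map an "ALLOW ALL" ACL entry to "ALLOW READ"; keep all other entries
> > AclBinding downgradeAllowAllAcl(AclBinding binding) {
> >     AccessControlEntry entry = binding.entry();
> >     if (entry.permissionType() == AclPermissionType.ALLOW
> >             && entry.operation() == AclOperation.ALL) {
> >         return new AclBinding(
> >             binding.pattern(),
> >             new AccessControlEntry(entry.principal(), entry.host(),
> >                 AclOperation.READ, AclPermissionType.ALLOW));
> >     }
> >     return binding;
> > }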
> >
> > However, in disaster recovery scenarios, where the target cluster is not
> > actively used and is just a "hot standby", it would be preferable to have
> > exactly the same ACLs on both clusters to speed up failover. Therefore,
> > in this scenario, we need to synchronize the topic write ACLs, group
> > ACLs, and the complete user SCRAM credentials of the source cluster's
> > topics to the target cluster, so that when the user switches the read and
> > write service to the target cluster, it can run immediately.
> >
> > Proposed changes:
> >
> > Add a config parameter disaster.recovery.enabled to MirrorMakerConfig. The
> > default is false, which leaves the current sync behavior unchanged. If set
> > to true, MirrorMaker will synchronize the topic write ACLs, group ACLs,
> > and the complete user SCRAM credentials of the replicated source cluster
> > topics to the target cluster.
> >
> > topic write ACL: filter all topic write ACL entries related to the topics
> > replicated from the source cluster.
> > user scram credential: filter the user SCRAM credentials to be
> > synchronized according to the topic ACL information selected above, and
> > create the corresponding users in the target cluster.
> > group ACL: the group ACL information is obtained by filtering on the
> > users obtained above.
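> >
> > For illustration, a minimal mm2.properties fragment showing where the
> > proposed flag would sit (the last property is this KIP's proposal and its
> > exact placement/prefixing is still to be settled; the others already
> > exist today):
> >
> > clusters = primary, backup
> > primary->backup.enabled = true
> > primary->backup.topics = .*
> > # existing switch for (downgraded) topic ACL sync
> > sync.topic.acls.enabled = true
> > # proposed: default false keeps current behavior
> > disaster.recovery.enabled = true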
> >
> > Looking forward to your reply.
> >
> > Best, hudeqi
>


Re: [DISCUSS] KIP-955: Add stream-table join on foreign key

2023-08-11 Thread Igor Fomenko
d to flatten the individual result record we want to have. But for
> your case it seems you want to have a single "fat" result record in the
> end, so the aggregation is not a workaround but a requirement anyway? If
> we go with KIP-955, your use case requires an aggregation (right?)
> because for each input stream record, you want one result record (not
> multiple?).
>
>
> > I see FK join as a common operation in data manipulation, so it would
> > be nice to have a shortcut for it and not to try to design it from
> > existing functionality all the time.
>
> Well, yes and no? In the end, a stream-table join is an _enrichment_
> join, ie, for each left input stream event, we emit one result record (or
> none, if it does not match for an inner join). A stream-FK-table-join
> would imply that we emit multiple result records, which is (at least to
> me) a somewhat weird behavior, because it's kind of a "flatMap" as a join
> side-effect. (Or we add in an aggregation, and again, have a single
> composed operator with "weird" semantics.) It does not seem
> semantically clean to me to do it this way.
>
>
> > Consider the real use case I discussed at the
> > beginning, when a business entity has 25 children
>
> Not sure if I fully understand? Are you saying a single stream record
> would join with 25 table rows? And that's why you think you cannot
> aggregate those 25 rows because such a "fat row" would be too large? If
> this is the case (and I am correct about my understanding that your use
> case needs an aggregation step anyway), then this issue does not go away,
> because you build a single "fat" result record containing all these 25
> rows as final result anyway.
>
>
> > This solution, similarly to mine, is "muddying the water" by providing a
> > hybrid join + aggregate outcome. At least with my proposal we could
> > potentially control it with the flag, or maybe create some special
> > aggregate that could be chained after (don't know how to do it yet :-))
>
> Why would it muddy the waters if you combine multiple operators? If you
> apply an aggregation and a join operator, both operators provide
> well-known and clean semantics. To me, "muddying the waters" means having
> a single operator that does "too much" at once (and adding a config
> makes it even worse IMHO, as it then actually merges even more things
> into a single operator).
>
> From my POV, a good DSL is a tool set of operators, each doing one (well
> defined) thing, and you can combine them to do complex stuff. Building
> "fat uber" operators is the opposite of it IMHO.
>
> I am still on the fence whether KIP-955 proposes a well-defined operator or
> not, because it seems it's either a flatMap+join or join+aggregation --
> for both cases, I am wondering why we would want to combine them into a
> single operator?
>
> To me, there are two good arguments for adding a new operator:
>
>   (1) It's not possible to combine existing operators to semantically
> express the same at all.
>
>   (2) Adding the operator provides significant performance improvements
> compared to combining a set of existing operators.
>
> Do we think one of both cases apply?
>
>
> Let's call a stream-FK-join that emits multiple result records the
> "flatMapJoin", and the stream-FK-join that emits a single "fat" result
> record the "aggregationJoin".
>
> If my understanding is correct, and you need an aggregation anyway,
> adding a flatMapJoin that needs an additional aggregation downstream does
> not work, because the aggregation cannot know when to start a new
> aggregation... Assume there are two input events, both with orderId1; the
> first joins to two table rows, emitting two flatMapJoin result records,
> and the second joins to three table rows, emitting three flatMapJoin
> records. How would the downstream aggregation know to put records 1+2
> and records 3+4+5 together to get back to the original two input records?
>
> If flatMapJoin does not work, and we go with aggregationJoin, I would
> still argue that it's the same as doing an upstream table aggregation
> plus a regular stream-table join, and I don't see a big perf difference
> between both approaches either. For both cases the table input is
> repartitioned, and we also build a fat record for both cases. The
> difference is that we store the fat record in the table for the explicit
> aggregation but avoid an expensive range scan... yet the record size
> will be there in any case, so I am not sure what we gain by not storing
> the fat record in the table if we cannot get rid of the fat record in
> any case.
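>
> To make the comparison concrete, a rough sketch of the "explicit table
> aggregation plus regular stream-table join" variant; the types OrderEvent,
> OrderItem, and EnrichedOrder are made up for illustration, and serde
> configuration is omitted:
>
> // imports: org.apache.kafka.streams.*, org.apache.kafka.streams.kstream.*, java.util.*
> KTable<String, OrderItem> items = builder.table("orderItemTopic");
> KTable<String, List<OrderItem>> itemsByOrder = items
>     .groupBy((k, item) -> KeyValue.pair(item.getOrderId(), item))
>     .aggregate(
>         ArrayList::new,
>         (orderId, item, agg) -> { agg.add(item); return agg; },     // adder
>         (orderId, item, agg) -> { agg.remove(item); return agg; }); // subtractor
>
> KStream<String, OrderEvent> orderEvents = builder.stream("orderEventTopic");
> KStream<String, EnrichedOrder> enriched = orderEvents.join(
>     itemsByOrder,
>     (event, itemList) -> new EnrichedOrder(event, itemList));
>
> This repartitions the table input once (for the groupBy) and stores the
> "fat" per-order list, which is exactly the trade-off described above.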
>
>
>
>

Re: [DISCUSS] KIP-955: Add stream-table join on foreign key

2023-08-10 Thread Igor Fomenko
to me, not a join? It seems that an order
> consists of multiple "orderEvent" messages, and you would want to
> aggregate them based on orderId (plus add some more order detail
> information from the table)? Only after all "orderEvent" messages are
> received and the order is "completed" you want to send a result
> downstream (that is fine and would be a filter in the DSL to drop
> incomplete results).
>
>
>
> > Maybe there could be a flag on the stream-table foreign key join that would
> > indicate if we want this join to aggregate children or not?
>
> Wouldn't this muddy the waters between a join and an aggregation, and imply
> that it's a "weird" hybrid operator, and we would also need to change
> the `join()` method to accept an additional `Aggregator` function?
>
>
>
> From what I understand so far (correct me if I am wrong), you could do
> what you want to do as follows:
>
> // accumulate all orderEvents per `orderId`
> // cf the last step to avoid unbounded growth of the result KTable
> KStream orderEventStream = builder.stream("orderEventTopic");
> // you might want to disable caching in the next step
> KTable orderEvents = orderEventStream.groupByKey().aggregate(...);
>
> // rekey your orderItems to use `orderId` as PK for the table
> KStream orderItemStream = builder.stream("orderItemTopic");
> KTable orderItems = orderItemStream.map(/* put orderId as key */).toTable();
>
> // do the join
> KStream enrichedOrders = orderEvents.toStream().join(orderItems, joiner);
>
> // drop incomplete orders
> KStream completedOrders = enrichedOrders.filter((k, o) -> o.isCompleted());
>
> // publish result
> completedOrders.to("resultTopic");
>
> // additional cleanup
> completedOrders.map(/* craft a special "delete order message" */)
>     .to("orderEventTopic");
>
>
> The last step is required to have a "cleanup" message to purge state
> from the `orderEvents` KTable that was computed via the aggregation. If
> such a cleanup message is processed by the `aggregate` step, you would
> return `null` as aggregation result to drop the record for the
> corresponding orderId that was completed, to avoid unbounded growth of
> the KTable. (There are other ways to do the same cleanup; it's just one
> example of how it could be done.)
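>
> For illustration, the aggregate step with such a cleanup path could look
> roughly like this (OrderAggregate and isCleanup() are made-up names, and
> OrderAggregate::add is assumed to return the updated aggregate):
>
> KTable orderEvents = orderEventStream
>     .groupByKey()
>     .aggregate(
>         OrderAggregate::new,
>         (orderId, event, agg) ->
>             // returning null drops the row for this orderId from the KTable
>             event.isCleanup() ? null : agg.add(event));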
>
>
> If I got it wrong, can you explain what part I messed up?
>
>
>
> -Matthias
>
>
>
>
> On 8/7/23 10:15 AM, Igor Fomenko wrote:
> > Hi Matthias,
> >
> >
> >
> > Thanks for your comments.
> >
> >
> >
> > I would like to clarify the use case a little more to show why the
> > existing table-table foreign key join will not work for the use case I am
> > trying to address.
> >
> > Let’s consider the very simple use case with the parent messages in one
> > Kafka topic (‘order event’ messages that also contain some key order info)
> > and the child messages in another topic (‘order items’ messages with
> > additional info for the order). The relationship between parent and child
> > messages is 1:1. Also, the ‘order items’ message has OrderID as one of its
> > fields (foreign key).
> >
> >
> >
> > The requirement is to combine info of the parent ‘order event’ message
> > with the child ‘order items’ message using the foreign key, and to send
> > it only once to the target system as one ‘complete order’ message for
> > each new ‘order event’ message.
> >
> > Please note that new messages related to order items (create, update,
> > delete) should not trigger the resulting ‘complete order’ message.
> >
> >
> >
> > From the above requirements we can state the following:
> >
> > 1. Order events are unique and never updated or deleted; they can only
> > be replayed if we need to recover the event stream. For our order example
> > I would use OrderID as the event key, but if we use a KTable to represent
> > events, then events with the same OrderID will overwrite each other. This
> > may or may not cause some issues, but using a stream to model them seems
> > to be a more correct approach, at least from a performance point of view.
> >
> > 2. We do not want updates from the “order items” table on the right
> > side of the join to generate an output since only events should be the
> > trigger for output messages in our scenario. This is aligned with the
> > stream-table join behavior rather than the table-table join, where
> > updates come from both sides.
> >
> > 3. Stream-table join will 

Re: [DISCUSS] KIP-955: Add stream-table join on foreign key

2023-08-07 Thread Igor Fomenko
am to put the entity together.) -- Or
> would we join the single event with all found table rows, and emit a
> single "enriched" event?
>
>
> Thus, I am actually wondering if you could not pre-process both the
> shipment and order-details tables via `groupBy(orderId)` and assemble a
> list (or similar) of all shipments (or order-details) per order? If you
> do this pre-processing, you can do a PK-PK (1:1) join with the orders
> table, and also do a stream-table join to enrich your events with the
> full order information?
>
>
>
> -Matthias
>
> On 7/26/23 7:13 AM, Igor Fomenko wrote:
> > Hello Matthias,
> >
> > Thank you for this response. It provides the context for a good
> > discussion related to the need for this new interface.
> >
> > The use case I have in mind is not really a stream enrichment, which
> > usually implies that the event has a primary key to some external info
> > and that external info could just be looked up in some other data source.
> >
> > The pattern this KIP proposes is more akin to the data entity assembly
> > pattern from the persistence layer, so it is not purely an integration
> > pattern but rather a pattern that enables an event stream from the
> > persistence layer of a data source application. The main driver here is
> > the ability to stream a data entity of any complexity (complexity in
> > terms of the relational model) from an application database to some data
> > consumers. The technical precondition here is of course that data is
> > already extracted from the relational database with something like Change
> > Data Capture (CDC) and placed into Kafka topics. Also, due to CDC
> > limitations, each database table that is related to the entity relational
> > data model is extracted to a separate Kafka topic.
> >
> > So to answer your first question: the entity that needs to be "assembled"
> > from Kafka topics in the very common use case has 1:n relations, where 1
> > corresponds to the triggering event enriched with the data from the main
> > (or parent) table of the data entity (for example, completion of the
> > purchase order event + order data from the order table), and n corresponds
> > to the many children that need to be joined with the order table to have
> > the full data entity (for example, multiple line items of the purchase
> > order need to be added from the line items child table).
> >
> > It is not possible to use a table-table join in this case because the
> > triggering events are supplied separately from the actual data entity
> > that needs to be "assembled", and these events can only be represented as
> > a KStream due to their nature. Also, currently the FK table in the
> > table-table join is on the "wrong" side of the join.
> > It is possible to use the existing stream-table join only to get data
> > from the parent entity table (order table), because the event-to-order
> > relation is 1:1. After that it is required to add the "children" tables
> > of the order to complete the entity assembly; these children are related
> > as 1:n, with a foreign key field (the order ID) in each child table.
> >
> > This use case is typically implemented with some sort of ESB (like
> > MuleSoft), where the ESB receives an event and then uses a JDBC adapter
> > to issue a SQL query with a left join on the foreign key for child
> > tables. The ESB then loops through the returned record set to assemble
> > the full data entity. However, in many cases and for various architecture
> > reasons, there is a desire to remove JDBC queries from the data source
> > and replace them with CDC streaming data to Kafka. In that case,
> > assembling data entities from Kafka topics instead of via JDBC would be
> > beneficial.
> >
> > Please let me know what you think.
> >
> > Regards,
> >
> > Igor
> >
> > On Tue, Jul 25, 2023 at 5:53 PM Matthias J. Sax 
> wrote:
> >
> >> Igor,
> >>
> >> thanks for the KIP. Interesting proposal. I am wondering a little bit
> >> about the use-case and semantics, and if it's really required to add
> >> what you propose? Please correct me if I am wrong.
> >>
> >> In the end, a stream-table join is a "stream enrichment" (via a table
> >> lookup). Thus, it's inherently a 1:1 join (in contrast to a FK
> >> table-table join which is a n:1 join).
> >>
> >> If this assumption is correct, and you have data for which the table
> >> side join attribute is in the value, you could actually repartition the

[VOTE] KIP-955: Add stream-table join on foreign key

2023-08-02 Thread Igor Fomenko
Hi All,

I would like to call a Vote on KIP-955: Add stream-table join on foreign
key.
KIP link is here:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-955%3A+Add+stream-table+join+on+foreign+key

Mostly I would like to get consensus on the need for this new API to Kafka
Streams. The actual design of course will still be open to discussion
until this API is implemented.

Thank you,

Igor


[jira] [Created] (KAFKA-15299) Support left stream-table join on foreign key

2023-08-02 Thread Igor Fomenko (Jira)
Igor Fomenko created KAFKA-15299:


 Summary: Support left stream-table join on foreign key
 Key: KAFKA-15299
 URL: https://issues.apache.org/jira/browse/KAFKA-15299
 Project: Kafka
  Issue Type: New Feature
  Components: streams
Reporter: Igor Fomenko
Assignee: Igor Fomenko


KIP-955: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-955%3A+Add+stream-table+join+on+foreign+key

Currently in the Kafka Streams DSL, KStream-to-KTable joins can only be performed
on keys. However, in practice it is often required to join the messages in
Kafka topics using a message field as a "foreign key", with the following pattern:

streamX.leftJoin(tableY, RecordTableY::getForeignKey,
joiner).to("output-topic-name")

The left join on foreign key operation will result in a stream of messages from
two topics joined on the foreign key, where each output message is produced for
each event on the input stream.
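
For illustration, the proposed (not yet existing) API could be used as below;
the signature is illustrative only, and the types Order, OrderItem, and
EnrichedOrder are placeholders:

KStream<String, Order> orders = builder.stream("order-topic");
KTable<String, OrderItem> orderItems = builder.table("order-item-topic");

orders
    .leftJoin(
        orderItems,
        orderItem -> orderItem.getOrderId(),   // foreign-key extractor, applied to the table value
        (order, orderItem) -> new EnrichedOrder(order, orderItem))
    .to("output-topic-name");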





Re: [DISCUSS] KIP-955: Add stream-table join on foreign key

2023-07-26 Thread Igor Fomenko
Hello Matthias,

Thank you for this response. It provides the context for a good discussion
related to the need for this new interface.

The use case I have in mind is not really a stream enrichment, which usually
implies that the event has a primary key to some external info and that
external info could just be looked up in some other data source.

The pattern this KIP proposes is more akin to the data entity assembly
pattern from the persistence layer, so it is not purely an integration pattern
but rather a pattern that enables an event stream from the persistence layer of
a data source application. The main driver here is the ability to stream a
data entity of any complexity (complexity in terms of the relational model)
from an application database to some data consumers. The technical
precondition here is of course that data is already extracted from the
relational database with something like Change Data Capture (CDC) and
placed into Kafka topics. Also, due to CDC limitations, each database table
that is related to the entity relational data model is extracted to a
separate Kafka topic.

So to answer your first question: the entity that needs to be "assembled"
from Kafka topics in the very common use case has 1:n relations, where 1
corresponds to the triggering event enriched with the data from the main
(or parent) table of the data entity (for example, completion of the
purchase order event + order data from the order table), and n corresponds
to the many children that need to be joined with the order table to have
the full data entity (for example, multiple line items of the purchase order
need to be added from the line items child table).

It is not possible to use a table-table join in this case because the triggering
events are supplied separately from the actual data entity that needs to be
"assembled", and these events can only be represented as a KStream due to
their nature. Also, currently the FK table in the table-table join is on the
"wrong" side of the join.
It is possible to use the existing stream-table join only to get data from the
parent entity table (order table), because the event-to-order relation is 1:1.
After that it is required to add the "children" tables of the order to complete
the entity assembly; these children are related as 1:n, with a foreign key
field (the order ID) in each child table.

This use case is typically implemented with some sort of ESB (like
MuleSoft), where the ESB receives an event and then uses a JDBC adapter to
issue a SQL query with a left join on the foreign key for child tables. The
ESB then loops through the returned record set to assemble the full data
entity. However, in many cases and for various architecture reasons, there is
a desire to remove JDBC queries from the data source and replace them with
CDC streaming data to Kafka. In that case, assembling data entities from
Kafka topics instead of via JDBC would be beneficial.

Please let me know what you think.

Regards,

Igor

On Tue, Jul 25, 2023 at 5:53 PM Matthias J. Sax  wrote:

> Igor,
>
> thanks for the KIP. Interesting proposal. I am wondering a little bit
> about the use-case and semantics, and if it's really required to add
> what you propose? Please correct me if I am wrong.
>
> In the end, a stream-table join is a "stream enrichment" (via a table
> lookup). Thus, it's inherently a 1:1 join (in contrast to a FK
> table-table join which is a n:1 join).
>
> If this assumption is correct, and you have data for which the table
> side join attribute is in the value, you could actually repartition the
> table data using the join attribute as the PK of the table.
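>
> As a rough sketch, such a repartitioning could look like this, with
> made-up types:
>
> KTable<String, Detail> byJoinAttribute = details
>     .toStream()
>     .map((k, v) -> KeyValue.pair(v.getJoinAttribute(), v))  // join attribute becomes the PK
>     .toTable();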
>
> If my assumption is incorrect, and you say you want to have a 1:n join
> (note that I intentionally reversed from n:1 to 1:n), I would rather
> object, because it seems to violate the idea of "enriching" a stream, which
> means that each input record produces one output record, not multiple.
>
> Also note: for a FK table-table join, we use the foreignKeyExtractor to
> get the join attribute from the left input table (which corresponds to
> the KStream in your KIP; ie, it's a n:1 join), while you propose to use
> the foreignKeyExtractor to be applied to the KTable (which is the right
> input, and thus it would be a 1:n join).
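>
> For reference, a rough sketch of the existing FK table-table join, with
> made-up types; note that the extractor runs on the left table's value:
>
> // n:1: many orders can reference the same customer
> KTable<String, Order> orders = builder.table("orders");          // value holds customerId
> KTable<String, Customer> customers = builder.table("customers"); // keyed by customerId
> KTable<String, EnrichedOrder> joined = orders.join(
>     customers,
>     order -> order.getCustomerId(),   // foreignKeyExtractor, applied to the left value
>     (order, customer) -> new EnrichedOrder(order, customer));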
>
> Maybe you can clarify the use case a little bit. For the current KIP
> description I only see the 1:1 join case, which would mean we might not
> need such a feature?
>
>
> -Matthias
>
>
> On 7/24/23 11:36 AM, Igor Fomenko wrote:
> > Hello developers of the Kafka Streams,
> >
> > I would like to start discussion on KIP-955: Add stream-table join on
> > foreign key
> > <
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-955%3A+Add+stream-table+join+on+foreign+key
> >
> > This KIP proposes a new API to join a KStream with a KTable based on a
> > foreign key relation.

[DISCUSS] KIP-955: Add stream-table join on foreign key

2023-07-24 Thread Igor Fomenko
Hello developers of the Kafka Streams,

I would like to start a discussion on KIP-955: Add stream-table join on
foreign key:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-955%3A+Add+stream-table+join+on+foreign+key

This KIP proposes a new API to join a KStream with a KTable based on a
foreign key relation.
This KIP was inspired by one of my former projects to integrate RDBMS
databases with data consumers using Change Data Capture and Kafka.
If we had the capability in Kafka Streams to join a KStream with a KTable on
a foreign key, this would simplify our implementation significantly.

Looking forward to your feedback and discussion.

Regards,

Igor


Request permission to contribute

2023-07-18 Thread Igor Fomenko
Hello,

I would like to get permission to contribute to the Apache Kafka project.
Specifically, I would like to create a KIP for Kafka Streams and to work on
the implementation for that KIP.

My JIRA and Confluence IDs are the same: igorf211

Regards,
Igor Fomenko