Re: Re-key by multiple properties without composite key

2024-02-07 Thread Karsten Stöckmann
Matthias, thank you for getting back to me on this. I'll just let the
application run for now. We are trying to build a CDC system
leveraging Kafka in order to feed aggregated data into an indexing
solution like Elasticsearch or OpenSearch. Maybe the Kafka Streams
application will be sufficient after all, but it has to catch up on
millions of records of existing data (our software has been running
for about 10 years now, piling up around 80M customers). Once that's
finished, I guess performance will be alright, as it is not a
high-throughput system.

Anyway, thanks so much for your help.

Best wishes
Karsten

On Wed, Feb 7, 2024 at 8:05 PM Matthias J. Sax wrote:
>
> Using the DSL, this sounds about right.
>
> I am not worried about the complexity -- KS can handle it, and it's not
> uncommon to end up with such topologies.
>
> You might be able to cut down on complexity by not using the DSL, but
> the Processor API. It gives you more control, and thus you might be able
> to optimize the overall topology.
>
> Maybe inspect the details of `TopologyDescription` to spot
> inefficiencies of the DSL generated Topology that might give you an idea
> how much you could optimize using Processor API (to estimate if it would
> be worth the effort).
>
> It's hard to tell w/o knowing the details. It could also be just an
> inherently complex problem, and the DSL program is already as efficient
> as it gets...
>
> Of course, there might also be ways to play with configs to cut down on
> latency to some extent, if e2e latency is your main concern. Again, I
> don't know the use case: in many cases, sub-second latency is actually
> sufficient.
>
> HTH.
>
> -Matthias
>
> On 2/7/24 7:41 AM, Karsten Stöckmann wrote:
> > Sorry for being late with the response - I've been quite busy working
> > on our Streams application lately.
> >
> > That leads me back to my initial question. The Folder class contains
> > multiple fields with FK pointing to the Person table, all of them with
> > different semantics (customer, billing address, etc). So in order to
> > find _all_ folders related to a particular person regardless of its
> > role, I guess I need to
> >
> > a) re-key the folder table on each person FK independently and then
> > b) outer join the result tables.
> >
> > The entire topology is insanely complex, I've got around 10 tables
> > with different levels of nesting (e.g. folder -- 1:n --> dependency a
> > -- 1:n --> dependency b) that all need to be aggregated and in the end
> > re-keyed to person IDs in order to build an aggregate person. There
> > are 14 sub topologies... - measuring the e2e latency shows values
> > around 600ms which seems rather high to me. Does that sound crazy? ;)
> >
> > Best wishes
> > Karsten
> >
> > On Thu, Feb 1, 2024 at 7:02 PM Matthias J. Sax wrote:
> >>
> >> I see. You need to ensure that you get _all_ Persons.
> >>
> >> For this case, I guess you are right. You would need to first aggregate
> >> the folder per person:
> >>
> >> KTable allPersonFolders =
> >>   folder.groupBy((...) -> (folder.customerId, ...))
> >>         .aggregate(...)
> >>
> >> And in a second step, do a left join:
> >>
> >> result = personTable.leftJoin(allPersonFolders,...)
> >>
> >>
> >>> Reading the topic as a table directly did not work out as that crashed
> >>> the application; apparently reading the topic as a KTable and then
> >>> using that for three independent re-key-operations is not allowed.
> >>
> >> Not sure if I can follow. What do you mean by "crashed"? For tables,
> >> there is no `selectKey()` or `repartition()`, as explained in my
> >> previous reply. However, doing a `table.groupBy(...)` will set a new key
> >> and repartition the data to your needs.
> >>
> >>
> >> -Matthias
> >>
> >>
> >> On 2/1/24 1:12 AM, Karsten Stöckmann wrote:
> >>> Thanks so much for taking a look. An FK-table-table join is an inner
> >>> join, which implies the result would contain no Person entities without
> >>> associated Folders. Unfortunately, that's not the case. That led me to
> >>> attempt re-keying the Folder topic by each of the three possible
> >>> foreign keys in order to be able to left join Persons against each of
> >>> the three re-keyed KTables to build an eventual Person aggregation
> >>> containing all possible Folders associated in any way.
> >>>
> >>> Reading the topic as a table directly did not work out as that crashed
> >>> the application; apparently reading the topic as a KTable and then
> >>> using that for three independent re-key-operations is not allowed.
> >>>
> >>> Best wishes,
> >>> Karsten
> >>>
> >>> On Thu, Feb 1, 2024 at 2:16 AM Matthias J. Sax wrote:
> >>>>
> >>>> Thanks for the details. This does make sense.
> >>>>
> >>>> So it seems you can read all topics as tables (i.e., `builder.table("topic")`
> >>>> -- no need to do `builder.stream().toTable()`).
> >>>>
> >>>> And you can use the built-in FK-table-table join, and aggregate the
> >>>> result:
> >>>>
> >>>>

Re: Re-key by multiple properties without composite key

2024-02-07 Thread Matthias J. Sax

Using the DSL, this sounds about right.

I am not worried about the complexity -- KS can handle it, and it's not 
uncommon to end up with such topologies.


You might be able to cut down on complexity by not using the DSL, but 
the Processor API. It gives you more control, and thus you might be able 
to optimize the overall topology.


Maybe inspect the details of `TopologyDescription` to spot 
inefficiencies of the DSL generated Topology that might give you an idea 
how much you could optimize using Processor API (to estimate if it would 
be worth the effort).


It's hard to tell w/o knowing the details. It could also be just an 
inherently complex problem, and the DSL program is already as efficient 
as it gets...


Of course, there might also be ways to play with configs to cut down on
latency to some extent, if e2e latency is your main concern. Again, I
don't know the use case: in many cases, sub-second latency is actually
sufficient.
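If e2e latency is the concern, a few Kafka Streams settings are the usual knobs. The sketch below only collects them into a `java.util.Properties` (so it compiles without Kafka on the classpath). The keys are real configuration names, but the values are illustrative starting points rather than recommendations, and each one trades throughput or output volume for latency. `LowLatencyStreamsConfig` is a hypothetical helper name.

```java
import java.util.Properties;

// Hypothetical helper: gathers Kafka Streams settings that are often tuned for
// lower end-to-end latency. Plain java.util.Properties keeps the sketch free of
// Kafka dependencies; in a real application these would be merged with
// application.id / bootstrap.servers and passed to new KafkaStreams(topology, props).
final class LowLatencyStreamsConfig {

    static Properties lowLatencyOverrides() {
        Properties props = new Properties();
        // Commit (and flush the record caches) more often; the default is 30000 ms
        // with at-least-once processing and 100 ms with exactly-once.
        props.setProperty("commit.interval.ms", "100");
        // A zero-byte cache forwards every KTable update immediately instead of
        // deduplicating updates until the next commit (more downstream records).
        // Older releases use the now-deprecated "cache.max.bytes.buffering".
        props.setProperty("statestore.cache.max.bytes", "0");
        // Kafka Streams overrides the producer's linger.ms (100 ms by default in the
        // releases current at the time of this thread); writes to every repartition
        // topic between sub-topologies may wait up to that long before being sent.
        props.setProperty("producer.linger.ms", "5");
        return props;
    }

    public static void main(String[] args) {
        lowLatencyOverrides().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

With many sub-topologies, the producer batching delay can be paid once per repartition hop, which is why `linger.ms` is worth checking alongside the commit interval.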


HTH.

-Matthias

On 2/7/24 7:41 AM, Karsten Stöckmann wrote:

Sorry for being late with the response - I've been quite busy working
on our Streams application lately.

That leads me back to my initial question. The Folder class contains
multiple fields with FK pointing to the Person table, all of them with
different semantics (customer, billing address, etc). So in order to
find _all_ folders related to a particular person regardless of its
role, I guess I need to

a) re-key the folder table on each person FK independently and then
b) outer join the result tables.
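A rough, in-memory model of steps a) and b), written in plain Java rather than against the Kafka Streams API so it runs on its own: each FK column produces its own personId-to-folder-numbers index, and an outer join merges the indices so that a person referenced only as a billing address is still found. The names `MultiFkRekey`, `rekeyBy` and `outerJoin` are hypothetical, and the `Folder` record only mirrors the fields of the example `Folder` class from earlier in the thread.

```java
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.function.Function;

// In-memory model (not Kafka Streams code) of step a) and step b):
// a) build one personId -> folder numbers index per FK column,
// b) outer-join the indices so a person found under ANY column is kept.
final class MultiFkRekey {

    // Mirrors the Folder class from the thread; only the fields used here.
    record Folder(long id, String folderNumber, Long customerId, Long billingAddressId) {}

    // a) "Re-key" all folders on one FK column. Folders whose FK is null are
    // skipped (a modelling choice for this sketch).
    static Map<Long, Set<String>> rekeyBy(Collection<Folder> folders, Function<Folder, Long> fk) {
        Map<Long, Set<String>> byPerson = new TreeMap<>();
        for (Folder folder : folders) {
            Long personId = fk.apply(folder);
            if (personId != null) {
                byPerson.computeIfAbsent(personId, k -> new TreeSet<>()).add(folder.folderNumber());
            }
        }
        return byPerson;
    }

    // b) Outer join: keys from either side survive; values are unioned.
    static Map<Long, Set<String>> outerJoin(Map<Long, Set<String>> left, Map<Long, Set<String>> right) {
        Map<Long, Set<String>> merged = new TreeMap<>();
        left.forEach((id, numbers) -> merged.computeIfAbsent(id, k -> new TreeSet<>()).addAll(numbers));
        right.forEach((id, numbers) -> merged.computeIfAbsent(id, k -> new TreeSet<>()).addAll(numbers));
        return merged;
    }

    public static void main(String[] args) {
        var folders = java.util.List.of(
                new Folder(1, "F-1", 10L, 10L),
                new Folder(2, "F-2", 10L, 20L));
        var all = outerJoin(rekeyBy(folders, Folder::customerId),
                            rekeyBy(folders, Folder::billingAddressId));
        System.out.println(all); // {10=[F-1, F-2], 20=[F-2]}
    }
}
```

In Kafka Streams terms, each `rekeyBy` call corresponds to one `groupBy(...).aggregate(...)` per FK column, and `outerJoin` to joining the resulting tables.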

The entire topology is insanely complex, I've got around 10 tables
with different levels of nesting (e.g. folder -- 1:n --> dependency a
-- 1:n --> dependency b) that all need to be aggregated and in the end
re-keyed to person IDs in order to build an aggregate person. There
are 14 sub topologies... - measuring the e2e latency shows values
around 600ms which seems rather high to me. Does that sound crazy? ;)

Best wishes
Karsten

On Thu, Feb 1, 2024 at 7:02 PM Matthias J. Sax wrote:


I see. You need to ensure that you get _all_ Persons.

For this case, I guess you are right. You would need to first aggregate
the folder per person:

KTable allPersonFolders =
  folder.groupBy((...) -> (folder.customerId, ...))
        .aggregate(...)

And in a second step, do a left join:

result = personTable.leftJoin(allPersonFolders,...)
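The two steps can be modelled with plain Java collections (this is not Kafka Streams code): step 1 groups folders by `customerId`, and step 2 keeps every person and attaches whatever was aggregated for them. In the real `leftJoin`, the joiner receives `null` for a person without folders; this sketch substitutes an empty list. `LeftJoinModel` and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// In-memory model (not Kafka Streams code) of the two steps:
// 1) aggregate folders per customerId, 2) left join ALL persons against it.
final class LeftJoinModel {

    record Person(long id, String lastname) {}
    record Folder(long id, String folderNumber, Long customerId) {}

    // Step 1, like groupBy(customerId).aggregate(...): personId -> folder numbers.
    static Map<Long, List<String>> foldersPerPerson(Collection<Folder> folders) {
        Map<Long, List<String>> aggregate = new HashMap<>();
        for (Folder folder : folders) {
            if (folder.customerId() != null) {
                aggregate.computeIfAbsent(folder.customerId(), k -> new ArrayList<>())
                         .add(folder.folderNumber());
            }
        }
        return aggregate;
    }

    // Step 2, like personTable.leftJoin(allPersonFolders, ...): every person is kept.
    // A real ValueJoiner would see null for the right side; here it becomes List.of().
    static Map<Long, List<String>> leftJoin(Collection<Person> persons,
                                            Map<Long, List<String>> aggregate) {
        Map<Long, List<String>> result = new TreeMap<>();
        for (Person person : persons) {
            result.put(person.id(), aggregate.getOrDefault(person.id(), List.of()));
        }
        return result;
    }
}
```

Because the person table drives the join, a person with no folders still produces an output row, which is what the FK inner join alone cannot give.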



Reading the topic as a table directly did not work out as that crashed
the application; apparently reading the topic as a KTable and then
using that for three independent re-key-operations is not allowed.


Not sure if I can follow. What do you mean by "crashed"? For tables,
there is no `selectKey()` or `repartition()`, as explained in my
previous reply. However, doing a `table.groupBy(...)` will set a new key
and repartition the data to your needs.
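What `table.groupBy(...)` adds beyond a plain re-key is retraction: for a `KTable` aggregation, each upstream update is applied in two steps, with the old value passed to the subtractor (under the old key) and the new value to the adder (under the new key). A plain-Java model of that bookkeeping, assuming folder numbers are unique, shows why a folder moved to another customer disappears from the old customer's aggregate. `RekeyedTableAggregate` is a hypothetical name, and this model simply drops groups that become empty.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Model (not Kafka Streams code) of KGroupedTable aggregation semantics: an update
// to a folder row retracts the old value from the group of its OLD key (subtractor)
// and adds the new value to the group of its NEW key (adder).
final class RekeyedTableAggregate {

    record Folder(String folderNumber, Long customerId) {}

    private final Map<Long, Folder> table = new HashMap<>();          // folderId -> current row
    private final Map<Long, Set<String>> perPerson = new TreeMap<>(); // customerId -> aggregate

    // Upsert a folder row, or delete it when newRow is null.
    void upsert(long folderId, Folder newRow) {
        Folder oldRow = newRow == null ? table.remove(folderId) : table.put(folderId, newRow);
        if (oldRow != null && oldRow.customerId() != null) {
            // subtractor: remove the old value from the old key's group
            Set<String> group = perPerson.get(oldRow.customerId());
            group.remove(oldRow.folderNumber());
            if (group.isEmpty()) {
                perPerson.remove(oldRow.customerId()); // this model drops empty groups
            }
        }
        if (newRow != null && newRow.customerId() != null) {
            // adder: add the new value to the new key's group
            perPerson.computeIfAbsent(newRow.customerId(), k -> new TreeSet<>())
                     .add(newRow.folderNumber());
        }
    }

    Map<Long, Set<String>> aggregate() {
        return Collections.unmodifiableMap(perPerson);
    }
}
```

This is also why the DSL requires a subtractor for `KGroupedTable.aggregate(...)` but not for the stream variant: table updates must be able to undo their previous contribution.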


-Matthias


On 2/1/24 1:12 AM, Karsten Stöckmann wrote:

Thanks so much for taking a look. An FK-table-table join is an inner
join, which implies the result would contain no Person entities without
associated Folders. Unfortunately, that's not the case. That led me to
attempt re-keying the Folder topic by each of the three possible
foreign keys in order to be able to left join Persons against each of
the three re-keyed KTables to build an eventual Person aggregation
containing all possible Folders associated in any way.

Reading the topic as a table directly did not work out as that crashed
the application; apparently reading the topic as a KTable and then
using that for three independent re-key-operations is not allowed.

Best wishes,
Karsten

On Thu, Feb 1, 2024 at 2:16 AM Matthias J. Sax wrote:


Thanks for the details. This does make sense.

So it seems you can read all topics as tables (i.e., `builder.table("topic")`
-- no need to do `builder.stream().toTable()`).

And you can use the built-in FK-table-table join, and aggregate the result:

KTable result =
 folderTable
 .join(personTable, (folderId, folder) -> folder.customerId, ...)
 .groupBy((...) -> (personId, ...))
 .aggregate(...);
result.toStream().to("resultTopic");

Note the fk-extractor `(folderId, folder) -> folder.customerId` that
tells the join to use `customerId` from the `folderTable` to look up the
person in `personTable`.

Think of `folderTable` as the fact table and `personTable` as the dimension table.
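A plain-Java model of what the FK join plus `groupBy`/`aggregate` computes (not how Kafka Streams implements it): the FK extractor picks `customerId` from each folder, folders whose FK has no matching person are dropped because the join is an inner join, and a person without any folder never appears in the aggregate. `FkJoinModel` and `joinAndAggregate` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;

// In-memory model of folderTable.join(personTable, fkExtractor, joiner) followed by
// groupBy(personId).aggregate(...). Not Kafka Streams code; names are illustrative.
final class FkJoinModel {

    record Person(long id, String lastname) {}
    record Folder(long id, String folderNumber, Long customerId) {}

    static Map<Long, List<String>> joinAndAggregate(Map<Long, Folder> folderTable,
                                                    Map<Long, Person> personTable,
                                                    Function<Folder, Long> fkExtractor) {
        Map<Long, List<String>> perPerson = new TreeMap<>();
        for (Folder folder : folderTable.values()) {
            Long fk = fkExtractor.apply(folder);            // e.g. folder -> folder.customerId()
            Person person = fk == null ? null : personTable.get(fk);
            if (person == null) {
                continue; // inner join: folders without a matching person are dropped
            }
            perPerson.computeIfAbsent(person.id(), k -> new ArrayList<>())
                     .add(folder.folderNumber());
        }
        return perPerson; // persons without any folder never appear here
    }
}
```

The model makes the fact/dimension roles visible: iteration is over folders (facts), and persons (dimensions) are only looked up.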


KS will take care of everything else under the hood automatically.


-Matthias

On 1/30/24 11:25 AM, Karsten Stöckmann wrote:

Matthias, thanks for getting back on this. I'll try to illustrate my
intent with an example as I'm not yet fully familiar with Kafka
(Streams) and its idioms...

Assume classes Person and Folder:

class Person {
 Long id;
 String firstname;
 String lastname;
 // some content
}

class Folder {
 Long id;
 String folderNumber;
 // some other content
 Long customerId; // FK, points to Person.id
 Long billingAddressId; // FK, also points to Person.id
}

Thus both foreign keys 

Re: Re-key by multiple properties without composite key

2024-02-07 Thread Karsten Stöckmann
Sorry for being late with the response - I've been quite busy working
on our Streams application lately.

That leads me back to my initial question. The Folder class contains
multiple fields with FK pointing to the Person table, all of them with
different semantics (customer, billing address, etc). So in order to
find _all_ folders related to a particular person regardless of its
role, I guess I need to

a) re-key the folder table on each person FK independently and then
b) outer join the result tables.

The entire topology is insanely complex, I've got around 10 tables
with different levels of nesting (e.g. folder -- 1:n --> dependency a
-- 1:n --> dependency b) that all need to be aggregated and in the end
re-keyed to person IDs in order to build an aggregate person. There
are 14 sub topologies... - measuring the e2e latency shows values
around 600ms which seems rather high to me. Does that sound crazy? ;)

Best wishes
Karsten

On Thu, Feb 1, 2024 at 7:02 PM Matthias J. Sax wrote:
>
> I see. You need to ensure that you get _all_ Persons.
>
> For this case, I guess you are right. You would need to first aggregate
> the folder per person:
>
> KTable allPersonFolders =
>   folder.groupBy((...) -> (folder.customerId, ...))
>         .aggregate(...)
>
> And in a second step, do a left join:
>
> result = personTable.leftJoin(allPersonFolders,...)
>
>
> > Reading the topic as a table directly did not work out as that crashed
> > the application; apparently reading the topic as a KTable and then
> > using that for three independent re-key-operations is not allowed.
>
> Not sure if I can follow. What do you mean by "crashed"? For tables,
> there is no `selectKey()` or `repartition()`, as explained in my
> previous reply. However, doing a `table.groupBy(...)` will set a new key
> and repartition the data to your needs.
>
>
> -Matthias
>
>
> On 2/1/24 1:12 AM, Karsten Stöckmann wrote:
> > Thanks so much for taking a look. An FK-table-table join is an inner
> > join, which implies the result would contain no Person entities without
> > associated Folders. Unfortunately, that's not the case. That led me to
> > attempt re-keying the Folder topic by each of the three possible
> > foreign keys in order to be able to left join Persons against each of
> > the three re-keyed KTables to build an eventual Person aggregation
> > containing all possible Folders associated in any way.
> >
> > Reading the topic as a table directly did not work out as that crashed
> > the application; apparently reading the topic as a KTable and then
> > using that for three independent re-key-operations is not allowed.
> >
> > Best wishes,
> > Karsten
> >
> > On Thu, Feb 1, 2024 at 2:16 AM Matthias J. Sax wrote:
> >>
> >> Thanks for the details. This does make sense.
> >>
> >> So it seems you can read all topics as tables (i.e., `builder.table("topic")`
> >> -- no need to do `builder.stream().toTable()`).
> >>
> >> And you can use the built-in FK-table-table join, and aggregate the result:
> >>
> >> KTable result =
> >> folderTable
> >> .join(personTable, (folderId, folder) -> folder.customerId, ...)
> >> .groupBy((...) -> (personId, ...))
> >> .aggregate(...);
> >> result.toStream().to("resultTopic");
> >>
> >> Note the fk-extractor `(folderId, folder) -> folder.customerId` that
> >> tells the join to use `customerId` from the `folderTable` to look up the
> >> person in `personTable`.
> >>
> >> Think of `folderTable` as the fact table and `personTable` as the dimension table.
> >>
> >>
> >> KS will take care of everything else under the hood automatically.
> >>
> >>
> >> -Matthias
> >>
> >> On 1/30/24 11:25 AM, Karsten Stöckmann wrote:
> >>> Matthias, thanks for getting back on this. I'll try to illustrate my
> >>> intent with an example as I'm not yet fully familiar with Kafka
> >>> (Streams) and its idioms...
> >>>
> >>> Assume classes Person and Folder:
> >>>
> >>> class Person {
> >>> Long id;
> >>> String firstname;
> >>> String lastname;
> >>> // some content
> >>> }
> >>>
> >>> class Folder {
> >>> Long id;
> >>> String folderNumber;
> >>> // some other content
> >>> Long customerId; // FK, points to Person.id
> >>> Long billingAddressId; // FK, also points to Person.id
> >>> }
> >>>
> >>> Thus both foreign keys of Folder point to Person entities, yet with
> >>> different semantics. They're not composite keys but act independently.
> >>>
> >>> Now assume I want to build an aggregate Person object containing
> >>> Folder.folderNumber of all folders associated with a Person entity,
> >>> regardless whether it acts as a customer or billing address. My
> >>> (naive) idea was to build re-keyed KTables by Folder.customerId and
> >>> Folder.billingAddressId and then joining / aggregating them with the
> >>> Person KTable in order to build something like this:
> >>>
> >>> class AggregatedPerson {
> >>> Long id;
> >>> List folderNumbers; // or even List
> >>> // 

Re: [DISCUSS] KIP-1010: Topic Partition Quota

2024-02-07 Thread Viktor Somogyi-Vass
Hi Afshin,

We keep KIP discussions on d...@kafka.apache.org, so please post this over
there too. I'll go over this later this week, but devs usually monitor that
list more frequently, and you'll have better chances of getting a reply
there.

Regards,
Viktor

On Wed, Jan 17, 2024 at 12:03 AM Afshin Moazami
 wrote:

> Hi folks,
> I am not sure what the KIP life cycle is and how we can get more attention
> on KIPs, so I am just replying to this thread in the hope of getting some
> discussion started.
>
> Thanks,
> Afshin
>
> On Mon, Dec 11, 2023 at 10:43 AM Afshin Moazami 
> wrote:
>
> > Hi folks,
> > I would like to propose a new feature to extend the quota management in
> > Kafka to support topic-partition-based quotas. The following is the link to
> > the KIP:
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-1010%3A+Topic+Partition+Quota
> >
> >
> > Best,
> > Afshin Moazami
> >
>


Regarding the solution implemented is correct or wrong ?

2024-02-07 Thread Vinay Reddy Pannala
Hi Team,

I am working on a story where I have to send the records from the aggregation
processor along with a UUID. I add the records to a deque (`processedTopicDeque`)
and forward them from the main Aggregate class. But when we run performance
tests, throughput is around 1200-1300 TPS, whereas without this implementation
in the code it is around 5800-5900 TPS.

I just want to make sure the implementation is done the right way.


I am attaching the code snippets from the two classes here:

Main class: Aggregate class

I tried punctuation intervals of 1 second and 1 minute; in both cases the TPS
was almost the same.


if (processedDestination != null) {
    KStream[] postBranchArray = getPostBranchStreamsArrayForProcessedTopic(stream);
    forwardStreamToProcessedDestination(postBranchArray[1], processedDestination);
}

private KStream[] getPostBranchStreamsArrayForProcessedTopic(KStream stream) {
    return stream
        .process(() -> new Processor() {
            ProcessorContext context;

            public void close() {
                KeyValueHeader entry;
                while ((entry = processedTopicDeque.poll()) != null) {
                    Headers headers = new RecordHeaders();

                    // add the stored headers for processed topic records to the context
                    entry.getHeaders().forEach(h -> headers.add(h));
                    context.forward(new Record(entry.getKey(), entry.getValue(),
                            context.currentStreamTimeMs(), headers));
                }
            }

            public void init(ProcessorContext context) {
                this.context = context;

                this.context.schedule(DEFAULT_PUNCTUATION_INTERVAL_PROCESSEDTOPIC,
                        WALL_CLOCK_TIME, timestamp -> {
                    KeyValueHeader entry;
                    while ((entry = processedTopicDeque.poll()) != null) {
                        Headers headers = new RecordHeaders();

                        // add the stored headers for processed topic records to the context
                        entry.getHeaders().forEach(h -> headers.add(h));
                        context.forward(new Record(entry.getKey(), entry.getValue(),
                                timestamp, headers));
                    }
                });
            }

            public void process(Record record) {
                context.forward(record);
            }
        })
        .branch((k, v) -> Objects.equals(aggregateSchema, v.getSchema()),
                (k, v) -> true);
}


protected void forwardStreamToProcessedDestination(KStream stream, String processedDestination) {
    createProcessor(stream, false)
        .to(processedDestination);
}


The Time window processor class:

It just adds the records to the deque:

public void process(Record, V> record) {

    //other code

    if (Boolean.TRUE.equals(enableProcessedDestination)) {
        processedTopicDeque.add(new KeyValueHeader<>(key.key(), value, headers.get()));
    }
    //other code
}
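A plain-Java model of the buffer-then-drain pattern in these snippets: one processor only enqueues, and a wall-clock punctuator in another processor drains everything buffered so far, oldest first. It assumes that `processedTopicDeque` is a single object shared by every processor instance the supplier creates (it is referenced, not declared, inside the anonymous `Processor`), so it may be touched by more than one stream thread; the model therefore uses `ConcurrentLinkedQueue`. Whether the original deque is thread-safe is not visible in the snippets, and `BufferedForwarder` is a hypothetical name.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

// Plain-Java model (not Kafka Streams code): the windowing processor enqueues
// records, and a punctuator-like callback drains them downstream in FIFO order.
final class BufferedForwarder<T> {

    // Concurrent queue, assuming producer and drainer may run on different threads.
    private final Queue<T> buffer = new ConcurrentLinkedQueue<>();
    private final Consumer<T> downstream;

    BufferedForwarder(Consumer<T> downstream) {
        this.downstream = downstream;
    }

    // Called from the windowing processor's process(): enqueue only.
    void enqueue(T record) {
        buffer.add(record);
    }

    // Called from the punctuator: forward everything currently buffered.
    int drain() {
        int forwarded = 0;
        T next;
        while ((next = buffer.poll()) != null) {
            downstream.accept(next);
            forwarded++;
        }
        return forwarded;
    }
}
```

In this pattern a buffered record is only forwarded on the next punctuation, so its latency is bounded by the punctuation interval; and with a shared buffer, a record enqueued by one task may be forwarded by another task's punctuator.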




Kind Regards,

Vinay Reddy Pannala

Software Developer

vinay...@amdocs.com

www.zinkworks.com

This email and the information contained herein is proprietary and confidential 
and subject to the Amdocs Email Terms of Service, which you may review at 
https://www.amdocs.com/about/email-terms-of-service 


