Re: Fix slow processing rate in Kafka streams

2024-04-05 Thread Matthias J. Sax

Perf tuning is always tricky... 350 rec/sec sounds pretty low though.
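For scale, here is a quick back-of-the-envelope calculation using the numbers from the original message quoted below: 3 million records every 2-3 minutes, with about 350 rec/sec observed. It is plain Java with no Kafka dependency, and the class and method names are made up for illustration. Keeping up would need roughly 16,700 to 25,000 rec/sec, about 48 to 71 times the observed rate. Draining a single burst at 350 rec/sec would take about 2.4 hours.

```java
// Back-of-the-envelope check using the numbers quoted in this thread:
// 3 million records arriving every 2-3 minutes vs. an observed ~350 rec/sec.
// ThroughputEstimate is a hypothetical helper, not part of any Kafka API.
class ThroughputEstimate {

    // Sustained rate (records/sec) needed to keep up with `records`
    // arriving once every `periodSeconds`.
    static double requiredRate(long records, long periodSeconds) {
        return (double) records / periodSeconds;
    }

    // Seconds needed to work through `records` at `ratePerSec`.
    static double secondsToDrain(long records, double ratePerSec) {
        return records / ratePerSec;
    }

    public static void main(String[] args) {
        long burst = 3_000_000L;
        System.out.printf("needed (3 min period): %.0f rec/sec%n", requiredRate(burst, 180));
        System.out.printf("needed (2 min period): %.0f rec/sec%n", requiredRate(burst, 120));
        System.out.printf("time to drain one burst at 350 rec/sec: %.1f hours%n",
                secondsToDrain(burst, 350) / 3600.0);
    }
}
```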

You would first need to figure out where the bottleneck is. Kafka
Streams exposes all kinds of metrics:
https://kafka.apache.org/documentation/#kafka_streams_monitoring


It might be good to inspect them as a first step -- maybe something is off
and points you in a first direction.


In general, it would be good to narrow it down to Kafka network I/O, local
RocksDB disk I/O, or CPU utilization -- each one could be the bottleneck,
and we first need to know which one before you take any action to change
configurations.


HTH.

-Matthias

On 4/4/24 7:21 PM, Nirmal Das wrote:

Hi All,

My streams application is not processing more than 350 records/sec under a
high load of 3 million records produced every 2-3 minutes.

My scenario is as follows:
I am on Kafka and Kafka Streams version 3.5.1.
My keys and values are in Protobuf format.
I do a groupByKey followed by a TimeWindows of 10 minutes with a grace period
of 6 hours. This is followed by an aggregate function which stores the first
and last offset of the records, along with the partition, for that message key.
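The setup described above can be modeled without Kafka to see how much state it implies. This is a simplified sketch, not Kafka Streams code: `WindowedOffsetModel`, `OffsetRange` and the in-memory map are hypothetical stand-ins for the windowed aggregation and its state store. The drop rule for late records is modeled on the grace-period semantics (a window keeps accepting records until stream time passes window end plus grace). It assumes non-negative timestamps and Java 16+ for the record type.

```java
import java.util.HashMap;
import java.util.Map;

// Library-free model of: groupByKey -> 10-minute tumbling windows with a
// 6-hour grace -> aggregate keeping partition + first/last offset per key
// and window. All names here are hypothetical; this is not Kafka Streams code.
class WindowedOffsetModel {
    static final long WINDOW_SIZE_MS = 10 * 60 * 1000L;  // 10 minutes
    static final long GRACE_MS = 6 * 60 * 60 * 1000L;    // 6 hours

    // Hypothetical aggregate value: partition plus first and last offset seen.
    record OffsetRange(int partition, long firstOffset, long lastOffset) {
        OffsetRange add(long offset) {
            return new OffsetRange(partition, firstOffset, offset); // keep first, replace last
        }
    }

    // Start of the tumbling window containing a (non-negative) timestamp.
    static long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % WINDOW_SIZE_MS);
    }

    // A window still accepts records while window end + grace lies beyond stream time.
    static boolean accepts(long windowStartMs, long streamTimeMs) {
        return windowStartMs + WINDOW_SIZE_MS + GRACE_MS > streamTimeMs;
    }

    // How many windows per key can still accept updates at any moment.
    static long openWindowsPerKey() {
        return (WINDOW_SIZE_MS + GRACE_MS) / WINDOW_SIZE_MS;
    }

    // Stand-in for the window store: "key@windowStart" -> aggregate.
    private final Map<String, OffsetRange> store = new HashMap<>();
    private long streamTimeMs = Long.MIN_VALUE;

    // Returns false if the record is dropped because its window has closed.
    boolean process(String key, long timestampMs, int partition, long offset) {
        streamTimeMs = Math.max(streamTimeMs, timestampMs);
        long start = windowStart(timestampMs);
        if (!accepts(start, streamTimeMs)) {
            return false;
        }
        store.merge(key + "@" + start,
                new OffsetRange(partition, offset, offset),
                (old, ignored) -> old.add(offset));
        return true;
    }

    OffsetRange get(String key, long windowStartMs) {
        return store.get(key + "@" + windowStartMs);
    }

    public static void main(String[] args) {
        WindowedOffsetModel model = new WindowedOffsetModel();
        model.process("key-1", 0L, 3, 100L);
        model.process("key-1", 60_000L, 3, 105L);
        System.out.println(model.get("key-1", 0L));
        System.out.println("open windows per key: " + openWindowsPerKey());
    }
}
```

With a 10-minute window and a 6-hour grace period, each key can have up to 37 windows that still accept updates, and the window store has to retain at least window size plus grace. Whether that state size is actually the bottleneck is what the RocksDB and CPU metrics mentioned above would show.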

Am I doing something wrong? Am I following an anti-pattern that is
throttling the system? How can I improve this?

Regards,
Dev Lover



Re: outerJoin confusion

2024-04-05 Thread Chad Preisler
I was able to get my test to complete correctly by setting the internal
config and removing the calls that advance the wall clock.

props.put(InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX,
0L);

Thanks Shashwat and Matthias for giving me two solutions.
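One plausible reading of why the first join result appeared without advancing the wall clock, while later ones did not, is a processing-time throttle like the one Matthias describes below. The sketch is a simplified model, not the actual Kafka Streams implementation: `EmitThrottleModel` and `mayEmit` are hypothetical names, and the 1000 ms interval is an assumed value for illustration. The next-emit time starts at zero, so the first check passes. After that, results are held back until the wall clock has advanced by the interval, unless the interval is set to 0.

```java
// Simplified model of a wall-clock throttle on emitting outer-join results.
// Hypothetical names; this is not the actual Kafka Streams code.
class EmitThrottleModel {
    private final long emitIntervalMs;  // assumed internal emit interval; 0 disables throttling
    private long nextTimeToEmitMs = 0L; // starts at 0, so the first check always passes

    EmitThrottleModel(long emitIntervalMs) {
        this.emitIntervalMs = emitIntervalMs;
    }

    // Checked per input record with the current wall-clock time; returns true
    // if pending outer-join results may be emitted now.
    boolean mayEmit(long wallClockMs) {
        if (wallClockMs < nextTimeToEmitMs) {
            return false; // throttled: the wall clock has not advanced far enough
        }
        if (emitIntervalMs > 0) {
            nextTimeToEmitMs = wallClockMs + emitIntervalMs;
        }
        return true;
    }

    public static void main(String[] args) {
        // Frozen wall clock: only the first check succeeds until time advances.
        EmitThrottleModel throttled = new EmitThrottleModel(1000L);
        System.out.println(throttled.mayEmit(0L));
        System.out.println(throttled.mayEmit(0L));
        System.out.println(throttled.mayEmit(2001L)); // after advancing by 2001 ms

        // Interval set to 0: every check succeeds, even with a frozen clock.
        EmitThrottleModel disabled = new EmitThrottleModel(0L);
        System.out.println(disabled.mayEmit(0L));
        System.out.println(disabled.mayEmit(0L));
    }
}
```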


On Thu, Apr 4, 2024 at 12:23 PM Matthias J. Sax  wrote:

> Yeah, that is a quirk of the KS runtime...
>
> There is some internal config (for perf reasons) that delays emitting
> results... An alternative to advancing wall-clock time would be to set
> this internal config to zero, to disable the delay.
>
> Maybe we should disable this config automatically when the topology test
> driver is used... It's not the first time this has come up.
>
> I opened a PR for it: https://github.com/apache/kafka/pull/15660
>
>
> -Matthias
>
>
>
> On 4/3/24 3:52 PM, Chad Preisler wrote:
> > Changing the code to this...
> >
> > assertTrue(outputTopic.isEmpty());
> >  testDriver.advanceWallClockTime(Duration.ofMillis(2001));
> >  leftTopic.pipeInput("1", "test string 3", 4002L);
> >  testDriver.advanceWallClockTime(Duration.ofMillis(2001));
> >  leftTopic.pipeInput("1", "test string 4", 6004L);
> >
> > Did appear to fix the issue. Output:
> >
> > First join result:
> > Key: 1 Value: test string 1, null
> > Second join result:
> > Key: 1 Value: test string 2, null
> > Key: 1 Value: test string 3, null
> >
> > Still a little strange that it works the first time without advancing the
> > wall clock.
> >
> > On Wed, Apr 3, 2024 at 5:05 PM Shashwat Pandey <
> > shashwat.pandey@gmail.com> wrote:
> >
> >> I believe you need to advanceWallClockTime
> >>
> >>
> https://kafka.apache.org/24/javadoc/org/apache/kafka/streams/TopologyTestDriver.html#advanceWallClockTime-java.time.Duration-
> >>
> >>
> >> Regards,
> >> Shashwat Pandey
> >>
> >>
> >> On Wed, Apr 3, 2024 at 5:05 PM Chad Preisler 
> >> wrote:
> >>
> >>> Seems like there is some issue with the TopologyTestDriver. I am able to
> >>> run the same stream against Kafka and I'm getting the output I expect.
> >>> I'd appreciate it if someone could confirm that there is an issue with
> >>> the TopologyTestDriver. If there is, any suggestions on how to test this
> >>> type of join?
> >>>
> >>> On Wed, Apr 3, 2024 at 2:46 PM Chad Preisler 
> >>> wrote:
> >>>
>  Hello,
>
>  I'm confused about the outerJoin and when records are produced with the
>  following code.
>
>  Topology buildTopology() {
>      var builder = new StreamsBuilder();
>      var leftStream = builder.stream("leftSecondsTopic",
>              Consumed.with(Serdes.String(), Serdes.String()));
>      var rightStream = builder.stream("rightSecondsTopic",
>              Consumed.with(Serdes.String(), Serdes.String()));
>
>      leftStream.outerJoin(rightStream, (left, right) -> left + ", " + right,
>              JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(2000)))
>          .to("outputTopicSeconds");
>
>      return builder.build();
>  }
>
>  Here is the test driver.
>
>  @Test
>  public void testSecondsJoinDoesNotWork() {
>      Properties props = new Properties();
>      props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testSeconds");
>      props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
>              Serdes.StringSerde.class);
>      props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
>              Serdes.StringSerde.class);
>      var app = new KafkaStreamJoinTest();
>      var serializer = new StringSerializer();
>
>      try (var testDriver = new TopologyTestDriver(app.buildTopology(),
>              props)) {
>          var leftTopic = testDriver.createInputTopic("leftSecondsTopic",
>                  serializer, serializer, Instant.ofEpochMilli(0L),
>                  Duration.ZERO);
>          leftTopic.pipeInput("1", "test string 1", 0L);
>          leftTopic.pipeInput("1", "test string 2", 2001L);
>
>          var outputTopic = testDriver.createOutputTopic("outputTopicSeconds",
>                  new StringDeserializer(), new StringDeserializer());
>          assertFalse(outputTopic.isEmpty());
>          System.out.println("First join result:");
>          outputTopic.readKeyValuesToList()
>                  .forEach((keyValue) ->
>                          System.out.println("Key: " + keyValue.key
>                                  + " Value: " + keyValue.value));
>
>          assertTrue(outputTopic.isEmpty());
>
>          leftTopic.pipeInput("1", "test string 3", 4002L);
>          leftTopic.pipeInput("1", "test string 4", 6004L);
>
>          System.out.println("Second join result:");
>          outputTopic.readKeyValuesToList()
>

[ANNOUNCE] Apache Kafka 3.6.2

2024-04-05 Thread Manikumar
The Apache Kafka community is pleased to announce the release for
Apache Kafka 3.6.2

This is a bug fix release and it includes fixes and improvements from 28 JIRAs.

All of the changes in this release can be found in the release notes:
https://www.apache.org/dist/kafka/3.6.2/RELEASE_NOTES.html


You can download the source and binary release (Scala 2.12 and Scala 2.13) from:
https://kafka.apache.org/downloads#3.6.2

---


Apache Kafka is a distributed streaming platform with four core APIs:


** The Producer API allows an application to publish a stream of records to
one or more Kafka topics.

** The Consumer API allows an application to subscribe to one or more
topics and process the stream of records produced to them.

** The Streams API allows an application to act as a stream processor,
consuming an input stream from one or more topics and producing an
output stream to one or more output topics, effectively transforming the
input streams to output streams.

** The Connector API allows building and running reusable producers or
consumers that connect Kafka topics to existing applications or data
systems. For example, a connector to a relational database might
capture every change to a table.


With these APIs, Kafka can be used for two broad classes of application:

** Building real-time streaming data pipelines that reliably get data
between systems or applications.

** Building real-time streaming applications that transform or react
to the streams of data.


Apache Kafka is in use at large and small companies worldwide, including
Capital One, Goldman Sachs, ING, LinkedIn, Netflix, Pinterest, Rabobank,
Target, The New York Times, Uber, Yelp, and Zalando, among others.

A big thank you for the following 40 contributors to this release!
(Please report an unintended omission)

Akhilesh Chaganti, Andrew Schofield, Anton Liauchuk, Bob Barrett,
Bruno Cadonna, Cheng-Kai, Zhang, Chia-Ping Tsai, Chris Egerton, Colin
P. McCabe, Colin Patrick McCabe, David Arthur, David Jacot, David Mao,
Divij Vaidya, Edoardo Comar, Emma Humber, Gaurav Narula, Greg Harris,
hudeqi, Ismael Juma, Jason Gustafson, Jim Galasyn, Joel Hamill, Johnny
Hsu, José Armando García Sancio, Justine Olshan, Luke Chen, Manikumar
Reddy, Matthias J. Sax, Mayank Shekhar Narula, Mickael Maison, Mike
Lloyd, Paolo Patierno, PoAn Yang, Ron Dagostino, Sagar Rao, Stanislav
Kozlovski, Walker Carlson, Yash Mayya

We welcome your help and feedback. For more information on how to
report problems, and to get involved, visit the project website at
https://kafka.apache.org/

Thank you!


Regards,

Manikumar
Release Manager for Apache Kafka 3.6.2