Re: Long delay between incoming and outgoing messages using kafka streams

2019-10-22 Thread Petter Arvidsson
Hi Sophie,

Thank you for your elaborate response.

On Mon, Oct 14, 2019 at 10:13 PM Sophie Blee-Goldman 
wrote:

> Honestly I can't say whether 256 partitions is enough to trigger the
> performance issues
> in 2.3.0 but I'd definitely recommend upgrading as soon as you can, just in
> case. On a
> related note, how many instances, with how many threads, are you running?
> 256 partitions
> with several subtopologies will result in a large number of tasks so make
> sure you're parallelizing
> as much as possible.
>
If you are referring to "num.stream.threads", I am using the default value
of 1. I assumed this would only matter if the application consumed a lot of
CPU (which it currently does not). I will experiment a bit with different
values here.
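
As a starting point for that experiment, here is a minimal sketch of how the
thread count could be raised (the value 4 is arbitrary and purely
illustrative, not a recommendation):

Properties p = new Properties();
// Run more stream threads per instance so the tasks created for the 256
// partitions are spread over more threads.
p.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);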

>
> That said, I may have misread your topology the first time around -- are
> you saying that you
> load data from topic2, join it with some other data, then write back to the
> same topic (topic2)?
> Having cycles in your topology is not really supported, as stream
> processing should generally
> be a DAG -- when you introduce a cycle things often don't behave the way
> you might expect
> or want. Is there a particular reason you need to do this?
>
The data on the topic is used to derive new data of the same type. Due to
how we have set things up, it would be preferable if the derived data could
go back to the same topic again. Can you elaborate a little bit on what can
happen when there is a cycle in the graph?

>
> > the delay from sending a message on topic1 to the point where messages
> received on topic2
> are passing the join
> Also, can you explain what you mean by this? That is, can you explain how
> you are determining
> when messages are "passing the join" -- are you just reading from topic2?
> Are you distinguishing
> the "new" data, written by Streams, from the "original data" that is
> written to this topic from elsewhere?
>

The data read from topic1 is joined using the join (with KTable) on KStream
[1]. So the decision on what is allowed to pass the join is based purely on
whether the keys match or not. The messages that we output on topic2 never
match keys in the table (to avoid a loop).
[1]:
https://kafka.apache.org/23/javadoc/org/apache/kafka/streams/kstream/KStream.html#join-org.apache.kafka.streams.kstream.KTable-org.apache.kafka.streams.kstream.ValueJoiner-org.apache.kafka.streams.kstream.Joined-
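
For reference, a rough sketch of a KStream-KTable join of the kind referenced
in [1]; topic names, types and serdes below are placeholders rather than the
actual application code, and imports from org.apache.kafka.streams are
omitted:

StreamsBuilder builder = new StreamsBuilder();
KTable<String, String> table = builder.table("table-topic");
KStream<String, String> stream = builder.stream("stream-topic");
// Only records whose key has a matching entry in the KTable pass the join.
stream.join(table,
        (streamValue, tableValue) -> streamValue + "|" + tableValue,
        Joined.with(Serdes.String(), Serdes.String(), Serdes.String()))
      .to("output-topic");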

>
> On Thu, Oct 10, 2019 at 3:05 AM Petter Arvidsson <
> petter.arvids...@relayr.io>
> wrote:
>
> > Hi Sophie,
> >
> > Thank you for your response.
> >
> > I tested the proposed setting for CACHE_MAX_BYTES_BUFFERING_CONFIG and it
> > does not seem to significantly change the behavior of the application. The
> > latency remains very similar. The documentation states the following
> > regarding CACHE_MAX_BYTES_BUFFERING_CONFIG and COMMIT_INTERVAL_MS_CONFIG:
> >
> > ---
> > To enable caching but still have an upper bound on how long records will
> be
> > cached, you can set the commit interval. In this example, it is set to
> 1000
> > milliseconds:
> >
> > Properties props = new Properties();
> > // Enable record cache of size 10 MB.
> > props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 *
> > 1024L);
> > // Set commit interval to 1 second.
> > props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
> > ---
> >
> > Which made me believe that the COMMIT_INTERVAL_MS_CONFIG would already
> > "override" the CACHE_MAX_BYTES_BUFFERING_CONFIG and provide an upper
> bound
> > of the latency of 1s per processing step by flushing buffers every
> second.
> > Is this the case, or do these two configuration values interact in some
> > other way?
> >
> > We are using 256 partitions for all our topics. Is this to be considered
> a
> > very high partition count? Do you think we might be affected by the bug
> in
> > 2.3.0?
> >
> > Thank you for the help!
> >
> > Best regards,
> > Petter
> >
> > On Wed, Oct 9, 2019 at 7:33 PM Sophie Blee-Goldman 
> > wrote:
> >
> > > Hi Petter,
> > >
> > > I'd recommend turning off caching by setting
> > > p.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
> > >
> > > 2.3.0 also has some known performance issues that will be fixed in
> 2.3.1,
> > > but they
> > > shouldn't be noticeable if you turn caching off and aren't
> > reading/writing
> > > to topics
> > > with a very high partition count. These are fixed in 2.3.1 which should
> > be
> > > released
> > > soon for you to upgrade, but the caching is likely the main reason for
> > the
> > > latency you see.
> > >
> > > I'd also note that Streams, and Kafka in general, is typically tuned
> for
> > > high
> > > throughput rather than low latency, so I wouldn't be too concerned
> about
> > > a large latency unless that is a specific requirement.
> > >
> > > Cheers,
> > > Sophie
> > >
> > > On Wed, Oct 9, 2019 at 6:05 AM Petter Arvidsson <
> > > petter.arvids...@relayr.io>
> > > wrote:
> > >
> > > > Hi,
> > > >
> > > > I have a fairly simple kafka streams 

Re: Long delay between incoming and outgoing messages using kafka streams

2019-10-14 Thread Sophie Blee-Goldman
Honestly I can't say whether 256 partitions is enough to trigger the
performance issues
in 2.3.0 but I'd definitely recommend upgrading as soon as you can, just in
case. On a
related note, how many instances, with how many threads, are you running?
256 partitions
with several subtopologies will result in a large number of tasks so make
sure you're parallelizing
as much as possible.
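
(A rough, purely illustrative calculation of the task count: with 256
partitions per topic and, say, 4 subtopologies, Streams would create on the
order of 256 x 4 = 1024 tasks, and a single instance with the default
num.stream.threads of 1 would run all of them on one thread.)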

That said, I may have misread your topology the first time around -- are
you saying that you
load data from topic2, join it with some other data, then write back to the
same topic (topic2)?
Having cycles in your topology is not really supported, as stream
processing should generally
be a DAG -- when you introduce a cycle things often don't behave the way
you might expect
or want. Is there a particular reason you need to do this?

> the delay from sending a message on topic1 to the point where messages
received on topic2
are passing the join
Also, can you explain what you mean by this? That is, can you explain how
you are determining
when messages are "passing the join" -- are you just reading from topic2?
Are you distinguishing
the "new" data, written by Streams, from the "original data" that is
written to this topic from elsewhere?

On Thu, Oct 10, 2019 at 3:05 AM Petter Arvidsson 
wrote:

> Hi Sophie,
>
> Thank you for your response.
>
> I tested the proposed setting for CACHE_MAX_BYTES_BUFFERING_CONFIG and it
> does not seem to significantly change the behavior of the application. The
> latency remains very similar. The documentation states the following
> regarding CACHE_MAX_BYTES_BUFFERING_CONFIG and COMMIT_INTERVAL_MS_CONFIG:
>
> ---
> To enable caching but still have an upper bound on how long records will be
> cached, you can set the commit interval. In this example, it is set to 1000
> milliseconds:
>
> Properties props = new Properties();
> // Enable record cache of size 10 MB.
> props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 *
> 1024L);
> // Set commit interval to 1 second.
> props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
> ---
>
> Which made me believe that the COMMIT_INTERVAL_MS_CONFIG would already
> "override" the CACHE_MAX_BYTES_BUFFERING_CONFIG and provide an upper bound
> of the latency of 1s per processing step by flushing buffers every second.
> Is this the case, or do these two configuration values interact in some
> other way?
>
> We are using 256 partitions for all our topics. Is this to be considered a
> very high partition count? Do you think we might be affected by the bug in
> 2.3.0?
>
> Thank you for the help!
>
> Best regards,
> Petter
>
> On Wed, Oct 9, 2019 at 7:33 PM Sophie Blee-Goldman 
> wrote:
>
> > Hi Petter,
> >
> > I'd recommend turning off caching by setting
> > p.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
> >
> > 2.3.0 also has some known performance issues that will be fixed in 2.3.1,
> > but they
> > shouldn't be noticeable if you turn caching off and aren't
> reading/writing
> > to topics
> > with a very high partition count. These are fixed in 2.3.1 which should
> be
> > released
> > soon for you to upgrade, but the caching is likely the main reason for
> the
> > latency you see.
> >
> > I'd also note that Streams, and Kafka in general, is typically tuned for
> > high
> > throughput rather than low latency, so I wouldn't be too concerned about
> > a large latency unless that is a specific requirement.
> >
> > Cheers,
> > Sophie
> >
> > On Wed, Oct 9, 2019 at 6:05 AM Petter Arvidsson <
> > petter.arvids...@relayr.io>
> > wrote:
> >
> > > Hi,
> > >
> > > I have a fairly simple kafka streams application that reads messages
> from
> > > two topics. The problem I am facing is that the delay between sending
> > > events to the streams application and it producing results is very high
> > (as
> > > in several minutes). My question is: how can I make this latency
> smaller?
> > >
> > > The streams application does the following:
> > > ktable1 = topic1
> > >   -> (filter out messages using flatMap)
> > >   -> groupBy (with new key, adds internal rekeying topic)
> > >   -> aggregate (in memory store backed by internal compacted topic)
> > >
> > > ktable2 = topic2
> > >   -> (rekey to same key as ktable1 over internal topic)
> > >   -> join (with ktable1)
> > >   -> aggregate (in memory store backed by internal compacted topic)
> > >
> > > ktable2.toStream.to(topic2)
> > >
> > > Ktable1 keeps configuration that allows messages to pass through and be
> > > aggregated into ktable2. Ktable2 keeps aggregates based on messages on
> > > topic2. Ktable2.toStream is then used to put the aggregated messages
> back
> > > out on topic2. The "problem" (or misunderstanding as to how kafka streams
> > > is
> > > processing messages) is that the delay from sending a message on topic1
> > to
> > > the point where messages received on topic2 are passing the join is
> > several
> > > minutes. With the settings I have (see below) on a not that heavily
> > loaded
> > > 

Re: Long delay between incoming and outgoing messages using kafka streams

2019-10-10 Thread Petter Arvidsson
Hi Sophie,

Thank you for your response.

I tested the proposed setting for CACHE_MAX_BYTES_BUFFERING_CONFIG and it
does not seem to significantly change the behavior of the application. The
latency remains very similar. The documentation states the following
regarding CACHE_MAX_BYTES_BUFFERING_CONFIG and COMMIT_INTERVAL_MS_CONFIG:

---
To enable caching but still have an upper bound on how long records will be
cached, you can set the commit interval. In this example, it is set to 1000
milliseconds:

Properties props = new Properties();
// Enable record cache of size 10 MB.
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 *
1024L);
// Set commit interval to 1 second.
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
---

This made me believe that the COMMIT_INTERVAL_MS_CONFIG would already
"override" the CACHE_MAX_BYTES_BUFFERING_CONFIG and provide an upper bound
on the latency of 1s per processing step by flushing buffers every second.
Is this the case, or do these two configuration values interact in some
other way?

We are using 256 partitions for all our topics. Is this to be considered a
very high partition count? Do you think we might be affected by the bug in
2.3.0?

Thank you for the help!

Best regards,
Petter

On Wed, Oct 9, 2019 at 7:33 PM Sophie Blee-Goldman 
wrote:

> Hi Petter,
>
> I'd recommend turning off caching by setting
> p.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
>
> 2.3.0 also has some known performance issues that will be fixed in 2.3.1,
> but they
> shouldn't be noticeable if you turn caching off and aren't reading/writing
> to topics
> with a very high partition count. These are fixed in 2.3.1 which should be
> released
> soon for you to upgrade, but the caching is likely the main reason for the
> latency you see.
>
> I'd also note that Streams, and Kafka in general, is typically tuned for
> high
> throughput rather than low latency, so I wouldn't be too concerned about
> a large latency unless that is a specific requirement.
>
> Cheers,
> Sophie
>
> On Wed, Oct 9, 2019 at 6:05 AM Petter Arvidsson <
> petter.arvids...@relayr.io>
> wrote:
>
> > Hi,
> >
> > I have a fairly simple kafka streams application that reads messages from
> > two topics. The problem I am facing is that the delay between sending
> > events to the streams application and it producing results is very high
> (as
> > in several minutes). My question is: how can I make this latency smaller?
> >
> > The streams application does the following:
> > ktable1 = topic1
> >   -> (filter out messages using flatMap)
> >   -> groupBy (with new key, adds internal rekeying topic)
> >   -> aggregate (in memory store backed by internal compacted topic)
> >
> > ktable2 = topic2
> >   -> (rekey to same key as ktable1 over internal topic)
> >   -> join (with ktable1)
> >   -> aggregate (in memory store backed by internal compacted topic)
> >
> > ktable2.toStream.to(topic2)
> >
> > Ktable1 keeps configuration that allows messages to pass through and be
> > aggregated into ktable2. Ktable2 keeps aggregates based on messages on
> > topic2. Ktable2.toStream is then used to put the aggregated messages back
> > out on topic2. The "problem" (or misunderstanding as to how kafka streams
> > is
> > processing messages) is that the delay from sending a message on topic1
> to
> > the point where messages received on topic2 are passing the join is
> several
> > minutes. With the settings I have (see below) on a not that heavily
> loaded
> > system, I would assume the latency would be a couple of seconds (based on
> > the COMMIT_INTERVAL_MS_CONFIG).
> >
> > I use the following settings (as well as settings for bootstrap servers,
> > application id and so forth):
> > p.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000)
> > p.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
> > p.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 2)
> >
> > The store used for the KTables is the one returned by
> > "Stores.inMemoryKeyValueStore()".
> >
> > Kafka libraries use version "2.3.0" and the "kafka-streams-scala"
> scaladsl
> > is used to build the streams. The broker is using version "1.1.0".
> >
> > Best regards,
> > Petter
> >
>


Re: Long delay between incoming and outgoing messages using kafka streams

2019-10-09 Thread Sophie Blee-Goldman
Hi Petter,

I'd recommend turning off caching by setting
p.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);

2.3.0 also has some known performance issues that will be fixed in 2.3.1,
but they shouldn't be noticeable if you turn caching off and aren't
reading/writing to topics with a very high partition count. 2.3.1 should be
released soon so you can upgrade, but the caching is likely the main reason
for the latency you see.

I'd also note that Streams, and Kafka in general, is typically tuned for
high throughput rather than low latency, so I wouldn't be too concerned
about a large latency unless that is a specific requirement.

Cheers,
Sophie

On Wed, Oct 9, 2019 at 6:05 AM Petter Arvidsson 
wrote:

> Hi,
>
> I have a fairly simple kafka streams application that reads messages from
> two topics. The problem I am facing is that the delay between sending
> events to the streams application and it producing results is very high (as
> in several minutes). My question is: how can I make this latency smaller?
>
> The streams application does the following:
> ktable1 = topic1
>   -> (filter out messages using flatMap)
>   -> groupBy (with new key, adds internal rekeying topic)
>   -> aggregate (in memory store backed by internal compacted topic)
>
> ktable2 = topic2
>   -> (rekey to same key as ktable1 over internal topic)
>   -> join (with ktable1)
>   -> aggregate (in memory store backed by internal compacted topic)
>
> ktable2.toStream.to(topic2)
>
> Ktable1 keeps configuration that allows messages to pass through and be
> aggregated into ktable2. Ktable2 keeps aggregates based on messages on
> topic2. Ktable2.toStream is then used to put the aggregated messages back
> out on topic2. The "problem" (or misunderstanding as to how kafka streams is
> processing messages) is that the delay from sending a message on topic1 to
> the point where messages received on topic2 are passing the join is several
> minutes. With the settings I have (see below) on a not that heavily loaded
> system, I would assume the latency would be a couple of seconds (based on
> the COMMIT_INTERVAL_MS_CONFIG).
>
> I use the following settings (as well as settings for bootstrap servers,
> application id and so forth):
> p.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000)
> p.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
> p.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 2)
>
> The store used for the KTables is the one returned by
> "Stores.inMemoryKeyValueStore()".
>
> Kafka libraries use version "2.3.0" and the "kafka-streams-scala" scaladsl
> is used to build the streams. The broker is using version "1.1.0".
>
> Best regards,
> Petter
>


Long delay between incoming and outgoing messages using kafka streams

2019-10-09 Thread Petter Arvidsson
Hi,

I have a fairly simple kafka streams application that reads messages from
two topics. The problem I am facing is that the delay between sending
events to the streams application and it producing results is very high (as
in several minutes). My question is: how can I make this latency smaller?

The streams application does the following:
ktable1 = topic1
  -> (filter out messages using flatMap)
  -> groupBy (with new key, adds internal rekeying topic)
  -> aggregate (in memory store backed by internal compacted topic)

ktable2 = topic2
  -> (rekey to same key as ktable1 over internal topic)
  -> join (with ktable1)
  -> aggregate (in memory store backed by internal compacted topic)

ktable2.toStream.to(topic2)

Ktable1 keeps configuration that allows messages to pass through and be
aggregated into ktable2. Ktable2 keeps aggregates based on messages on
topic2. Ktable2.toStream is then used to put the aggregated messages back
out on topic2. The "problem" (or misunderstanding as to how kafka streams is
processing messages) is that the delay from sending a message on topic1 to
the point where messages received on topic2 are passing the join is several
minutes. With the settings I have (see below), on a system that is not
heavily loaded, I would expect the latency to be a couple of seconds (based
on the COMMIT_INTERVAL_MS_CONFIG).
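
For concreteness, a rough sketch of what a topology of this shape might look
like in the Java DSL; all topic names, serdes, store names and aggregation
logic below are placeholders rather than the actual application code, and
imports are omitted:

StreamsBuilder builder = new StreamsBuilder();

// ktable1: filter topic1, group by a new key (adds a rekeying topic) and
// aggregate into an in-memory store backed by a compacted changelog topic.
// (The real app filters via flatMap; a plain filter is used here for brevity.)
KTable<String, String> ktable1 = builder.<String, String>stream("topic1")
    .filter((k, v) -> v != null)
    .groupBy((k, v) -> v, Grouped.with(Serdes.String(), Serdes.String()))
    .aggregate(() -> "", (k, v, agg) -> agg + v,
        Materialized.<String, String>as(Stores.inMemoryKeyValueStore("store1"))
            .withKeySerde(Serdes.String()).withValueSerde(Serdes.String()));

// ktable2: rekey topic2, join against ktable1, aggregate, write back out.
KTable<String, String> ktable2 = builder.<String, String>stream("topic2")
    .selectKey((k, v) -> v)   // placeholder rekeying
    .join(ktable1, (left, right) -> left + "|" + right,
        Joined.with(Serdes.String(), Serdes.String(), Serdes.String()))
    .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
    .aggregate(() -> "", (k, v, agg) -> agg + v,
        Materialized.<String, String>as(Stores.inMemoryKeyValueStore("store2"))
            .withKeySerde(Serdes.String()).withValueSerde(Serdes.String()));

ktable2.toStream().to("topic2");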

I use the following settings (as well as settings for bootstrap servers,
application id and so forth):
p.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000)
p.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
p.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 2)

The store used for the KTables is the one returned by
"Stores.inMemoryKeyValueStore()".

Kafka libraries use version "2.3.0" and the "kafka-streams-scala" scaladsl
is used to build the streams. The broker is using version "1.1.0".

Best regards,
Petter