Re: Consumer hangs at poll() and never throws an exception on the client

2019-12-09 Thread Matthias J. Sax
Are both consumers in the same consumer group, i.e., using the same `group.id`?

If yes, how many partitions does the topic have? If it has only one
partition, the observed behavior is expected, because a single
partition can only be read by a single consumer instance per consumer
group. The second consumer won't have any assigned partitions to read from.
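
To double-check, you can print the assignment the second consumer gets after
its first poll() - a minimal sketch (broker, topic, and group names below are
placeholders for your actual values):

import java.time.Duration
import java.util.Properties
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer

fun main() {
    val props = Properties().apply {
        put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092")
        put(ConsumerConfig.GROUP_ID_CONFIG, "my-group") // same `group.id` as the first consumer?
        put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java.name)
        put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java.name)
    }
    KafkaConsumer<String, String>(props).use { consumer ->
        consumer.subscribe(listOf("my-topic"))
        consumer.poll(Duration.ofSeconds(10)) // joins the group and triggers the assignment
        // An empty set here means this instance simply has no partitions to read from.
        println("Assigned partitions: " + consumer.assignment())
    }
}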

-Matthias

On 12/9/19 11:12 PM, Frank Zhou wrote:
> Hi,
> 
> I am quite new to Kafka, and I have encountered a weird case during the QA
> stage of our application. We have 2 consumers consuming the same topic
> in a Kafka cluster. The first consumer works fine and gets closed
> after consuming all the messages. After that, the second one starts and just
> hangs in poll(), for which we set the Duration to 10 seconds; the loop quits
> if no records are fetched. In the Kafka server log, I don't see any warning
> or error for the second consumer.
> 
> Attached are the client/server logs related to both consumers. I am using the
> 2.3.1 client and 2.2.2 Kafka servers. Looking forward to your feedback.
> 
> 
> Best regards,
> -- 
> *Frank Zhou*
> R&D, Itiviti
> Java Developer
> D +852 2521 7480
> frank.z...@itiviti.com 
> 


Running Kafka on a single node machine

2019-12-09 Thread Matthew Torres
Hi,

I want to convince my company to use Kafka as our message queue system and was
wondering if it's possible to run Kafka on a single machine. We have a
unique use case where all our microservices run on a single machine, and
currently we are using Apache AMQ as our message queue.

Apache AMQ is good, but I feel that we can improve the robustness of our
system by using Kafka. So my question is: will it be possible to run Kafka
on a single node/machine in production? We are using k3s as an orchestrator,
so whenever Kafka goes down it can just be restarted.
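
For reference, I assume a single-broker setup would mostly mean pinning the
replication-related settings to 1, roughly like the sketch below (illustrative
only, we have not tested this):

# single-broker sketch (assumed settings, adjust as needed)
broker.id=0
default.replication.factor=1
min.insync.replicas=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1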

Hoping to hear from you soon. Thank you!

*Sincerely,*

*Matthew Aldrin S. Torres*


Consumer hangs at poll() and never throws an exception on the client

2019-12-09 Thread Frank Zhou
Hi,

I am quite new to Kafka, and I have encountered a weird case during the QA
stage of our application. We have 2 consumers consuming the same topic in a
Kafka cluster. The first consumer works fine and gets closed after consuming
all the messages. After that, the second one starts and just hangs in poll(),
for which we set the Duration to 10 seconds; the loop quits if no records are
fetched. In the Kafka server log, I don't see any warning or error for the
second consumer.
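
For clarity, our consume loop is roughly the following sketch (the topic name
and value types are placeholders, not our exact code):

import java.time.Duration
import org.apache.kafka.clients.consumer.KafkaConsumer

fun consumeAll(consumer: KafkaConsumer<String, String>) {
    consumer.subscribe(listOf("events"))
    while (true) {
        val records = consumer.poll(Duration.ofSeconds(10))
        if (records.isEmpty) {
            break // a 10-second poll returned nothing, so we quit the loop
        }
        records.forEach { println(it.value()) } // stands in for our real processing
    }
    consumer.close()
}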

Attached are the client/server logs related to both consumers. I am using the
2.3.1 client and 2.2.2 Kafka servers. Looking forward to your feedback.


Best regards,
-- 
*Frank Zhou*
R&D, Itiviti
Java Developer
D +852 2521 7480
frank.z...@itiviti.com



Re: kafka issue

2019-12-09 Thread Valentin
Hi Chao,

I suppose you would like to know, within a consumer group, which message is
coming from which partition, since partitions correspond to brokers and a
broker maps to an IP, right?

Well, if you really want to know this, then you have to get the context; e.g.,
within a processor there is a method called context().

However, keep in mind that brokers and IP addresses can change. I would not
recommend building business logic on top of IP addresses.

Within a producer, you can manage the destination partition/broker.
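
If what you need is to map a record's partition to the broker currently
leading it, here is a rough sketch using the AdminClient (the broker address
and topic name are placeholders; note the record itself only carries
topic/partition/offset, not a broker address):

import java.util.Properties
import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.AdminClientConfig

fun main() {
    val props = Properties().apply {
        put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092")
    }
    val admin = AdminClient.create(props)
    try {
        val description = admin.describeTopics(listOf("my-topic")).all().get().getValue("my-topic")
        description.partitions().forEach { p ->
            // The leader is normally the broker a consumer fetches that partition from,
            // but leadership can move at any time.
            println("partition ${p.partition()} -> leader ${p.leader().host()}:${p.leader().port()}")
        }
    } finally {
        admin.close()
    }
}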

Regards
Valentin 

--

> On 10.12.2019 at 04:48, 朱超 wrote:
> 
> Hi,
> I am new to Kafka. After installing a new Kafka cluster, a question came
> up: how do I know which IP address a message came from? Is there any
> special setting for this?
> Eagerly awaiting your reply.
> 
> 
> chao@baozun.com



kafka issue

2019-12-09 Thread 朱超
Hi,
I am new to Kafka. After installing a new Kafka cluster, a question came up:
how do I know which IP address a message came from? Is there any special
setting for this?
Eagerly awaiting your reply.


chao@baozun.com


Re: Reducing streams startup bandwidth usage

2019-12-09 Thread Alessandro Tagliapietra
You're saying that with a 100ms commit interval, caching won't help because
it would still send the compacted changes to the changelog every 100ms?

Regarding the custom state store, I'll look into that; I haven't gone much
further than transformers and stores in my Kafka experience, so I'll need to
understand better what that implies.

Yeah, I only have one window per key in the store.

The only thing I don't understand is why the cache works 80% of the time and
then suddenly the bytes sent to the changelog increase 90x.
I mean, if the cache wasn't working, why did enabling it in our pipeline
decrease the sent bytes from 30-40 MB/minute to 400 KB/minute?

I'll look into the custom state store, though.

Thanks

--
Alessandro Tagliapietra



On Mon, Dec 9, 2019 at 7:02 PM Sophie Blee-Goldman 
wrote:

> Alright, well I see why you have so much data being sent to the changelog
> if each
> update involves appending to a list and then writing in the whole list. And
> with 340
> records/minute I'm actually not sure how the cache could really help at all
> when it's
> being flushed every 100ms.
>
> Here's kind of a wild idea, if you really only need append semantics: what
> if you wrote
> a custom StateStore that wrapped the normal RocksDBStore (or
> RocksDBWindowStore)
> and did the append for you under the hood? The changelogging layer sits
> between the
> layer that you would call #put on in your transformer and the final layer
> that actually writes
> to the underlying storage engine. If you insert an extra layer and modify
> your transformer
> to only call put on the new data (rather than the entire list) then only
> this new data will get
> sent to the changelog. Your custom storage layer will know it's actually
> append semantics,
> and add the new data to the existing list before sending it on to RocksDB.
>
> Since you only ever have one window per key in the store (right?) you just
> need to make
> sure that nothing from the current window gets deleted prematurely. You'd
> want to turn off
> compaction on the changelog and caching on the store of course, and maybe
> give the
> changelog some extra retention time to be safe.
>
> Obviously I haven't thoroughly verified this alternative, but it seems like
> this approach (or
> something to its effect) could help you cut down on the changelog data.
> WDYT?
>
> On Mon, Dec 9, 2019 at 4:35 PM Alessandro Tagliapietra <
> tagliapietra.alessan...@gmail.com> wrote:
>
> > Hi Sophie,
> >
> > Just to give a better context, yes we use EOS and the problem happens in
> > our aggregation store.
> > Basically when windowing data we append each record into a list that's
> > stored in the aggregation store.
> > We have 2 versions, in production we use the kafka streams windowing API,
> > in staging we manually calculate the window end timestamp and aggregate
> > using that timestamp.
> >
> > To give you an example of the staging code, it's a simple transformer
> that:
> >  - if incoming data fits in the same window as the data in store, append
> > the data to the existing store list overwriting the same key and nothing
> is
> > sent downstream
> >  - if incoming data has a timestamp smaller than the existing store data,
> > discard the record
> >  - if incoming data has a timestamp bigger than the existing store data,
> > send the stored list downstream and store the new window data into the
> > store
> >
> > This way we don't use multiple keys (kafka streams instead uses a store
> > where each key is stream-key + window key) as we overwrite the store data
> > using the same key over and over.
> > So what I would expect is that since we're overwriting the same keys
> there
> > isn't more  and more data to be cached as the number of keys are always
> the
> > same and we don't really need to cache more data over time.
> >
> > To respond to your questions:
> >  - yes when I say that cache "stopped/started" working I mean that at
> some
> > point the store started sending more and more data to che changelog topic
> > and then suddenly stopped again even without a restart (a restart always
> > fixes the problem).
> >  - Yes there are no density changes in the input stream, I've checked the
> > number of records sent to the stream input topic and there is a variation
> > of ~10-20 records per minute on an average of 340 records per minute.
> Most
> > of the records are also generated by simulators with very predictable
> > output rate.
> >
> > In the meantime I've enabled reporting of debug metrics (so including
> cache
> > hit ratio) to hopefully get better insights the next time it happens.
> >
> > Thank you in advance
> >
> > --
> > Alessandro Tagliapietra
> >
> > On Mon, Dec 9, 2019 at 3:57 PM Sophie Blee-Goldman 
> > wrote:
> >
> > > It's an LRU cache, so once it gets full new records will cause older
> ones
> > > to be evicted (and thus sent
> > > downstream). Of course this should only apply to records of a different
> > > key, otherwise it will just cause
> > > an update of that key in the cach

Re: Reducing streams startup bandwidth usage

2019-12-09 Thread Sophie Blee-Goldman
Alright, well I see why you have so much data being sent to the changelog
if each
update involves appending to a list and then writing in the whole list. And
with 340
records/minute I'm actually not sure how the cache could really help at all
when it's
being flushed every 100ms.

Here's kind of a wild idea, if you really only need append semantics: what
if you wrote
a custom StateStore that wrapped the normal RocksDBStore (or
RocksDBWindowStore)
and did the append for you under the hood? The changelogging layer sits
between the
layer that you would call #put on in your transformer and the final layer
that actually writes
to the underlying storage engine. If you insert an extra layer and modify
your transformer
to only call put on the new data (rather than the entire list) then only
this new data will get
sent to the changelog. Your custom storage layer will know it's actually
append semantics,
and add the new data to the existing list before sending it on to RocksDB.

Since you only ever have one window per key in the store (right?) you just
need to make
sure that nothing from the current window gets deleted prematurely. You'd
want to turn off
compaction on the changelog and caching on the store of course, and maybe
give the
changelog some extra retention time to be safe.

Obviously I haven't thoroughly verified this alternative, but it seems like
this approach (or
something to its effect) could help you cut down on the changelog data.
WDYT?
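
To make the idea a bit more concrete, here is a very rough, unverified sketch
(Kotlin, since that's what you're using). AppendingStoreSupplier, AppendingStore
and appendToStoredList are made-up names, and serde handling is left out:

import org.apache.kafka.common.utils.Bytes
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier
import org.apache.kafka.streams.state.KeyValueStore

// Wraps the inner bytes store so that put() receives only the newly arrived element,
// while the merge into the stored list happens below the changelogging layer.
class AppendingStoreSupplier(private val inner: KeyValueBytesStoreSupplier) : KeyValueBytesStoreSupplier {
    override fun name(): String = inner.name()
    override fun metricsScope(): String = inner.metricsScope()
    override fun get(): KeyValueStore<Bytes, ByteArray> = AppendingStore(inner.get())
}

class AppendingStore(private val inner: KeyValueStore<Bytes, ByteArray>) :
    KeyValueStore<Bytes, ByteArray> by inner {

    override fun put(key: Bytes, value: ByteArray?) {
        // Merge the single new element into whatever list is already stored for this key.
        inner.put(key, appendToStoredList(inner.get(key), value))
    }

    // Placeholder: deserialize the stored list, add the new element, re-serialize.
    private fun appendToStoredList(existing: ByteArray?, newElement: ByteArray?): ByteArray? =
        TODO("merge with your list serde")
}

// Usage sketch: the changelogging (and optional caching) layers wrap the store returned
// by this supplier, so only the single new element reaches the changelog topic:
// Stores.keyValueStoreBuilder(
//     AppendingStoreSupplier(Stores.persistentKeyValueStore("aggregate-store")),
//     Serdes.String(), elementSerde)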

On Mon, Dec 9, 2019 at 4:35 PM Alessandro Tagliapietra <
tagliapietra.alessan...@gmail.com> wrote:

> Hi Sophie,
>
> Just to give a better context, yes we use EOS and the problem happens in
> our aggregation store.
> Basically when windowing data we append each record into a list that's
> stored in the aggregation store.
> We have 2 versions, in production we use the kafka streams windowing API,
> in staging we manually calculate the window end timestamp and aggregate
> using that timestamp.
>
> To give you an example of the staging code, it's a simple transformer that:
>  - if incoming data fits in the same window as the data in store, append
> the data to the existing store list overwriting the same key and nothing is
> sent downstream
>  - if incoming data has a timestamp smaller than the existing store data,
> discard the record
>  - if incoming data has a timestamp bigger than the existing store data,
> send the stored list downstream and store the new window data into the
> store
>
> This way we don't use multiple keys (kafka streams instead uses a store
> where each key is stream-key + window key) as we overwrite the store data
> using the same key over and over.
> So what I would expect is that since we're overwriting the same keys there
> isn't more  and more data to be cached as the number of keys are always the
> same and we don't really need to cache more data over time.
>
> To respond to your questions:
>  - yes when I say that cache "stopped/started" working I mean that at some
> point the store started sending more and more data to che changelog topic
> and then suddenly stopped again even without a restart (a restart always
> fixes the problem).
>  - Yes there are no density changes in the input stream, I've checked the
> number of records sent to the stream input topic and there is a variation
> of ~10-20 records per minute on an average of 340 records per minute. Most
> of the records are also generated by simulators with very predictable
> output rate.
>
> In the meantime I've enabled reporting of debug metrics (so including cache
> hit ratio) to hopefully get better insights the next time it happens.
>
> Thank you in advance
>
> --
> Alessandro Tagliapietra
>
> On Mon, Dec 9, 2019 at 3:57 PM Sophie Blee-Goldman 
> wrote:
>
> > It's an LRU cache, so once it gets full new records will cause older ones
> > to be evicted (and thus sent
> > downstream). Of course this should only apply to records of a different
> > key, otherwise it will just cause
> > an update of that key in the cache.
> >
> > I missed that you were using EOS, given the short commit interval it's
> hard
> > to see those effects.
> > When you say that it stopped working and then appeared to start working
> > again, are you just
> > referring to the amount of data being sent to the changelog? And you can
> > definitely rule out differences
> > in the density of updates in the input stream?
> >
> >
> >
> > On Mon, Dec 9, 2019 at 12:26 PM Alessandro Tagliapietra <
> > tagliapietra.alessan...@gmail.com> wrote:
> >
> > > Hi Sophie,
> > >
> > > thanks fo helping.
> > >
> > > By eviction of older records you mean they get flushed to the changelog
> > > topic?
> > > Or the cache is just full and so all new records go to the changelog
> > topic
> > > until the old ones are evicted?
> > >
> > > Regarding the timing, what timing do you mean? Between when the cache
> > stops
> > > and starts working again? We're using EOS os I believe the commit
> > interval
> > > is every 100ms.
> > >
> > > Regar

Re: Reducing streams startup bandwidth usage

2019-12-09 Thread Alessandro Tagliapietra
Hi Sophie,

Just to give some more context: yes, we use EOS, and the problem happens in
our aggregation store.
Basically, when windowing data we append each record to a list that's
stored in the aggregation store.
We have 2 versions: in production we use the Kafka Streams windowing API;
in staging we manually calculate the window end timestamp and aggregate
using that timestamp.

To give you an example of the staging code, it's a simple transformer that
(see the sketch after this description):
 - if the incoming data fits in the same window as the data in the store,
appends the data to the existing store list, overwriting the same key, and
sends nothing downstream
 - if the incoming data has a timestamp smaller than the existing store data,
discards the record
 - if the incoming data has a timestamp bigger than the existing store data,
sends the stored list downstream and stores the new window's data in the store

This way we don't use multiple keys (Kafka Streams instead uses a store
where each key is stream-key + window key), as we overwrite the store data
using the same key over and over.
So what I would expect is that, since we're overwriting the same keys, there
isn't more and more data to be cached, as the number of keys is always the
same and we don't really need to cache more data over time.
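
Here is a rough sketch of that transformer (simplified: Metric and WindowState
are placeholder types, "window-store" is a placeholder store name, and the
window-end math is illustrative):

import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.kstream.Transformer
import org.apache.kafka.streams.processor.ProcessorContext
import org.apache.kafka.streams.state.KeyValueStore

data class Metric(val timestamp: Long, val value: Double)
data class WindowState(val windowEnd: Long, val records: List<Metric>)

class ManualWindowTransformer(private val windowMs: Long) :
    Transformer<String, Metric, KeyValue<String, List<Metric>>?> {

    private lateinit var store: KeyValueStore<String, WindowState>

    override fun init(context: ProcessorContext) {
        @Suppress("UNCHECKED_CAST")
        store = context.getStateStore("window-store") as KeyValueStore<String, WindowState>
    }

    override fun transform(key: String, value: Metric): KeyValue<String, List<Metric>>? {
        val windowEnd = (value.timestamp / windowMs + 1) * windowMs
        val current = store.get(key)
        return when {
            current == null || windowEnd == current.windowEnd -> { // same window: just append
                store.put(key, WindowState(windowEnd, (current?.records ?: emptyList()) + value))
                null // nothing is sent downstream
            }
            windowEnd < current.windowEnd -> null // older window: discard the record
            else -> { // newer window: emit the stored list and start the new window
                store.put(key, WindowState(windowEnd, listOf(value)))
                KeyValue(key, current.records)
            }
        }
    }

    override fun close() {}
}
// Wired up roughly as: stream.transform(TransformerSupplier { ManualWindowTransformer(60_000L) }, "window-store")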

To respond to your questions:
 - Yes, when I say that the cache "stopped/started" working, I mean that at
some point the store started sending more and more data to the changelog topic
and then suddenly stopped again even without a restart (a restart always
fixes the problem).
 - Yes, there are no density changes in the input stream. I've checked the
number of records sent to the stream input topic and there is a variation
of ~10-20 records per minute on an average of 340 records per minute. Most
of the records are also generated by simulators with a very predictable
output rate.

In the meantime I've enabled reporting of debug metrics (so including cache
hit ratio) to hopefully get better insights the next time it happens.
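
Concretely, what I enabled is roughly the following (metric names can differ
between versions, so I just filter on "hit-ratio"):

import java.util.Properties
import org.apache.kafka.streams.KafkaStreams

fun enableDebugMetrics(props: Properties) {
    props["metrics.recording.level"] = "DEBUG" // record-cache metrics are only recorded at DEBUG level
}

fun printCacheHitRatios(streams: KafkaStreams) {
    streams.metrics()
        .filterKeys { it.name().contains("hit-ratio") }
        .forEach { (name, metric) -> println("${name.group()}/${name.name()} = ${metric.metricValue()}") }
}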

Thank you in advance

--
Alessandro Tagliapietra

On Mon, Dec 9, 2019 at 3:57 PM Sophie Blee-Goldman 
wrote:

> It's an LRU cache, so once it gets full new records will cause older ones
> to be evicted (and thus sent
> downstream). Of course this should only apply to records of a different
> key, otherwise it will just cause
> an update of that key in the cache.
>
> I missed that you were using EOS, given the short commit interval it's hard
> to see those effects.
> When you say that it stopped working and then appeared to start working
> again, are you just
> referring to the amount of data being sent to the changelog? And you can
> definitely rule out differences
> in the density of updates in the input stream?
>
>
>
> On Mon, Dec 9, 2019 at 12:26 PM Alessandro Tagliapietra <
> tagliapietra.alessan...@gmail.com> wrote:
>
> > Hi Sophie,
> >
> > thanks fo helping.
> >
> > By eviction of older records you mean they get flushed to the changelog
> > topic?
> > Or the cache is just full and so all new records go to the changelog
> topic
> > until the old ones are evicted?
> >
> > Regarding the timing, what timing do you mean? Between when the cache
> stops
> > and starts working again? We're using EOS os I believe the commit
> interval
> > is every 100ms.
> >
> > Regards
> >
> > --
> > Alessandro Tagliapietra
> >
> >
> >
> > On Mon, Dec 9, 2019 at 12:15 PM Sophie Blee-Goldman  >
> > wrote:
> >
> > > It might be that the cache appears to "stop working" because it gets
> > full,
> > > and each
> > > new update causes an eviction (of some older record). This would also
> > > explain the
> > > opposite behavior, that it "starts working" again after some time
> without
> > > being restarted,
> > > since the cache is completely flushed on commit. Does the timing seem
> to
> > > align with your
> > > commit interval (default is 30s)?
> > >
> > > On Mon, Dec 9, 2019 at 12:03 AM Alessandro Tagliapietra <
> > > tagliapietra.alessan...@gmail.com> wrote:
> > >
> > > > And it seems that for some reason after a while caching works again
> > > > without a restart of the streams application
> > > >
> > > > [image: Screen Shot 2019-12-08 at 11.59.30 PM.png]
> > > >
> > > > I'll try to enable debug metrics and see if I can find something
> useful
> > > > there.
> > > > Any idea is appreciated in the meantime :)
> > > >
> > > > --
> > > > Alessandro Tagliapietra
> > > >
> > > > On Sun, Dec 8, 2019 at 12:54 PM Alessandro Tagliapietra <
> > > > tagliapietra.alessan...@gmail.com> wrote:
> > > >
> > > >> It seems that even with caching enabled, after a while the sent
> bytes
> > > >> stil go up
> > > >>
> > > >> [image: Screen Shot 2019-12-08 at 12.52.31 PM.png]
> > > >>
> > > >> you can see the deploy when I've enabled caching but it looks like
> > it's
> > > >> still a temporary solution.
> > > >>
> > > >> --
> > > >> Alessandro Tagliapietra
> > > >>
> > > >>
> > > >> On Sat, Dec 7, 2019 at 10:08 AM Alessandro Tagliapietra <
> > > >> tagliapietra.alessan...@gmail.com> wrote:
> > > >>
>

Re: Reducing streams startup bandwidth usage

2019-12-09 Thread Sophie Blee-Goldman
It's an LRU cache, so once it gets full new records will cause older ones
to be evicted (and thus sent
downstream). Of course this should only apply to records of a different
key, otherwise it will just cause
an update of that key in the cache.

I missed that you were using EOS, given the short commit interval it's hard
to see those effects.
When you say that it stopped working and then appeared to start working
again, are you just
referring to the amount of data being sent to the changelog? And you can
definitely rule out differences
in the density of updates in the input stream?



On Mon, Dec 9, 2019 at 12:26 PM Alessandro Tagliapietra <
tagliapietra.alessan...@gmail.com> wrote:

> Hi Sophie,
>
> thanks fo helping.
>
> By eviction of older records you mean they get flushed to the changelog
> topic?
> Or the cache is just full and so all new records go to the changelog topic
> until the old ones are evicted?
>
> Regarding the timing, what timing do you mean? Between when the cache stops
> and starts working again? We're using EOS os I believe the commit interval
> is every 100ms.
>
> Regards
>
> --
> Alessandro Tagliapietra
>
>
>
> On Mon, Dec 9, 2019 at 12:15 PM Sophie Blee-Goldman 
> wrote:
>
> > It might be that the cache appears to "stop working" because it gets
> full,
> > and each
> > new update causes an eviction (of some older record). This would also
> > explain the
> > opposite behavior, that it "starts working" again after some time without
> > being restarted,
> > since the cache is completely flushed on commit. Does the timing seem to
> > align with your
> > commit interval (default is 30s)?
> >
> > On Mon, Dec 9, 2019 at 12:03 AM Alessandro Tagliapietra <
> > tagliapietra.alessan...@gmail.com> wrote:
> >
> > > And it seems that for some reason after a while caching works again
> > > without a restart of the streams application
> > >
> > > [image: Screen Shot 2019-12-08 at 11.59.30 PM.png]
> > >
> > > I'll try to enable debug metrics and see if I can find something useful
> > > there.
> > > Any idea is appreciated in the meantime :)
> > >
> > > --
> > > Alessandro Tagliapietra
> > >
> > > On Sun, Dec 8, 2019 at 12:54 PM Alessandro Tagliapietra <
> > > tagliapietra.alessan...@gmail.com> wrote:
> > >
> > >> It seems that even with caching enabled, after a while the sent bytes
> > >> stil go up
> > >>
> > >> [image: Screen Shot 2019-12-08 at 12.52.31 PM.png]
> > >>
> > >> you can see the deploy when I've enabled caching but it looks like
> it's
> > >> still a temporary solution.
> > >>
> > >> --
> > >> Alessandro Tagliapietra
> > >>
> > >>
> > >> On Sat, Dec 7, 2019 at 10:08 AM Alessandro Tagliapietra <
> > >> tagliapietra.alessan...@gmail.com> wrote:
> > >>
> > >>> Could be, but since we have a limite amount of input keys (~30),
> > >>> windowing generates new keys but old ones are never touched again
> > since the
> > >>> data per key is in order, I assume it shouldn't be a big deal for it
> to
> > >>> handle 30 keys
> > >>> I'll have a look at cache metrics and see if something pops out
> > >>>
> > >>> Thanks
> > >>>
> > >>> --
> > >>> Alessandro Tagliapietra
> > >>>
> > >>>
> > >>> On Sat, Dec 7, 2019 at 10:02 AM John Roesler 
> > >>> wrote:
> > >>>
> >  Hmm, that’s a good question. Now that we’re talking about caching, I
> >  wonder if the cache was just too small. It’s not very big by
> default.
> > 
> >  On Sat, Dec 7, 2019, at 11:16, Alessandro Tagliapietra wrote:
> >  > Ok I'll check on that!
> >  >
> >  > Now I can see that with caching we went from 3-4MB/s to 400KB/s,
> > that
> >  will
> >  > help with the bill.
> >  >
> >  > Last question, any reason why after a while the regular windowed
> >  stream
> >  > starts sending every update instead of caching?
> >  > Could it be because it doesn't have any more memory available? Any
> >  other
> >  > possible reason?
> >  >
> >  > Thank you so much for your help
> >  >
> >  > --
> >  > Alessandro Tagliapietra
> >  >
> >  >
> >  > On Sat, Dec 7, 2019 at 9:14 AM John Roesler 
> >  wrote:
> >  >
> >  > > Ah, yes. Glad you figured it out!
> >  > >
> >  > > Caching does not reduce EOS guarantees at all. I highly
> recommend
> >  using
> >  > > it. You might even want to take a look at the caching metrics to
> >  make sure
> >  > > you have a good hit ratio.
> >  > >
> >  > > -John
> >  > >
> >  > > On Sat, Dec 7, 2019, at 10:51, Alessandro Tagliapietra wrote:
> >  > > > Never mind I've found out I can use `.withCachingEnabled` on
> the
> >  store
> >  > > > builder to achieve the same thing as the windowing example as
> >  > > > `Materialized.as` turns that on by default.
> >  > > >
> >  > > > Does caching in any way reduces the EOS guarantees?
> >  > > >
> >  > > > --
> >  > > > Alessandro Tagliapietra
> >  > > >
> >  > > >
> >  > > > On Sat, Dec 7,

Re: Reducing streams startup bandwidth usage

2019-12-09 Thread Alessandro Tagliapietra
Hi Sophie,

thanks for helping.

By eviction of older records you mean they get flushed to the changelog
topic?
Or the cache is just full and so all new records go to the changelog topic
until the old ones are evicted?

Regarding the timing, what timing do you mean? Between when the cache stops
and starts working again? We're using EOS, so I believe the commit interval
is every 100ms.

Regards

--
Alessandro Tagliapietra



On Mon, Dec 9, 2019 at 12:15 PM Sophie Blee-Goldman 
wrote:

> It might be that the cache appears to "stop working" because it gets full,
> and each
> new update causes an eviction (of some older record). This would also
> explain the
> opposite behavior, that it "starts working" again after some time without
> being restarted,
> since the cache is completely flushed on commit. Does the timing seem to
> align with your
> commit interval (default is 30s)?
>
> On Mon, Dec 9, 2019 at 12:03 AM Alessandro Tagliapietra <
> tagliapietra.alessan...@gmail.com> wrote:
>
> > And it seems that for some reason after a while caching works again
> > without a restart of the streams application
> >
> > [image: Screen Shot 2019-12-08 at 11.59.30 PM.png]
> >
> > I'll try to enable debug metrics and see if I can find something useful
> > there.
> > Any idea is appreciated in the meantime :)
> >
> > --
> > Alessandro Tagliapietra
> >
> > On Sun, Dec 8, 2019 at 12:54 PM Alessandro Tagliapietra <
> > tagliapietra.alessan...@gmail.com> wrote:
> >
> >> It seems that even with caching enabled, after a while the sent bytes
> >> stil go up
> >>
> >> [image: Screen Shot 2019-12-08 at 12.52.31 PM.png]
> >>
> >> you can see the deploy when I've enabled caching but it looks like it's
> >> still a temporary solution.
> >>
> >> --
> >> Alessandro Tagliapietra
> >>
> >>
> >> On Sat, Dec 7, 2019 at 10:08 AM Alessandro Tagliapietra <
> >> tagliapietra.alessan...@gmail.com> wrote:
> >>
> >>> Could be, but since we have a limite amount of input keys (~30),
> >>> windowing generates new keys but old ones are never touched again
> since the
> >>> data per key is in order, I assume it shouldn't be a big deal for it to
> >>> handle 30 keys
> >>> I'll have a look at cache metrics and see if something pops out
> >>>
> >>> Thanks
> >>>
> >>> --
> >>> Alessandro Tagliapietra
> >>>
> >>>
> >>> On Sat, Dec 7, 2019 at 10:02 AM John Roesler 
> >>> wrote:
> >>>
>  Hmm, that’s a good question. Now that we’re talking about caching, I
>  wonder if the cache was just too small. It’s not very big by default.
> 
>  On Sat, Dec 7, 2019, at 11:16, Alessandro Tagliapietra wrote:
>  > Ok I'll check on that!
>  >
>  > Now I can see that with caching we went from 3-4MB/s to 400KB/s,
> that
>  will
>  > help with the bill.
>  >
>  > Last question, any reason why after a while the regular windowed
>  stream
>  > starts sending every update instead of caching?
>  > Could it be because it doesn't have any more memory available? Any
>  other
>  > possible reason?
>  >
>  > Thank you so much for your help
>  >
>  > --
>  > Alessandro Tagliapietra
>  >
>  >
>  > On Sat, Dec 7, 2019 at 9:14 AM John Roesler 
>  wrote:
>  >
>  > > Ah, yes. Glad you figured it out!
>  > >
>  > > Caching does not reduce EOS guarantees at all. I highly recommend
>  using
>  > > it. You might even want to take a look at the caching metrics to
>  make sure
>  > > you have a good hit ratio.
>  > >
>  > > -John
>  > >
>  > > On Sat, Dec 7, 2019, at 10:51, Alessandro Tagliapietra wrote:
>  > > > Never mind I've found out I can use `.withCachingEnabled` on the
>  store
>  > > > builder to achieve the same thing as the windowing example as
>  > > > `Materialized.as` turns that on by default.
>  > > >
>  > > > Does caching in any way reduces the EOS guarantees?
>  > > >
>  > > > --
>  > > > Alessandro Tagliapietra
>  > > >
>  > > >
>  > > > On Sat, Dec 7, 2019 at 1:12 AM Alessandro Tagliapietra <
>  > > > tagliapietra.alessan...@gmail.com> wrote:
>  > > >
>  > > > > Seems my journey with this isn't done just yet,
>  > > > >
>  > > > > This seems very complicated to me but I'll try to explain it
> as
>  best I
>  > > can.
>  > > > > To better understand the streams network usage I've used
>  prometheus
>  > > with
>  > > > > the JMX exporter to export kafka metrics.
>  > > > > To check the amount of data we use I'm looking at the
> increments
>  > > > > of kafka_producer_topic_metrics_byte_total and
>  > > > > kafka_producer_producer_topic_metrics_record_send_total,
>  > > > >
>  > > > > Our current (before the change mentioned above) code looks
> like
>  this:
>  > > > >
>  > > > > // This transformers just pairs a value with the previous one
>  storing
>  > > the
>  > > > > temporary one in a store
>  > 

Re: Reducing streams startup bandwidth usage

2019-12-09 Thread Sophie Blee-Goldman
It might be that the cache appears to "stop working" because it gets full,
and each
new update causes an eviction (of some older record). This would also
explain the
opposite behavior, that it "starts working" again after some time without
being restarted,
since the cache is completely flushed on commit. Does the timing seem to
align with your
commit interval (default is 30s)?
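
For reference, the two settings that interact here are the record cache size
and the commit interval - a sketch with illustrative values (with EOS enabled
the commit interval defaults to 100 ms):

import java.util.Properties
import org.apache.kafka.streams.StreamsConfig

val props = Properties().apply {
    put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE)
    put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L) // the cache is flushed at least this often
    // Total record cache shared by all threads of the instance (default is 10 MB).
    put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 64L * 1024 * 1024)
}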

On Mon, Dec 9, 2019 at 12:03 AM Alessandro Tagliapietra <
tagliapietra.alessan...@gmail.com> wrote:

> And it seems that for some reason after a while caching works again
> without a restart of the streams application
>
> [image: Screen Shot 2019-12-08 at 11.59.30 PM.png]
>
> I'll try to enable debug metrics and see if I can find something useful
> there.
> Any idea is appreciated in the meantime :)
>
> --
> Alessandro Tagliapietra
>
> On Sun, Dec 8, 2019 at 12:54 PM Alessandro Tagliapietra <
> tagliapietra.alessan...@gmail.com> wrote:
>
>> It seems that even with caching enabled, after a while the sent bytes
>> stil go up
>>
>> [image: Screen Shot 2019-12-08 at 12.52.31 PM.png]
>>
>> you can see the deploy when I've enabled caching but it looks like it's
>> still a temporary solution.
>>
>> --
>> Alessandro Tagliapietra
>>
>>
>> On Sat, Dec 7, 2019 at 10:08 AM Alessandro Tagliapietra <
>> tagliapietra.alessan...@gmail.com> wrote:
>>
>>> Could be, but since we have a limite amount of input keys (~30),
>>> windowing generates new keys but old ones are never touched again since the
>>> data per key is in order, I assume it shouldn't be a big deal for it to
>>> handle 30 keys
>>> I'll have a look at cache metrics and see if something pops out
>>>
>>> Thanks
>>>
>>> --
>>> Alessandro Tagliapietra
>>>
>>>
>>> On Sat, Dec 7, 2019 at 10:02 AM John Roesler 
>>> wrote:
>>>
 Hmm, that’s a good question. Now that we’re talking about caching, I
 wonder if the cache was just too small. It’s not very big by default.

 On Sat, Dec 7, 2019, at 11:16, Alessandro Tagliapietra wrote:
 > Ok I'll check on that!
 >
 > Now I can see that with caching we went from 3-4MB/s to 400KB/s, that
 will
 > help with the bill.
 >
 > Last question, any reason why after a while the regular windowed
 stream
 > starts sending every update instead of caching?
 > Could it be because it doesn't have any more memory available? Any
 other
 > possible reason?
 >
 > Thank you so much for your help
 >
 > --
 > Alessandro Tagliapietra
 >
 >
 > On Sat, Dec 7, 2019 at 9:14 AM John Roesler 
 wrote:
 >
 > > Ah, yes. Glad you figured it out!
 > >
 > > Caching does not reduce EOS guarantees at all. I highly recommend
 using
 > > it. You might even want to take a look at the caching metrics to
 make sure
 > > you have a good hit ratio.
 > >
 > > -John
 > >
 > > On Sat, Dec 7, 2019, at 10:51, Alessandro Tagliapietra wrote:
 > > > Never mind I've found out I can use `.withCachingEnabled` on the
 store
 > > > builder to achieve the same thing as the windowing example as
 > > > `Materialized.as` turns that on by default.
 > > >
 > > > Does caching in any way reduces the EOS guarantees?
 > > >
 > > > --
 > > > Alessandro Tagliapietra
 > > >
 > > >
 > > > On Sat, Dec 7, 2019 at 1:12 AM Alessandro Tagliapietra <
 > > > tagliapietra.alessan...@gmail.com> wrote:
 > > >
 > > > > Seems my journey with this isn't done just yet,
 > > > >
 > > > > This seems very complicated to me but I'll try to explain it as
 best I
 > > can.
 > > > > To better understand the streams network usage I've used
 prometheus
 > > with
 > > > > the JMX exporter to export kafka metrics.
 > > > > To check the amount of data we use I'm looking at the increments
 > > > > of kafka_producer_topic_metrics_byte_total and
 > > > > kafka_producer_producer_topic_metrics_record_send_total,
 > > > >
 > > > > Our current (before the change mentioned above) code looks like
 this:
 > > > >
 > > > > // This transformers just pairs a value with the previous one
 storing
 > > the
 > > > > temporary one in a store
 > > > > val pairsStream = metricStream
 > > > >   .transformValues(ValueTransformerWithKeySupplier {
 PairTransformer()
 > > },
 > > > > "LastValueStore")
 > > > >   .filter { _, value: MetricSequence? -> value != null }
 > > > >
 > > > > // Create a store to store suppressed windows until a new one is
 > > received
 > > > > val suppressStoreSupplier =
 > > > >
 > >
 Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("suppress-store"),
 > > > > ..
 > > > >
 > > > > // Window and aggregate data in 1 minute intervals
 > > > > val aggregatedStream = pairsStream
 > > > >   .groupByKey()
 > > > >   .windowedBy(TimeWindows.of(Duration.ofMinutes(1)))
 > > > >   .aggre

[VOTE] 2.4.0 RC4

2019-12-09 Thread Manikumar
Hello Kafka users, developers and client-developers,

This is the fifth candidate for release of Apache Kafka 2.4.0.

This release includes many new features, including:
- Allow consumers to fetch from closest replica
- Support for incremental cooperative rebalancing to the consumer rebalance
protocol
- MirrorMaker 2.0 (MM2), a new multi-cluster, cross-datacenter replication
engine
- New Java authorizer Interface
- Support for non-key joining in KTable
- Administrative API for replica reassignment
- Sticky partitioner
- Return topic metadata and configs in CreateTopics response
- Securing Internal connect REST endpoints
- API to delete consumer offsets and expose it via the AdminClient.

Release notes for the 2.4.0 release:
https://home.apache.org/~manikumar/kafka-2.4.0-rc4/RELEASE_NOTES.html

*** Please download, test and vote by Thursday, December 12, 9am PT

Kafka's KEYS file containing PGP keys we use to sign the release:
https://kafka.apache.org/KEYS

* Release artifacts to be voted upon (source and binary):
https://home.apache.org/~manikumar/kafka-2.4.0-rc4/

* Maven artifacts to be voted upon:
https://repository.apache.org/content/groups/staging/org/apache/kafka/

* Javadoc:
https://home.apache.org/~manikumar/kafka-2.4.0-rc4/javadoc/

* Tag to be voted upon (off 2.4 branch) is the 2.4.0 tag:
https://github.com/apache/kafka/releases/tag/2.4.0-rc4

* Documentation:
https://kafka.apache.org/24/documentation.html

* Protocol:
https://kafka.apache.org/24/protocol.html

Thanks,
Manikumar


Re: [kafka-clients] [VOTE] 2.4.0 RC3

2019-12-09 Thread Manikumar
Hi all,

We have merged the PR for KAFKA-9212. Thanks to Jason for fixing the
issue.
Thanks to Yannick for reporting the issue and to Michael Jaschob for providing
extra details.

I am canceling this vote and will create a new RC shortly.

Thanks,
Manikumar

On Sat, Dec 7, 2019 at 3:19 AM Israel Ekpo  wrote:

>
> Ran the tests in the following environments using Eric's script available
> here:
>
> https://github.com/elalonde/kafka/blob/master/bin/verify-kafka-rc.sh
>
> OS: Ubuntu 18.04.3 LTS
> Java Version: OpenJDK 11.0.4
> Scala Versions: 2.12.10, 2.13.0, 2.13.1
> Gradle Version: 5.6.2
>
> I have made one observation in the release artifacts here:
> https://home.apache.org/~manikumar/kafka-2.4.0-rc3/kafka-2.4.0-src.tgz
>
> The latest available release for Scala is 2.13.1
>
> It looks like the artifacts were built in an environment where the scala
> version in the CLI is set to 2.12.10 so this is the value in
> gradle.properties for the source artifact here:
>
> scalaVersion=2.12.10
>
> Not sure if that needs to change but it seems in the previous releases
> this is usually set to the highest available scala version at the time of
> release.
>
> Also, the test
> kafka.admin.ReassignPartitionsClusterTest.shouldMoveSinglePartitionWithinBroker
> failed the first time I ran all the tests but it passed the second and
> third time I ran the group kafka.admin.ReassignPartitionsClusterTest and
> just the single method
> (kafka.admin.ReassignPartitionsClusterTest.shouldMoveSinglePartitionWithinBroker).
>
>
> I will re-run the tests again with Scala versions 2.13.0 and 2.13.1 and share
> my observations later.
>
> So far, it looks good.
>
> isekpo@ossvalidator:~$ lsb_release -a
> No LSB modules are available.
> Distributor ID: Ubuntu
> Description:Ubuntu 18.04.3 LTS
> Release:18.04
> Codename:   bionic
>
> isekpo@ossvalidator:~$ java -version
> openjdk version "11.0.4" 2019-07-16
> OpenJDK Runtime Environment (build 11.0.4+11-post-Ubuntu-1ubuntu218.04.3)
> OpenJDK 64-Bit Server VM (build 11.0.4+11-post-Ubuntu-1ubuntu218.04.3,
> mixed mode, sharing)
>
> isekpo@ossvalidator:~$ scala -version
> Scala code runner version 2.12.10 -- Copyright 2002-2019, LAMP/EPFL and
> Lightbend, Inc.
> isekpo@ossvalidator:~$
>
> isekpo@ossvalidator:~$ gradle -version
>
> 
> Gradle 5.6.2
> 
>
> Build time:   2019-09-05 16:13:54 UTC
> Revision: 55a5e53d855db8fc7b0e494412fc624051a8e781
>
> Kotlin:   1.3.41
> Groovy:   2.5.4
> Ant:  Apache Ant(TM) version 1.9.14 compiled on March 12 2019
> JVM:  11.0.4 (Ubuntu 11.0.4+11-post-Ubuntu-1ubuntu218.04.3)
> OS:   Linux 5.0.0-1027-azure amd64
>
> 1309 tests completed, 1 failed, 17 skipped
>
> > Task :core:integrationTest FAILED
>
> FAILURE: Build failed with an exception.
>
> * What went wrong:
> Execution failed for task ':core:integrationTest'.
> > There were failing tests. See the report at:
> file:///home/isekpo/scratchpad/14891.out/kafka-2.4.0-src/core/build/reports/tests/integrationTest/index.html
>
> * Try:
> Run with --stacktrace option to get the stack trace. Run with --info or
> --debug option to get more log output. Run with --scan to get full insights.
>
> * Get more help at https://help.gradle.org
>
> Deprecated Gradle features were used in this build, making it incompatible
> with Gradle 6.0.
> Use '--warning-mode all' to show the individual deprecation warnings.
> See
> https://docs.gradle.org/5.6.2/userguide/command_line_interface.html#sec:command_line_warnings
>
> BUILD FAILED in 53m 52s
> 14 actionable tasks: 3 executed, 11 up-to-date
>
>
> Details for failed test:
>
> shouldMoveSinglePartitionWithinBroker
> org.scalatest.exceptions.TestFailedException: Partition should have been
> moved to the expected log directory
> at
> org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:530)
> at
> org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:529)
> at
> org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1389)
> at org.scalatest.Assertions.fail(Assertions.scala:1091)
> at org.scalatest.Assertions.fail$(Assertions.scala:1087)
> at org.scalatest.Assertions$.fail(Assertions.scala:1389)
> at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:842)
> at
> kafka.admin.ReassignPartitionsClusterTest.shouldMoveSinglePartitionWithinBroker(ReassignPartitionsClusterTest.scala:177)
> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
> Method)
> at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.base/java.lang.reflect.Method.invoke(Method.java:566)
> at
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> at
> org.junit.internal.runners.model.Ref

MM2 startup delay

2019-12-09 Thread Péter Sinóros-Szabó
Hi,

I am experimenting with MirrorMaker 2 in 2.4.0-rc3. It seems to start up
fine, connects to both source and destination, and creates new topics...
But it does not start to actually mirror the messages until about 12
minutes after MM2 is started. I would expect it to start mirroring within
seconds of startup.

Source cluster has about 2800 partitions, destination cluster is empty.
Both clusters are in AWS but in different regions.

What may cause the 12-minute delay?

Config is:
---
clusters = eucmain, euwbackup
eucmain.bootstrap.servers =
test-kafka-main-fra01.xx:9092,test-kafka-main-fra02.xx:9092
euwbackup.bootstrap.servers = 172.30.197.203:9092,172.30.213.104:9092
eucmain->euwbackup.enabled = true
eucmain->euwbackup.topics = .*
eucmain->euwbackup.topics.blacklist = ^(kafka|kmf|__).*
eucmain->euwbackup.rename.topics = false
replication.policy.separator = __
eucmain.client.id = mm2

I do not see any serious errors in the logs that I would consider a cause
of this.

Thanks,
Peter


Re: Kafka transactions not working in 2.3.1

2019-12-09 Thread Jonathan Santilli
Hello Soman,

again, hard to tell, this is what the docs say:

"...if you are upgrading from a version prior to 0.11.0.x, then
CURRENT_MESSAGE_FORMAT_VERSION should be set to match CURRENT_KAFKA_VERSION.
"

also:

"...Once the brokers begin using the latest protocol version, it will no
longer be possible to downgrade the cluster to an older version."

So, I guess you have tested or are testing in a DEV environment before
going to prod; if so, and you have not yet restarted the brokers with the
newest protocol version, I think it could work.

Again, I don't have more info than what you are providing; this is my leap
of faith.
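
For reference, my understanding of the relevant broker settings during such a
rolling upgrade (the values below assume 0.10.1 -> 2.3 and are only a sketch
of what the upgrade docs describe; please double-check them against the docs):

# step 1: on each broker, before swapping in the 2.3.1 binaries
inter.broker.protocol.version=0.10.1
log.message.format.version=0.10.1

# step 2: once ALL brokers run 2.3.1, bump the protocol version and do a rolling restart
inter.broker.protocol.version=2.3

# step 3: once clients are upgraded, bump the message format version and roll again
log.message.format.version=2.3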


Cheers!
--
Jonathan





On Mon, Dec 9, 2019 at 12:30 AM Soman Ullah  wrote:

> Hello Jonathan,
> I believe I did not set *log.message.format.version* to the current version
> before I upgraded to 2.3. However, I did set inter.broker.protocol.version
> though. Would reverting back to 0.10.1 and upgrading again with
> *log.message.format.version
> *work?
>
> Thanks,
> Soman
>
> On Sun, Dec 8, 2019 at 1:40 PM Jonathan Santilli <
> jonathansanti...@gmail.com>
> wrote:
>
> > Hard to tell without logs and more context, I mean, Is a big jump from
> > 0.10.1 to 2.3.1.
> > Did you follow all the instructions/comments? as in the upgrade
> > documentation https://kafka.apache.org/documentation/#upgrade_2_3_0?
> >
> > Cheers!
> > --
> > Jonathan
> >
> >
> >
> >
> > On Sun, Dec 8, 2019 at 3:18 PM Soman Ullah  wrote:
> >
> > > Hello Jonathan,
> > > I've installed the same version on all the brokers. The brokers were
> > > restarted as well. Any suggestions on how I can fix this?
> > >
> > > Thanks,
> > > Soman
> > >
> > > On Sat, Dec 7, 2019 at 9:17 AM Jonathan Santilli <
> > > jonathansanti...@gmail.com>
> > > wrote:
> > >
> > > > Hello,
> > > >
> > > > have you ensured you have installed the same version in all brokers?
> > Did
> > > > you restart all brokers after the update as indicated in the rolling
> > > > upgrade instructions?
> > > >
> > > > Cheers!
> > > >
> > > > On Sat, Dec 7, 2019, 2:38 PM Soman Ullah 
> wrote:
> > > >
> > > > > Hello,
> > > > >
> > > > > I recently upgraded our kafka cluster from Kafka version 0.10.1 to
> > > 2.3.1
> > > > > I've confirmed the version has updated using the following command:
> > > > >
> > > > > /home/wrte/install/kafka/bin/kafka-topics.sh --version
> > > > > 2.3.1.1_212.11 (Commit:8c923fb4c62a38ae)
> > > > >
> > > > > However I'm unable to use the transactional features as the
> producer
> > > > fails
> > > > > with
> > > > >
> > > > > Caused by:
> > org.apache.kafka.common.errors.UnsupportedVersionException:
> > > > > The broker does not support ADD_PARTITIONS_TO_TXN
> > > > >
> > > > > When I double checked the broker api version, it says transactions
> > > aren't
> > > > > supported:
> > > > >
> > > > > /home/wrte/install/kafka/bin/kafka-broker-api-versions.sh
> > > > > --bootstrap-server localhost:4443 | grep -i txn
> > > > > EndTxn(26): UNSUPPORTED,
> > > > > WriteTxnMarkers(27): UNSUPPORTED,
> > > > > TxnOffsetCommit(28): UNSUPPORTED,
> > > > > AddPartitionsToTxn(24): UNSUPPORTED,
> > > > > AddOffsetsToTxn(25): UNSUPPORTED,
> > > > >
> > > > > I can also see that a new topic: __transaction_state was created on
> > the
> > > > > broker. How can I fix this and start using transactions?
> > > > >
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Soman
> > > > >
> > > >
> > >
> >
> >
> > --
> > Santilli Jonathan
> >
>


-- 
Santilli Jonathan


Re: Reducing streams startup bandwidth usage

2019-12-09 Thread Alessandro Tagliapietra
And it seems that for some reason after a while caching works again without
a restart of the streams application

[image: Screen Shot 2019-12-08 at 11.59.30 PM.png]

I'll try to enable debug metrics and see if I can find something useful
there.
Any idea is appreciated in the meantime :)

--
Alessandro Tagliapietra

On Sun, Dec 8, 2019 at 12:54 PM Alessandro Tagliapietra <
tagliapietra.alessan...@gmail.com> wrote:

> It seems that even with caching enabled, after a while the sent bytes stil
> go up
>
> [image: Screen Shot 2019-12-08 at 12.52.31 PM.png]
>
> you can see the deploy when I've enabled caching but it looks like it's
> still a temporary solution.
>
> --
> Alessandro Tagliapietra
>
>
> On Sat, Dec 7, 2019 at 10:08 AM Alessandro Tagliapietra <
> tagliapietra.alessan...@gmail.com> wrote:
>
>> Could be, but since we have a limite amount of input keys (~30),
>> windowing generates new keys but old ones are never touched again since the
>> data per key is in order, I assume it shouldn't be a big deal for it to
>> handle 30 keys
>> I'll have a look at cache metrics and see if something pops out
>>
>> Thanks
>>
>> --
>> Alessandro Tagliapietra
>>
>>
>> On Sat, Dec 7, 2019 at 10:02 AM John Roesler  wrote:
>>
>>> Hmm, that’s a good question. Now that we’re talking about caching, I
>>> wonder if the cache was just too small. It’s not very big by default.
>>>
>>> On Sat, Dec 7, 2019, at 11:16, Alessandro Tagliapietra wrote:
>>> > Ok I'll check on that!
>>> >
>>> > Now I can see that with caching we went from 3-4MB/s to 400KB/s, that
>>> will
>>> > help with the bill.
>>> >
>>> > Last question, any reason why after a while the regular windowed stream
>>> > starts sending every update instead of caching?
>>> > Could it be because it doesn't have any more memory available? Any
>>> other
>>> > possible reason?
>>> >
>>> > Thank you so much for your help
>>> >
>>> > --
>>> > Alessandro Tagliapietra
>>> >
>>> >
>>> > On Sat, Dec 7, 2019 at 9:14 AM John Roesler 
>>> wrote:
>>> >
>>> > > Ah, yes. Glad you figured it out!
>>> > >
>>> > > Caching does not reduce EOS guarantees at all. I highly recommend
>>> using
>>> > > it. You might even want to take a look at the caching metrics to
>>> make sure
>>> > > you have a good hit ratio.
>>> > >
>>> > > -John
>>> > >
>>> > > On Sat, Dec 7, 2019, at 10:51, Alessandro Tagliapietra wrote:
>>> > > > Never mind I've found out I can use `.withCachingEnabled` on the
>>> store
>>> > > > builder to achieve the same thing as the windowing example as
>>> > > > `Materialized.as` turns that on by default.
>>> > > >
>>> > > > Does caching in any way reduces the EOS guarantees?
>>> > > >
>>> > > > --
>>> > > > Alessandro Tagliapietra
>>> > > >
>>> > > >
>>> > > > On Sat, Dec 7, 2019 at 1:12 AM Alessandro Tagliapietra <
>>> > > > tagliapietra.alessan...@gmail.com> wrote:
>>> > > >
>>> > > > > Seems my journey with this isn't done just yet,
>>> > > > >
>>> > > > > This seems very complicated to me but I'll try to explain it as
>>> best I
>>> > > can.
>>> > > > > To better understand the streams network usage I've used
>>> prometheus
>>> > > with
>>> > > > > the JMX exporter to export kafka metrics.
>>> > > > > To check the amount of data we use I'm looking at the increments
>>> > > > > of kafka_producer_topic_metrics_byte_total and
>>> > > > > kafka_producer_producer_topic_metrics_record_send_total,
>>> > > > >
>>> > > > > Our current (before the change mentioned above) code looks like
>>> this:
>>> > > > >
>>> > > > > // This transformers just pairs a value with the previous one
>>> storing
>>> > > the
>>> > > > > temporary one in a store
>>> > > > > val pairsStream = metricStream
>>> > > > >   .transformValues(ValueTransformerWithKeySupplier {
>>> PairTransformer()
>>> > > },
>>> > > > > "LastValueStore")
>>> > > > >   .filter { _, value: MetricSequence? -> value != null }
>>> > > > >
>>> > > > > // Create a store to store suppressed windows until a new one is
>>> > > received
>>> > > > > val suppressStoreSupplier =
>>> > > > >
>>> > >
>>> Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("suppress-store"),
>>> > > > > ..
>>> > > > >
>>> > > > > // Window and aggregate data in 1 minute intervals
>>> > > > > val aggregatedStream = pairsStream
>>> > > > >   .groupByKey()
>>> > > > >   .windowedBy(TimeWindows.of(Duration.ofMinutes(1)))
>>> > > > >   .aggregate(
>>> > > > >   { MetricSequenceList(ArrayList()) },
>>> > > > >   { key, value, aggregate ->
>>> > > > >   aggregate.getRecords().add(value)
>>> > > > >   aggregate
>>> > > > >   },
>>> > > > >   Materialized.`as`<String, MetricSequenceList, WindowStore<Bytes, ByteArray>>("aggregate-store").withKeySerde(Serdes.String()).withValueSerde(Settings.getValueSpecificavroSerde())
>>> > > > >   )
>>> > > > >   .toStream()
>>> > > > >   .flatTransform(TransformerSupplier {
>>> > > > >   // This transformer basically waits until a new window is
>>> > > received
>>> > > > > to emit