Re: session window bug not fixed in 0.10.2.1?

2017-05-14 Thread Guozhang Wang
Hello Ara,

I think KAFKA-5172 could be the cause, if your session windows grow larger
over time with caching turned on.

The fix itself has been committed to trunk and will be included in the June
release. Until then, could you keep caching turned off and confirm that the
issue does not show up again?
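For reference, a minimal sketch of turning the Streams record cache off
globally (assuming the standard cache.max.bytes.buffering setting; the
application id and broker address below are placeholders, not values from this
thread):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class CachingOffConfig {
    public static Properties build() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "session-window-app"); // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
        // A cache size of zero disables record caching for all stores.
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
        return props;
    }
}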


Guozhang

On Mon, May 8, 2017 at 4:18 PM, Ara Ebrahimi wrote:

> Well, I can tell you this:
>
> Performance is good initially but then it degrades massively after
> processing a few billion records in a few hours. It becomes quite “choppy”.
> A little bit of data is processed, then absolute silence, then another
> small chunk processed, and so on. I did enable rocksdb logs and there’s no
> write stalling happening.
>
> Note that this is different from what we saw in the previous version. In
> that version turning on caching would slow down 5x the whole thing right
> from the beginning.
>
> So it seems to me KAFKA-5172 can cause such a behavior?
>
> Ara.
>
> On May 5, 2017, at 5:22 PM, Guozhang Wang <gg...@gmail.com> wrote:
>
> Could KAFKA-5172 cause similar observations?
>
> Guozhang
>
> On Thu, May 4, 2017 at 1:30 AM, Damian Guy wrote:
>
> It is odd as the person that originally reported the problem has verified
> that it is fixed.
>
> On Thu, 4 May 2017 at 08:31 Guozhang Wang <gg...@gmail.com> wrote:
>
> Ara,
>
> That is a bit weird. I double checked and agreed with Eno that this commit
> is in both trunk and 0.10.2, so I suspect the same issue still persists in
> trunk as well; hence there might be another issue that is not fixed by 2645.
> Could you help verify whether that is the case? If so, we can re-open
> https://issues.apache.org/jira/browse/KAFKA-4851 and investigate further.
>
>
> Guozhang
>
>
> On Tue, May 2, 2017 at 1:02 PM, Ara Ebrahimi <ara.ebrah...@argyledata.com> wrote:
>
> No errors. But if I enable caching I see performance drop considerably.
> The workaround was to disable caching. The same thing is still true in
> 10.2.1.
>
> Ara.
>
> On May 2, 2017, at 12:55 PM, Eno Thereska wrote:
>
> Hi Ara,
>
> The PR https://github.com/apache/kafka/pull/2645 has gone to both trunk and
> 0.10.2.1, I just checked. What error are you seeing, could you give us an
> update?
>
> Thanks
> Eno
>
> On Fri, Apr 28, 2017 at 7:10 PM, Ara Ebrahimi <ara.ebrah...@argyledata.com> wrote:
>
> Hi,
>
> I upgraded to 0.10.2.1 yesterday, enabled caching for session windows and
> tested again. It doesn’t seem to be fixed?
>
> Ara.
>
> On Mar 27, 2017, at 2:10 PM, Damian Guy wrote:
>
> Hi Ara,
>
> There is a performance issue in the 0.10.2 release of session windows. It is
> fixed with this PR: https://github.com/apache/kafka/pull/2645
> You can work around this on 0.10.2 by calling the aggregate(..), reduce(..),
> etc. methods and supplying a StateStoreSupplier with caching disabled, i.e.,
> by doing something like:
>
> final StateStoreSupplier sessionStore =
>     Stores.create("session-store-name")
>           .withKeys(Serdes.String())
>           .withValues(Serdes.String())
>           .persistent()
>           .sessionWindowed(TimeUnit.MINUTES.toMillis(7))
>           .build();
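A hedged sketch (not from the original mail) of passing such a caching-disabled
supplier into a session-windowed aggregate; the exact 0.10.2 overload and
parameter order may differ slightly, imports are omitted, and groupedStream is
assumed to be a KGroupedStream<String, String> built elsewhere:

KTable<Windowed<String>, String> sessions = groupedStream.aggregate(
        () -> "",                                       // initializer
        (key, value, aggregate) -> aggregate + value,   // aggregator
        (key, agg1, agg2) -> agg1 + agg2,               // session merger
        SessionWindows.with(TimeUnit.MINUTES.toMillis(7)),
        Serdes.String(),                                // must match withValues(...) above
        sessionStore);                                  // the caching-disabled supplier above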
>
>
> The fix has also been cherry-picked to the 0.10.2 branch, so you could build
> from source and not have to create the StateStoreSupplier.
>
> Thanks,
> Damian
>
> On Mon, 27 Mar 2017 at 21:56 Ara Ebrahimi <ara.ebrah...@argyledata.com> wrote:
>
> Thanks for the response Matthias!
>
> The reason we want this exact task assignment to happen is that a critical
> part of our pipeline involves grouping relevant records together (that’s
> what the aggregate function in the topology is for). And for hot keys this
> can lead to sometimes 100s of records getting grouped together. Even worse,
> these records are session bound; we use session windows. Hence we see lots
> of activity around the store backing the aggregate function, and even though
> we use SSD drives we’re not seeing the kind of performance we want to see.
> It seems like the aggregate function leads to lots of updates to these hot
> keys, which leads to lots of rocksdb activity.
>
> Now there are many ways to fix this problem:
> - just don’t aggregate: create an algorithm which is not reliant on
> grouping/aggregating records. Not something we can do with our tight
> schedule right now.
> - do grouping/aggregating but employ n instances and rely on uniform
> distribution of these tasks. This is the easiest solution and what we
> expected to work, but it didn’t, as you can tell from this thread. We threw
> 4 instances at it but only 2 got used.
> - tune rocksdb? I tried this actually but it didn’t really help us much,
> aside from the fact that tuning rocksdb is very tricky.

Re: session window bug not fixed in 0.10.2.1?

2017-05-08 Thread Ara Ebrahimi
Well, I can tell you this:

Performance is good initially but then it degrades massively after processing a 
few billion records in a few hours. It becomes quite “choppy”. A little bit of 
data is processed, then absolute silence, then another small chunk processed, 
and so on. I did enable rocksdb logs and there’s no write stalling happening.

Note that this is different from what we saw in the previous version. In that 
version turning on caching would slow down 5x the whole thing right from the 
beginning.

So it seems to me KAFKA-5172 can cause such a behavior?

Ara.

On May 5, 2017, at 5:22 PM, Guozhang Wang 
> wrote:

Could KAFKA-5172 cause similar observations?

Guozhang

On Thu, May 4, 2017 at 1:30 AM, Damian Guy 
> wrote:

It is odd as the person that originally reported the problem has verified
that it is fixed.

On Thu, 4 May 2017 at 08:31 Guozhang Wang 
> wrote:

Ara,

That is a bit weird, I double checked and agreed with Eno that this
commit
is in both trunk and 0.10.2, so I suspect the same issue still persists
in
trunk, hence there might be another issue that is not fixed in 2645.
Could
you help verify if that is the case? In which we can re-open
https://issues.apache.org/jira/browse/KAFKA-4851 and investigate
further.


Guozhang


On Tue, May 2, 2017 at 1:02 PM, Ara Ebrahimi <
ara.ebrah...@argyledata.com>
wrote:

No errors. But if I enable caching I see performance drop considerably.
The workaround was to disable caching. The same thing is still true in
10.2.1.

Ara.

On May 2, 2017, at 12:55 PM, Eno Thereska 
>
wrote:

Hi Ara,

The PR https://github.com/apache/kafka/pull/2645 has gone to both
trunk
and
0.10.2.1, I just checked. What error are you seeing, could you give
us
an
update?

Thanks
Eno

On Fri, Apr 28, 2017 at 7:10 PM, Ara Ebrahimi <
ara.ebrah...@argyledata.com>
wrote:

Hi,

I upgraded to 0.10.2.1 yesterday, enabled caching for session
windows
and
tested again. It doesn’t seem to be fixed?

Ara.

On Mar 27, 2017, at 2:10 PM, Damian Guy 
>
wrote:

Hi Ara,

There is a performance issue in the 0.10.2 release of session
windows.
It
is fixed with this PR: https://github.com/apache/kafka/pull/2645
You can work around this on 0.10.2 by calling the aggregate(..),
reduce(..)
etc methods and supplying StateStoreSupplier with
caching
disabled, i.e, by doing something like:

final StateStoreSupplier sessionStore =
Stores.create("session-store-name")
 .withKeys(Serdes.String())
 .withValues(Serdes.String())
 .persistent()
 .sessionWindowed(TimeUnit.MINUTES.toMillis(7))
 .build();


The fix has also been cherry-picked to the 0.10.2 branch, so you
could
build from source and not have to create the StateStoreSupplier.

Thanks,
Damian

On Mon, 27 Mar 2017 at 21:56 Ara Ebrahimi <
ara.ebrah...@argyledata.com

wrote:

Thanks for the response Matthias!

The reason we want this exact task assignment to happen is that a
critical
part of our pipeline involves grouping relevant records together
(that’s
what the aggregate function in the topology is for). And for hot
keys
this
can lead to sometimes 100s of records to get grouped together. Even
worse,
these records are session bound, we use session windows. Hence we
see
lots
of activity around the store backing the aggregate function and
even
though
we use SSD drives we’re not seeing the kind of performance we want
to
see.
It seems like the aggregate function leads to lots of updates to
these
hot
keys which lead to lots of rocksdb activity.

Now there are many ways to fix this problem:
- just don’t aggregate, create an algorithm which is not reliant on
grouping/aggregating records. Not what we can do with our tight
schedule
right now.
- do grouping/aggregating but employ n instances and rely on
uniform
distribution of these tasks. This is the easiest solution and what
we
expected to work but didn’t work as you can tell from this thread.
We
threw
4 instances at it but only 2 got used.
- tune rocksdb? I tried this actually but it didn’t really help us
much,
aside from the fact that tuning rocksdb is very tricky.
- use in-memory store instead? Unfortunately we have to use session
windows
for this aggregate function and apparently there’s no in-memory
session
store impl? I tried to create one but soon realized it’s too much
work
:) I
looked at default PartitionAssigner code too, but that ain’t
trivial
either.

So I’m a bit hopeless :(

Ara.
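Ara mentions above that RocksDB tuning was attempted but is tricky. For
reference, a hedged sketch (not from the thread) of the rocksdb.config.setter
hook that Streams exposes for this; the class name and option values are
illustrative placeholders, not recommendations from any participant:

import java.util.Map;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.Options;

public class ExampleRocksDBConfig implements RocksDBConfigSetter {
    @Override
    public void setConfig(final String storeName, final Options options,
                          final Map<String, Object> configs) {
        // Larger memtables and more write buffers mean fewer flushes for hot keys
        // (illustrative values only).
        options.setWriteBufferSize(32 * 1024 * 1024L);
        options.setMaxWriteBufferNumber(4);
    }
}

// Registered via a hypothetical Properties object named props:
// props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, ExampleRocksDBConfig.class);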

On Mar 27, 2017, at 1:35 PM, Matthias J. Sax <matth...@confluent.io> wrote:







Re: session window bug not fixed in 0.10.2.1?

2017-05-05 Thread Guozhang Wang
Could KAFKA-5172 cause similar observations?

Guozhang

On Thu, May 4, 2017 at 1:30 AM, Damian Guy  wrote:

> It is odd as the person that originally reported the problem has verified
> that it is fixed.
>
> On Thu, 4 May 2017 at 08:31 Guozhang Wang  wrote:
>
> > Ara,
> >
> > That is a bit weird, I double checked and agreed with Eno that this
> commit
> > is in both trunk and 0.10.2, so I suspect the same issue still persists
> in
> > trunk, hence there might be another issue that is not fixed in 2645.
> Could
> > you help verify if that is the case? In which we can re-open
> > https://issues.apache.org/jira/browse/KAFKA-4851 and investigate
> further.
> >
> >
> > Guozhang
> >
> >
> > On Tue, May 2, 2017 at 1:02 PM, Ara Ebrahimi <
> ara.ebrah...@argyledata.com>
> > wrote:
> >
> > > No errors. But if I enable caching I see performance drop considerably.
> > > The workaround was to disable caching. The same thing is still true in
> > > 10.2.1.
> > >
> > > Ara.
> > >
> > > > On May 2, 2017, at 12:55 PM, Eno Thereska 
> > > wrote:
> > > >
> > > > Hi Ara,
> > > >
> > > > The PR https://github.com/apache/kafka/pull/2645 has gone to both
> > trunk
> > > and
> > > > 0.10.2.1, I just checked. What error are you seeing, could you give
> us
> > an
> > > > update?
> > > >
> > > > Thanks
> > > > Eno
> > > >
> > > > On Fri, Apr 28, 2017 at 7:10 PM, Ara Ebrahimi <
> > > ara.ebrah...@argyledata.com>
> > > > wrote:
> > > >
> > > >> Hi,
> > > >>
> > > >> I upgraded to 0.10.2.1 yesterday, enabled caching for session
> windows
> > > and
> > > >> tested again. It doesn’t seem to be fixed?
> > > >>
> > > >> Ara.
> > > >>
> > > >>> On Mar 27, 2017, at 2:10 PM, Damian Guy 
> > wrote:
> > > >>>
> > > >>> Hi Ara,
> > > >>>
> > > >>> There is a performance issue in the 0.10.2 release of session
> > windows.
> > > It
> > > >>> is fixed with this PR: https://github.com/apache/kafka/pull/2645
> > > >>> You can work around this on 0.10.2 by calling the aggregate(..),
> > > >> reduce(..)
> > > >>> etc methods and supplying StateStoreSupplier with
> > > caching
> > > >>> disabled, i.e, by doing something like:
> > > >>>
> > > >>> final StateStoreSupplier sessionStore =
> > > >>> Stores.create("session-store-name")
> > > >>>   .withKeys(Serdes.String())
> > > >>>   .withValues(Serdes.String())
> > > >>>   .persistent()
> > > >>>   .sessionWindowed(TimeUnit.MINUTES.toMillis(7))
> > > >>>   .build();
> > > >>>
> > > >>>
> > > >>> The fix has also been cherry-picked to the 0.10.2 branch, so you
> > could
> > > >>> build from source and not have to create the StateStoreSupplier.
> > > >>>
> > > >>> Thanks,
> > > >>> Damian
> > > >>>
> > > >>> On Mon, 27 Mar 2017 at 21:56 Ara Ebrahimi <
> > ara.ebrah...@argyledata.com
> > > >
> > > >>> wrote:
> > > >>>
> > > >>> Thanks for the response Matthias!
> > > >>>
> > > >>> The reason we want this exact task assignment to happen is that a
> > > >> critical
> > > >>> part of our pipeline involves grouping relevant records together
> > > (that’s
> > > >>> what the aggregate function in the topology is for). And for hot
> keys
> > > >> this
> > > >>> can lead to sometimes 100s of records to get grouped together. Even
> > > >> worse,
> > > >>> these records are session bound, we use session windows. Hence we
> see
> > > >> lots
> > > >>> of activity around the store backing the aggregate function and
> even
> > > >> though
> > > >>> we use SSD drives we’re not seeing the kind of performance we want
> to
> > > >> see.
> > > >>> It seems like the aggregate function leads to lots of updates to
> > these
> > > >> hot
> > > >>> keys which lead to lots of rocksdb activity.
> > > >>>
> > > >>> Now there are many ways to fix this problem:
> > > >>> - just don’t aggregate, create an algorithm which is not reliant on
> > > >>> grouping/aggregating records. Not what we can do with our tight
> > > schedule
> > > >>> right now.
> > > >>> - do grouping/aggregating but employ n instances and rely on
> uniform
> > > >>> distribution of these tasks. This is the easiest solution and what
> we
> > > >>> expected to work but didn’t work as you can tell from this thread.
> We
> > > >> threw
> > > >>> 4 instances at it but only 2 got used.
> > > >>> - tune rocksdb? I tried this actually but it didn’t really help us
> > > much,
> > > >>> aside from the fact that tuning rocksdb is very tricky.
> > > >>> - use in-memory store instead? Unfortunately we have to use session
> > > >> windows
> > > >>> for this aggregate function and apparently there’s no in-memory
> > session
> > > >>> store impl? I tried to create one but soon realized it’s too much
> > work
> > > >> :) I
> > > >>> looked at default PartitionAssigner code too, but that ain’t
> trivial
> > > >> either.
> > > >>>
> > > >>> So I’m a bit hopeless :(
> > > >>>
> > > >>> Ara.
> > > >>>
> > > >>> On Mar 27, 2017, at 1:35 PM, Matthias J. Sax <matth...@confluent.io> wrote:

Re: session window bug not fixed in 0.10.2.1?

2017-05-04 Thread Damian Guy
It is odd as the person that originally reported the problem has verified
that it is fixed.

On Thu, 4 May 2017 at 08:31 Guozhang Wang  wrote:

> Ara,
>
> That is a bit weird, I double checked and agreed with Eno that this commit
> is in both trunk and 0.10.2, so I suspect the same issue still persists in
> trunk, hence there might be another issue that is not fixed in 2645. Could
> you help verify if that is the case? In which we can re-open
> https://issues.apache.org/jira/browse/KAFKA-4851 and investigate further.
>
>
> Guozhang
>
>
> On Tue, May 2, 2017 at 1:02 PM, Ara Ebrahimi 
> wrote:
>
> > No errors. But if I enable caching I see performance drop considerably.
> > The workaround was to disable caching. The same thing is still true in
> > 10.2.1.
> >
> > Ara.
> >
> > > On May 2, 2017, at 12:55 PM, Eno Thereska 
> > wrote:
> > >
> > > Hi Ara,
> > >
> > > The PR https://github.com/apache/kafka/pull/2645 has gone to both
> trunk
> > and
> > > 0.10.2.1, I just checked. What error are you seeing, could you give us
> an
> > > update?
> > >
> > > Thanks
> > > Eno
> > >
> > > On Fri, Apr 28, 2017 at 7:10 PM, Ara Ebrahimi <
> > ara.ebrah...@argyledata.com>
> > > wrote:
> > >
> > >> Hi,
> > >>
> > >> I upgraded to 0.10.2.1 yesterday, enabled caching for session windows
> > and
> > >> tested again. It doesn’t seem to be fixed?
> > >>
> > >> Ara.
> > >>
> > >>> On Mar 27, 2017, at 2:10 PM, Damian Guy 
> wrote:
> > >>>
> > >>> Hi Ara,
> > >>>
> > >>> There is a performance issue in the 0.10.2 release of session
> windows.
> > It
> > >>> is fixed with this PR: https://github.com/apache/kafka/pull/2645
> > >>> You can work around this on 0.10.2 by calling the aggregate(..),
> > >> reduce(..)
> > >>> etc methods and supplying StateStoreSupplier with
> > caching
> > >>> disabled, i.e, by doing something like:
> > >>>
> > >>> final StateStoreSupplier sessionStore =
> > >>> Stores.create("session-store-name")
> > >>>   .withKeys(Serdes.String())
> > >>>   .withValues(Serdes.String())
> > >>>   .persistent()
> > >>>   .sessionWindowed(TimeUnit.MINUTES.toMillis(7))
> > >>>   .build();
> > >>>
> > >>>
> > >>> The fix has also been cherry-picked to the 0.10.2 branch, so you
> could
> > >>> build from source and not have to create the StateStoreSupplier.
> > >>>
> > >>> Thanks,
> > >>> Damian
> > >>>
> > >>> On Mon, 27 Mar 2017 at 21:56 Ara Ebrahimi <
> ara.ebrah...@argyledata.com
> > >
> > >>> wrote:
> > >>>
> > >>> Thanks for the response Matthias!
> > >>>
> > >>> The reason we want this exact task assignment to happen is that a
> > >> critical
> > >>> part of our pipeline involves grouping relevant records together
> > (that’s
> > >>> what the aggregate function in the topology is for). And for hot keys
> > >> this
> > >>> can lead to sometimes 100s of records to get grouped together. Even
> > >> worse,
> > >>> these records are session bound, we use session windows. Hence we see
> > >> lots
> > >>> of activity around the store backing the aggregate function and even
> > >> though
> > >>> we use SSD drives we’re not seeing the kind of performance we want to
> > >> see.
> > >>> It seems like the aggregate function leads to lots of updates to
> these
> > >> hot
> > >>> keys which lead to lots of rocksdb activity.
> > >>>
> > >>> Now there are many ways to fix this problem:
> > >>> - just don’t aggregate, create an algorithm which is not reliant on
> > >>> grouping/aggregating records. Not what we can do with our tight
> > schedule
> > >>> right now.
> > >>> - do grouping/aggregating but employ n instances and rely on uniform
> > >>> distribution of these tasks. This is the easiest solution and what we
> > >>> expected to work but didn’t work as you can tell from this thread. We
> > >> threw
> > >>> 4 instances at it but only 2 got used.
> > >>> - tune rocksdb? I tried this actually but it didn’t really help us
> > much,
> > >>> aside from the fact that tuning rocksdb is very tricky.
> > >>> - use in-memory store instead? Unfortunately we have to use session
> > >> windows
> > >>> for this aggregate function and apparently there’s no in-memory
> session
> > >>> store impl? I tried to create one but soon realized it’s too much
> work
> > >> :) I
> > >>> looked at default PartitionAssigner code too, but that ain’t trivial
> > >> either.
> > >>>
> > >>> So I’m a bit hopeless :(
> > >>>
> > >>> Ara.
> > >>>
> > >>> On Mar 27, 2017, at 1:35 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> > >>>
> > >>>
> > >>>
> > >>>
> > >>> 
> > >>>

Re: session window bug not fixed in 0.10.2.1?

2017-05-04 Thread Guozhang Wang
Ara,

That is a bit weird. I double checked and agreed with Eno that this commit
is in both trunk and 0.10.2, so I suspect the same issue still persists in
trunk as well; hence there might be another issue that is not fixed by 2645.
Could you help verify whether that is the case? If so, we can re-open
https://issues.apache.org/jira/browse/KAFKA-4851 and investigate further.


Guozhang


On Tue, May 2, 2017 at 1:02 PM, Ara Ebrahimi 
wrote:

> No errors. But if I enable caching I see performance drop considerably.
> The workaround was to disable caching. The same thing is still true in
> 10.2.1.
>
> Ara.
>
> > On May 2, 2017, at 12:55 PM, Eno Thereska 
> wrote:
> >
> > Hi Ara,
> >
> > The PR https://github.com/apache/kafka/pull/2645 has gone to both trunk
> and
> > 0.10.2.1, I just checked. What error are you seeing, could you give us an
> > update?
> >
> > Thanks
> > Eno
> >
> > On Fri, Apr 28, 2017 at 7:10 PM, Ara Ebrahimi <
> ara.ebrah...@argyledata.com>
> > wrote:
> >
> >> Hi,
> >>
> >> I upgraded to 0.10.2.1 yesterday, enabled caching for session windows
> and
> >> tested again. It doesn’t seem to be fixed?
> >>
> >> Ara.
> >>
> >>> On Mar 27, 2017, at 2:10 PM, Damian Guy  wrote:
> >>>
> >>> Hi Ara,
> >>>
> >>> There is a performance issue in the 0.10.2 release of session windows.
> It
> >>> is fixed with this PR: https://github.com/apache/kafka/pull/2645
> >>> You can work around this on 0.10.2 by calling the aggregate(..),
> >> reduce(..)
> >>> etc methods and supplying StateStoreSupplier with
> caching
> >>> disabled, i.e, by doing something like:
> >>>
> >>> final StateStoreSupplier sessionStore =
> >>> Stores.create("session-store-name")
> >>>   .withKeys(Serdes.String())
> >>>   .withValues(Serdes.String())
> >>>   .persistent()
> >>>   .sessionWindowed(TimeUnit.MINUTES.toMillis(7))
> >>>   .build();
> >>>
> >>>
> >>> The fix has also been cherry-picked to the 0.10.2 branch, so you could
> >>> build from source and not have to create the StateStoreSupplier.
> >>>
> >>> Thanks,
> >>> Damian
> >>>
> >>> On Mon, 27 Mar 2017 at 21:56 Ara Ebrahimi  >
> >>> wrote:
> >>>
> >>> Thanks for the response Matthias!
> >>>
> >>> The reason we want this exact task assignment to happen is that a
> >> critical
> >>> part of our pipeline involves grouping relevant records together
> (that’s
> >>> what the aggregate function in the topology is for). And for hot keys
> >> this
> >>> can lead to sometimes 100s of records to get grouped together. Even
> >> worse,
> >>> these records are session bound, we use session windows. Hence we see
> >> lots
> >>> of activity around the store backing the aggregate function and even
> >> though
> >>> we use SSD drives we’re not seeing the kind of performance we want to
> >> see.
> >>> It seems like the aggregate function leads to lots of updates to these
> >> hot
> >>> keys which lead to lots of rocksdb activity.
> >>>
> >>> Now there are many ways to fix this problem:
> >>> - just don’t aggregate, create an algorithm which is not reliant on
> >>> grouping/aggregating records. Not what we can do with our tight
> schedule
> >>> right now.
> >>> - do grouping/aggregating but employ n instances and rely on uniform
> >>> distribution of these tasks. This is the easiest solution and what we
> >>> expected to work but didn’t work as you can tell from this thread. We
> >> threw
> >>> 4 instances at it but only 2 got used.
> >>> - tune rocksdb? I tried this actually but it didn’t really help us
> much,
> >>> aside from the fact that tuning rocksdb is very tricky.
> >>> - use in-memory store instead? Unfortunately we have to use session
> >> windows
> >>> for this aggregate function and apparently there’s no in-memory session
> >>> store impl? I tried to create one but soon realized it’s too much work
> >> :) I
> >>> looked at default PartitionAssigner code too, but that ain’t trivial
> >> either.
> >>>
> >>> So I’m a bit hopeless :(
> >>>
> >>> Ara.
> >>>
> >>> On Mar 27, 2017, at 1:35 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> >>>
> >>>
> >>>
> >>>
> >>> 
> >>>
> >>>
> >>> 
> >>>
> >>> From: "Matthias J. Sax" <matth...@confluent.io>
> >>> Subject: Re: more uniform task assignment across kafka stream nodes
> >>> Date: March 27, 2017 at 1:35:30 PM PDT
> >>> To: users@kafka.apache.org
> >>> Reply-To: >
> >>>
> >>>
> >>> Ara,

Re: session window bug not fixed in 0.10.2.1?

2017-05-02 Thread Ara Ebrahimi
No errors. But if I enable caching I see performance drop considerably. The
workaround was to disable caching. The same thing is still true in 0.10.2.1.

Ara.

> On May 2, 2017, at 12:55 PM, Eno Thereska  wrote:
>
> Hi Ara,
>
> The PR https://github.com/apache/kafka/pull/2645 has gone to both trunk and
> 0.10.2.1, I just checked. What error are you seeing, could you give us an
> update?
>
> Thanks
> Eno
>
> On Fri, Apr 28, 2017 at 7:10 PM, Ara Ebrahimi 
> wrote:
>
>> Hi,
>>
>> I upgraded to 0.10.2.1 yesterday, enabled caching for session windows and
>> tested again. It doesn’t seem to be fixed?
>>
>> Ara.
>>
>>> On Mar 27, 2017, at 2:10 PM, Damian Guy  wrote:
>>>
>>> Hi Ara,
>>>
>>> There is a performance issue in the 0.10.2 release of session windows. It
>>> is fixed with this PR: https://github.com/apache/kafka/pull/2645
>>> You can work around this on 0.10.2 by calling the aggregate(..),
>> reduce(..)
>>> etc methods and supplying StateStoreSupplier with caching
>>> disabled, i.e, by doing something like:
>>>
>>> final StateStoreSupplier sessionStore =
>>> Stores.create("session-store-name")
>>>   .withKeys(Serdes.String())
>>>   .withValues(Serdes.String())
>>>   .persistent()
>>>   .sessionWindowed(TimeUnit.MINUTES.toMillis(7))
>>>   .build();
>>>
>>>
>>> The fix has also been cherry-picked to the 0.10.2 branch, so you could
>>> build from source and not have to create the StateStoreSupplier.
>>>
>>> Thanks,
>>> Damian
>>>
>>> On Mon, 27 Mar 2017 at 21:56 Ara Ebrahimi 
>>> wrote:
>>>
>>> Thanks for the response Matthias!
>>>
>>> The reason we want this exact task assignment to happen is that a
>> critical
>>> part of our pipeline involves grouping relevant records together (that’s
>>> what the aggregate function in the topology is for). And for hot keys
>> this
>>> can lead to sometimes 100s of records to get grouped together. Even
>> worse,
>>> these records are session bound, we use session windows. Hence we see
>> lots
>>> of activity around the store backing the aggregate function and even
>> though
>>> we use SSD drives we’re not seeing the kind of performance we want to
>> see.
>>> It seems like the aggregate function leads to lots of updates to these
>> hot
>>> keys which lead to lots of rocksdb activity.
>>>
>>> Now there are many ways to fix this problem:
>>> - just don’t aggregate, create an algorithm which is not reliant on
>>> grouping/aggregating records. Not what we can do with our tight schedule
>>> right now.
>>> - do grouping/aggregating but employ n instances and rely on uniform
>>> distribution of these tasks. This is the easiest solution and what we
>>> expected to work but didn’t work as you can tell from this thread. We
>> threw
>>> 4 instances at it but only 2 got used.
>>> - tune rocksdb? I tried this actually but it didn’t really help us much,
>>> aside from the fact that tuning rocksdb is very tricky.
>>> - use in-memory store instead? Unfortunately we have to use session
>> windows
>>> for this aggregate function and apparently there’s no in-memory session
>>> store impl? I tried to create one but soon realized it’s too much work
>> :) I
>>> looked at default PartitionAssigner code too, but that ain’t trivial
>> either.
>>>
>>> So I’m a bit hopeless :(
>>>
>>> Ara.
>>>
>>> On Mar 27, 2017, at 1:35 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>>>
>>>
>>>
>>> 
>>>
>>>
>>> 
>>>
>>> From: "Matthias J. Sax" <matth...@confluent.io>
>>> Subject: Re: more uniform task assignment across kafka stream nodes
>>> Date: March 27, 2017 at 1:35:30 PM PDT
>>> To: users@kafka.apache.org
>>> Reply-To: >
>>>
>>>
>>> Ara,
>>>
>>> thanks for the detailed information.
>>>
>>> If I parse this correctly, both instances run the same number of tasks
>>> (12 each). That is all Streams promises.
>>>
>>> To come back to your initial question:
>>>
>>> Is there a way to tell kafka streams to uniformly assign partitions
>> across
>>> instances? If I have n kafka streams instances running, I want each to
>>> handle EXACTLY 1/nth number of partitions. No dynamic task assignment
>>> logic. Just dumb 1/n assignment.
>>>
>>> That is exactly what you get: each of you two instances get 24/2 = 12
>>> tasks assigned. That is dump 1/n assignment, isn't it? So my original
>>> response was correct.
>>>
>>> However, I now understand better what you are actually meaning 

Re: session window bug not fixed in 0.10.2.1?

2017-05-02 Thread Eno Thereska
Hi Ara,

The PR https://github.com/apache/kafka/pull/2645 has gone to both trunk and
0.10.2.1, I just checked. What error are you seeing, could you give us an
update?

Thanks
Eno

On Fri, Apr 28, 2017 at 7:10 PM, Ara Ebrahimi 
wrote:

> Hi,
>
> I upgraded to 0.10.2.1 yesterday, enabled caching for session windows and
> tested again. It doesn’t seem to be fixed?
>
> Ara.
>
> > On Mar 27, 2017, at 2:10 PM, Damian Guy  wrote:
> >
> > Hi Ara,
> >
> > There is a performance issue in the 0.10.2 release of session windows. It
> > is fixed with this PR: https://github.com/apache/kafka/pull/2645
> > You can work around this on 0.10.2 by calling the aggregate(..),
> reduce(..)
> > etc methods and supplying StateStoreSupplier with caching
> > disabled, i.e, by doing something like:
> >
> > final StateStoreSupplier sessionStore =
> > Stores.create("session-store-name")
> >.withKeys(Serdes.String())
> >.withValues(Serdes.String())
> >.persistent()
> >.sessionWindowed(TimeUnit.MINUTES.toMillis(7))
> >.build();
> >
> >
> > The fix has also been cherry-picked to the 0.10.2 branch, so you could
> > build from source and not have to create the StateStoreSupplier.
> >
> > Thanks,
> > Damian
> >
> > On Mon, 27 Mar 2017 at 21:56 Ara Ebrahimi 
> > wrote:
> >
> > Thanks for the response Matthias!
> >
> > The reason we want this exact task assignment to happen is that a
> critical
> > part of our pipeline involves grouping relevant records together (that’s
> > what the aggregate function in the topology is for). And for hot keys
> this
> > can lead to sometimes 100s of records to get grouped together. Even
> worse,
> > these records are session bound, we use session windows. Hence we see
> lots
> > of activity around the store backing the aggregate function and even
> though
> > we use SSD drives we’re not seeing the kind of performance we want to
> see.
> > It seems like the aggregate function leads to lots of updates to these
> hot
> > keys which lead to lots of rocksdb activity.
> >
> > Now there are many ways to fix this problem:
> > - just don’t aggregate, create an algorithm which is not reliant on
> > grouping/aggregating records. Not what we can do with our tight schedule
> > right now.
> > - do grouping/aggregating but employ n instances and rely on uniform
> > distribution of these tasks. This is the easiest solution and what we
> > expected to work but didn’t work as you can tell from this thread. We
> threw
> > 4 instances at it but only 2 got used.
> > - tune rocksdb? I tried this actually but it didn’t really help us much,
> > aside from the fact that tuning rocksdb is very tricky.
> > - use in-memory store instead? Unfortunately we have to use session
> windows
> > for this aggregate function and apparently there’s no in-memory session
> > store impl? I tried to create one but soon realized it’s too much work
> :) I
> > looked at default PartitionAssigner code too, but that ain’t trivial
> either.
> >
> > So I’m a bit hopeless :(
> >
> > Ara.
> >
> > On Mar 27, 2017, at 1:35 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> >
> >
> >
> >
> > 
> >
> >
> > 
> >
> > From: "Matthias J. Sax" <matth...@confluent.io>
> > Subject: Re: more uniform task assignment across kafka stream nodes
> > Date: March 27, 2017 at 1:35:30 PM PDT
> > To: users@kafka.apache.org
> > Reply-To: >
> >
> >
> > Ara,
> >
> > thanks for the detailed information.
> >
> > If I parse this correctly, both instances run the same number of tasks
> > (12 each). That is all Streams promises.
> >
> > To come back to your initial question:
> >
> > Is there a way to tell kafka streams to uniformly assign partitions
> across
> > instances? If I have n kafka streams instances running, I want each to
> > handle EXACTLY 1/nth number of partitions. No dynamic task assignment
> > logic. Just dumb 1/n assignment.
> >
> > That is exactly what you get: each of you two instances get 24/2 = 12
> > tasks assigned. That is dump 1/n assignment, isn't it? So my original
> > response was correct.
> >
> > However, I now understand better what you are actually meaning by your
> > question. Note that Streams does not distinguish "type" of tasks -- it
> > only sees 24 tasks and assigns those in a balanced way.
> >
> > Thus, currently there is no easy way to get the assignment you want to
> > have, except, you implement you own
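The archive cuts the sentence off here. For reference, the pluggable hook that
exists in this area is the partition.grouper config; a heavily hedged skeleton
follows. Note that it only controls how partitions are grouped into tasks, not
how tasks are placed on instances, so whether it addresses the placement
question above is a separate matter. The class name is hypothetical and the
body simply mirrors what the default grouper does (one task per partition of
each topic group):

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.PartitionGrouper;
import org.apache.kafka.streams.processor.TaskId;

public class CustomPartitionGrouper implements PartitionGrouper {
    @Override
    public Map<TaskId, Set<TopicPartition>> partitionGroups(
            final Map<Integer, Set<String>> topicGroups, final Cluster metadata) {
        final Map<TaskId, Set<TopicPartition>> groups = new HashMap<>();
        for (final Map.Entry<Integer, Set<String>> group : topicGroups.entrySet()) {
            for (final String topic : group.getValue()) {
                final Integer partitions = metadata.partitionCountForTopic(topic);
                if (partitions == null) {
                    continue; // topic metadata not available yet
                }
                // One task per partition of this topic group, as in the default grouper.
                for (int partition = 0; partition < partitions; partition++) {
                    groups.computeIfAbsent(new TaskId(group.getKey(), partition),
                                           k -> new HashSet<>())
                          .add(new TopicPartition(topic, partition));
                }
            }
        }
        return groups;
    }
}

// Registered via a hypothetical Properties object named props:
// props.put(StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG, CustomPartitionGrouper.class);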