kafka-streams 0.11.0.1 rocksdb bug?

2017-09-22 Thread Ara Ebrahimi
Hi,

We just upgraded to kafka-streams 0.11.0.1 and noticed that in the cluster 
deployment reduce() never gets called. Funny thing is it does get called in 
the unit tests. And no, it’s not a data issue. What I have noticed is that all 
rocksdb folders (…/1_0/ and so on) are 
empty. I do not see any file permission errors or any errors in general upon 
startup of the streaming application.
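
For context, those per-task folders come from the Streams state directory, laid 
out as <state.dir>/<application.id>/<task id such as 1_0>/rocksdb/<store name>. 
A minimal sketch of the settings involved, in case the state dir points somewhere 
unexpected (application id, broker, and path here are hypothetical placeholders):

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");      // hypothetical id
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");     // hypothetical broker
props.put(StreamsConfig.STATE_DIR_CONFIG, "/var/lib/kafka-streams");   // task dirs like 1_0 are created under this
KafkaStreams streams = new KafkaStreams(builder, new StreamsConfig(props));   // builder = the existing topology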

Has anyone else run into this issue?

Ara.









Re: session window bug not fixed in 0.10.2.1?

2017-05-08 Thread Ara Ebrahimi
Well, I can tell you this:

Performance is good initially but then it degrades massively after processing a 
few billion records in a few hours. It becomes quite “choppy”. A little bit of 
data is processed, then absolute silence, then another small chunk processed, 
and so on. I did enable rocksdb logs and there’s no write stalling happening.

Note that this is different from what we saw in the previous version. In that 
version, turning on caching would slow the whole thing down 5x right from the 
beginning.

So it seems to me KAFKA-5172 could cause this kind of behavior?

Ara.

On May 5, 2017, at 5:22 PM, Guozhang Wang <wangg...@gmail.com> wrote:

Could KAFKA-5172 cause similar observations?

Guozhang

On Thu, May 4, 2017 at 1:30 AM, Damian Guy <damian@gmail.com> wrote:

It is odd as the person that originally reported the problem has verified
that it is fixed.

On Thu, 4 May 2017 at 08:31 Guozhang Wang <wangg...@gmail.com> wrote:

Ara,

That is a bit weird. I double checked and agreed with Eno that this commit
is in both trunk and 0.10.2, so I suspect the same issue still persists in
trunk; hence there might be another issue that is not fixed in 2645. Could
you help verify if that is the case? In which case we can re-open
https://issues.apache.org/jira/browse/KAFKA-4851 and investigate further.


Guozhang


On Tue, May 2, 2017 at 1:02 PM, Ara Ebrahimi <ara.ebrah...@argyledata.com> wrote:

No errors. But if I enable caching I see performance drop considerably.
The workaround was to disable caching. The same thing is still true in
0.10.2.1.

Ara.

On May 2, 2017, at 12:55 PM, Eno Thereska <eno.there...@gmail.com> wrote:

Hi Ara,

The PR https://github.com/apache/kafka/pull/2645 has gone to both trunk and
0.10.2.1, I just checked. What error are you seeing, could you give us an
update?

Thanks
Eno

On Fri, Apr 28, 2017 at 7:10 PM, Ara Ebrahimi <ara.ebrah...@argyledata.com> wrote:

Hi,

I upgraded to 0.10.2.1 yesterday, enabled caching for session windows and
tested again. It doesn’t seem to be fixed?

Ara.

On Mar 27, 2017, at 2:10 PM, Damian Guy <damian@gmail.com> wrote:

Hi Ara,

There is a performance issue in the 0.10.2 release of session windows. It
is fixed with this PR: https://github.com/apache/kafka/pull/2645
You can work around this on 0.10.2 by calling the aggregate(..), reduce(..)
etc. methods and supplying a StateStoreSupplier with caching disabled,
i.e., by doing something like:

final StateStoreSupplier sessionStore =
    Stores.create("session-store-name")
        .withKeys(Serdes.String())
        .withValues(Serdes.String())
        .persistent()
        .sessionWindowed(TimeUnit.MINUTES.toMillis(7))
        .build();
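
For illustration only (not from Damian's mail): a sketch of how such a supplier
might then be wired into a session-windowed reduce, assuming the 0.10.2
KGroupedStream overload that takes a Reducer, a SessionWindows, and the
StateStoreSupplier; the topic name, serdes, reducer, and gap are placeholders:

KStream<String, String> calls = builder.stream("calls-topic");       // placeholder topic, String serdes assumed
KTable<Windowed<String>, String> sessions = calls
    .groupByKey()
    .reduce(
        (agg, value) -> agg + "," + value,                           // placeholder reducer
        SessionWindows.with(TimeUnit.MINUTES.toMillis(7)),
        sessionStore);                                               // the supplier built above, i.e. caching disabled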


The fix has also been cherry-picked to the 0.10.2 branch, so you could
build from source and not have to create the StateStoreSupplier.

Thanks,
Damian

On Mon, 27 Mar 2017 at 21:56 Ara Ebrahimi <ara.ebrah...@argyledata.com> wrote:

Thanks for the response Matthias!

The reason we want this exact task assignment to happen is that a critical
part of our pipeline involves grouping relevant records together (that’s
what the aggregate function in the topology is for). And for hot keys this
can lead to sometimes 100s of records to get grouped together. Even worse,
these records are session bound, we use session windows. Hence we see lots
of activity around the store backing the aggregate function and even though
we use SSD drives we’re not seeing the kind of performance we want to see.
It seems like the aggregate function leads to lots of updates to these hot
keys which lead to lots of rocksdb activity.

Now there are many ways to fix this problem:
- just don’t aggregate, create an algorithm which is not reliant on
grouping/aggregating records. Not what we can do with our tight schedule
right now.
- do grouping/aggregating but employ n instances and rely on uniform
distribution of these tasks. This is the easiest solution and what we
expected to work but didn’t work as you can tell from this thread. We threw
4 instances at it but only 2 got used.
- tune rocksdb? I tried this actually but it didn’t really help us much,
aside from the fact that tuning rocksdb is very tricky.
- use in-memory store instead? Unfortunately we have to use session windows
for this aggregate function and apparently there’s no in-memory session
store impl? I tried to create one but soon realized it’s too much work :) I
looked at default PartitionAssigner code too, but that ain’t trivial either.

So I’m a bit hopeless :(

Ara.

On Mar 27, 2017, at 1:35 PM, Matthias J. Sax <matth...@confluent.io> wrote:

Re: session window bug not fixed in 0.10.2.1?

2017-05-02 Thread Ara Ebrahimi
No errors. But if I enable caching I see performance drop considerably. The 
workaround was to disable caching. The same thing is still true in 0.10.2.1.

Ara.

> On May 2, 2017, at 12:55 PM, Eno Thereska <eno.there...@gmail.com> wrote:
>
> Hi Ara,
>
> The PR https://github.com/apache/kafka/pull/2645 has gone to both trunk and
> 0.10.2.1, I just checked. What error are you seeing, could you give us an
> update?
>
> Thanks
> Eno
>
> On Fri, Apr 28, 2017 at 7:10 PM, Ara Ebrahimi <ara.ebrah...@argyledata.com>
> wrote:
>
>> Hi,
>>
>> I upgraded to 0.10.2.1 yesterday, enabled caching for session windows and
>> tested again. It doesn’t seem to be fixed?
>>
>> Ara.
>>
>>> On Mar 27, 2017, at 2:10 PM, Damian Guy <damian@gmail.com> wrote:
>>>
>>> Hi Ara,
>>>
>>> There is a performance issue in the 0.10.2 release of session windows. It
>>> is fixed with this PR: https://github.com/apache/kafka/pull/2645
>>> You can work around this on 0.10.2 by calling the aggregate(..),
>> reduce(..)
>>> etc methods and supplying StateStoreSupplier with caching
>>> disabled, i.e, by doing something like:
>>>
>>> final StateStoreSupplier sessionStore =
>>> Stores.create(*"session-store-name"*)
>>>   .withKeys(Serdes.String())
>>>   .withValues(Serdes.String())
>>>   .persistent()
>>>   .sessionWindowed(TimeUnit.MINUTES.toMillis(7))
>>>   .build();
>>>
>>>
>>> The fix has also been cherry-picked to the 0.10.2 branch, so you could
>>> build from source and not have to create the StateStoreSupplier.
>>>
>>> Thanks,
>>> Damian
>>>
>>> On Mon, 27 Mar 2017 at 21:56 Ara Ebrahimi <ara.ebrah...@argyledata.com>
>>> wrote:
>>>
>>> Thanks for the response Mathias!
>>>
>>> The reason we want this exact task assignment to happen is that a
>> critical
>>> part of our pipeline involves grouping relevant records together (that’s
>>> what the aggregate function in the topology is for). And for hot keys
>> this
>>> can lead to sometimes 100s of records to get grouped together. Even
>> worse,
>>> these records are session bound, we use session windows. Hence we see
>> lots
>>> of activity around the store backing the aggregate function and even
>> though
>>> we use SSD drives we’re not seeing the kind of performance we want to
>> see.
>>> It seems like the aggregate function leads to lots of updates to these
>> hot
>>> keys which lead to lots of rocksdb activity.
>>>
>>> Now there are many ways to fix this problem:
>>> - just don’t aggregate, create an algorithm which is not reliant on
>>> grouping/aggregating records. Not what we can do with our tight schedule
>>> right now.
>>> - do grouping/aggregating but employ n instances and rely on uniform
>>> distribution of these tasks. This is the easiest solution and what we
>>> expected to work but didn’t work as you can tell from this thread. We
>> threw
>>> 4 instances at it but only 2 got used.
>>> - tune rocksdb? I tried this actually but it didn’t really help us much,
>>> aside from the fact that tuning rocksdb is very tricky.
>>> - use in-memory store instead? Unfortunately we have to use session
>> windows
>>> for this aggregate function and apparently there’s no in-memory session
>>> store impl? I tried to create one but soon realized it’s too much work
>> :) I
>>> looked at default PartitionAssigner code too, but that ain’t trivial
>> either.
>>>
>>> So I’m a bit hopeless :(
>>>
>>> Ara.
>>>
>>> On Mar 27, 2017, at 1:35 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>>>
>>> From: "Matthias J. Sax" <matth...@confluent.io>
>>> Subject: Re: more uniform task assignment across kafka stream no

session window bug not fixed in 0.10.2.1?

2017-04-28 Thread Ara Ebrahimi
Hi,

I upgraded to 0.10.2.1 yesterday, enabled caching for session windows and 
tested again. It doesn’t seem to be fixed?

Ara.

> On Mar 27, 2017, at 2:10 PM, Damian Guy <damian@gmail.com> wrote:
>
> Hi Ara,
>
> There is a performance issue in the 0.10.2 release of session windows. It
> is fixed with this PR: https://github.com/apache/kafka/pull/2645
> You can work around this on 0.10.2 by calling the aggregate(..), reduce(..)
> etc methods and supplying StateStoreSupplier with caching
> disabled, i.e, by doing something like:
>
> final StateStoreSupplier sessionStore =
> Stores.create(*"session-store-name"*)
>.withKeys(Serdes.String())
>.withValues(Serdes.String())
>.persistent()
>.sessionWindowed(TimeUnit.MINUTES.toMillis(7))
>.build();
>
>
> The fix has also been cherry-picked to the 0.10.2 branch, so you could
> build from source and not have to create the StateStoreSupplier.
>
> Thanks,
> Damian
>
> On Mon, 27 Mar 2017 at 21:56 Ara Ebrahimi <ara.ebrah...@argyledata.com>
> wrote:
>
> Thanks for the response Mathias!
>
> The reason we want this exact task assignment to happen is that a critical
> part of our pipeline involves grouping relevant records together (that’s
> what the aggregate function in the topology is for). And for hot keys this
> can lead to sometimes 100s of records to get grouped together. Even worse,
> these records are session bound, we use session windows. Hence we see lots
> of activity around the store backing the aggregate function and even though
> we use SSD drives we’re not seeing the kind of performance we want to see.
> It seems like the aggregate function leads to lots of updates to these hot
> keys which lead to lots of rocksdb activity.
>
> Now there are many ways to fix this problem:
> - just don’t aggregate, create an algorithm which is not reliant on
> grouping/aggregating records. Not what we can do with our tight schedule
> right now.
> - do grouping/aggregating but employ n instances and rely on uniform
> distribution of these tasks. This is the easiest solution and what we
> expected to work but didn’t work as you can tell from this thread. We threw
> 4 instances at it but only 2 got used.
> - tune rocksdb? I tried this actually but it didn’t really help us much,
> aside from the fact that tuning rocksdb is very tricky.
> - use in-memory store instead? Unfortunately we have to use session windows
> for this aggregate function and apparently there’s no in-memory session
> store impl? I tried to create one but soon realized it’s too much work :) I
> looked at default PartitionAssigner code too, but that ain’t trivial either.
>
> So I’m a bit hopeless :(
>
> Ara.
>
> On Mar 27, 2017, at 1:35 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>
> From: "Matthias J. Sax" <matth...@confluent.io>
> Subject: Re: more uniform task assignment across kafka stream nodes
> Date: March 27, 2017 at 1:35:30 PM PDT
> To: users@kafka.apache.org
> Reply-To: <users@kafka.apache.org>
>
>
> Ara,
>
> thanks for the detailed information.
>
> If I parse this correctly, both instances run the same number of tasks
> (12 each). That is all Streams promises.
>
> To come back to your initial question:
>
> Is there a way to tell kafka streams to uniformly assign partitions across
> instances? If I have n kafka streams instances running, I want each to
> handle EXACTLY 1/nth number of partitions. No dynamic task assignment
> logic. Just dumb 1/n assignment.
>
> That is exactly what you get: each of you two instances get 24/2 = 12
> tasks assigned. That is dump 1/n assignment, isn't it? So my original
> response was correct.
>
> However, I now understand better what you are actually meaning by your
> question. Note that Streams does not distinguish "type" of tasks -- it
> only sees 24 tasks and assigns those in a balanced way.
>
> Thus, currently there is no easy way to get the assignment you want to
> have, except, you implement you own `PartitionAssignor`.
>
> This is the current implementation for 0.10.2
> https://github.com/apache/kafka/blob/

Re: more uniform task assignment across kafka stream nodes

2017-03-28 Thread Ara Ebrahimi
Awesome! Thanks. Can’t wait to try it out.

Ara.

On Mar 28, 2017, at 2:13 PM, Matthias J. Sax <matth...@confluent.io> wrote:

From: "Matthias J. Sax" <matth...@confluent.io>
Subject: Re: more uniform task assignment across kafka stream nodes
Date: March 28, 2017 at 2:13:51 PM PDT
To: users@kafka.apache.org
Reply-To: <users@kafka.apache.org>


Created a JIRA: https://issues.apache.org/jira/browse/KAFKA-4969


-Matthias

On 3/27/17 4:33 PM, Ara Ebrahimi wrote:
Well, even with 4-5x better performance thanks to the session window fix, I 
expect to get ~10x better performance if I throw 10x more nodes at the problem. 
That won’t be the case due to task assignment unfortunately. I may end up with 
say 5-6 nodes with aggregation assigned to them and 4-5 nodes sitting there 
doing nothing. So it is a problem.

Ara.

On Mar 27, 2017, at 4:15 PM, Matthias J. Sax <matth...@confluent.io> wrote:

From: "Matthias J. Sax" <matth...@confluent.io>
Subject: Re: more uniform task assignment across kafka stream nodes
Date: March 27, 2017 at 4:15:07 PM PDT
To: users@kafka.apache.org
Reply-To: <users@kafka.apache.org>


Great!

So overall, the issue is not related to task assignment. Also the
description below, does not indicate that different task assignment
would change anything.


-Matthias

On 3/27/17 3:08 PM, Ara Ebrahimi wrote:
Let me clarify, cause I think we’re using different terminologies:

- message key is phone number, reversed
- all call records for a phone number land on the same partition
- then we apply a session window on them and aggregate+reduce
- so we end up with a group of records for a phone number. This group is 
literally an avro object with an array of records in it for the session.
- since records arrive chronologically and since we have a session window, then 
all call records for a phone number end up in the same partition and session 
(intended behavior). We can easily have many such phone call record groups with 
100s of call records in them. The aggregate object (the avro object with array 
of records in it) can get updated 100s of times for the same phone number in 
the course of an hour or so.
- we process billions of such call records a day
- we can’t expect our clients to install massive clusters of 10s or 100s of 
nodes. We do need to make sure this processing is as efficient as possible. The 
session window bug was killing us. Much better with the fix Damian provided!

Ara.

On Mar 27, 2017, at 2:41 PM, Matthias J. Sax <matth...@confluent.io> wrote:

From: "Matthias J. Sax" <matth...@confluent.io>
Subject: Re: more uniform task assignment across kafka stream nodes
Date: March 27, 2017 at 2:41:06 PM PDT
To: users@kafka.apache.org
Reply-To: <users@kafka.apache.org>


Ara,

I assume your performance issue is most likely related to the fix Damian
pointed out already.

Couple o

Re: more uniform task assignment across kafka stream nodes

2017-03-27 Thread Ara Ebrahimi
Well, even with 4-5x better performance thanks to the session window fix, I 
expect to get ~10x better performance if I throw 10x more nodes at the problem. 
That won’t be the case due to task assignment unfortunately. I may end up with 
say 5-6 nodes with aggregation assigned to them and 4-5 nodes sitting there 
doing nothing. So it is a problem.

Ara.

On Mar 27, 2017, at 4:15 PM, Matthias J. Sax <matth...@confluent.io> wrote:

From: "Matthias J. Sax" <matth...@confluent.io>
Subject: Re: more uniform task assignment across kafka stream nodes
Date: March 27, 2017 at 4:15:07 PM PDT
To: users@kafka.apache.org
Reply-To: <users@kafka.apache.org>


Great!

So overall, the issue is not related to task assignment. Also the
description below, does not indicate that different task assignment
would change anything.


-Matthias

On 3/27/17 3:08 PM, Ara Ebrahimi wrote:
Let me clarify, cause I think we’re using different terminologies:

- message key is phone number, reversed
- all call records for a phone number land on the same partition
- then we apply a session window on them and aggregate+reduce
- so we end up with a group of records for a phone number. This group is 
literally an avro object with an array of records in it for the session.
- since records arrive chronologically and since we have a session window, then 
all call records for a phone number end up in the same partition and session 
(intended behavior). We can easily have many such phone call record groups with 
100s of call records in them. The aggregate object (the avro object with array 
of records in it) can get updated 100s of times for the same phone number in 
the course of an hour or so.
- we process billions of such call records a day
- we can’t expect our clients to install massive clusters of 10s or 100s of 
nodes. We do need to make sure this processing is as efficient as possible. The 
session window bug was killing us. Much better with the fix Damian provided!

Ara.

On Mar 27, 2017, at 2:41 PM, Matthias J. Sax <matth...@confluent.io> wrote:

From: "Matthias J. Sax" <matth...@confluent.io>
Subject: Re: more uniform task assignment across kafka stream nodes
Date: March 27, 2017 at 2:41:06 PM PDT
To: users@kafka.apache.org
Reply-To: <users@kafka.apache.org>


Ara,

I assume your performance issue is most likely related to the fix Damian
pointed out already.

Couple of follow up comments:

critical part of our pipeline involves grouping relevant records together

Can you explain this a little better? The abstraction of a task does
group data together already. Furthermore, if you get multiple tasks,
those are independent units with regard to grouping as consecutive tasks
are "connected" via repartitioning steps. Thus, even if we apply the
task assignment as you ask for, I am not sure if this would change
anything? Maybe you can give a small data example what
behavior/data-co-location you need and what Streams provides for you.

And for hot keys this can lead to sometimes 100s of records to get grouped 
together

This is independent of task placement -- it is a partitioning issue. If you
want to work on that, you can provide a custom `Partitioner` for the
Kafka producers used (note, your external Producer writing to your Streams
input topic might already generate "hot" keys, so you might need to use
a custom partitioner there, too).
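
A minimal sketch of such a custom producer Partitioner (the class name and
keying strategy are hypothetical, not something proposed in this thread); it
only illustrates the interface and how it is wired in:

import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;

public class PhoneKeyPartitioner implements Partitioner {            // hypothetical name

    @Override
    public void configure(Map<String, ?> configs) { }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        if (keyBytes == null) {
            return 0;                                                 // simplistic fallback for records with no key
        }
        // murmur2 over the serialized key, same building blocks as the default partitioner;
        // the actual strategy for spreading hot keys is application specific
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    @Override
    public void close() { }
}

It would be enabled on the producer side via, e.g.,
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, PhoneKeyPartitioner.class.getName());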

Also "100s of records" does not sound much to me. Streams can process
multiple hundredths of thousandth records per thread. That is the
reason, why I think that the fix Damian pointed out will most likely fix
your problem.



-Matthias


On 3/27/17 1:56 PM, Ara Ebrahimi wrote:
Thanks for the response Matthias!

The reason

Re: more uniform task assignment across kafka stream nodes

2017-03-27 Thread Ara Ebrahimi
Let me clarify, cause I think we’re using different terminologies:

- message key is phone number, reversed
- all call records for a phone number land on the same partition
- then we apply a session window on them and aggregate+reduce (a rough sketch of this shape follows after this list)
- so we end up with a group of records for a phone number. This group is 
literally an avro object with an array of records in it for the session.
- since records arrive chronologically and since we have a session window, then 
all call records for a phone number end up in the same partition and session 
(intended behavior). We can easily have many such phone call record groups with 
100s of call records in them. The aggregate object (the avro object with array 
of records in it) can get updated 100s of times for the same phone number in 
the course of an hour or so.
- we process billions of such call records a day
- we can’t expect our clients to install massive clusters of 10s or 100s of 
nodes. We do need to make sure this processing is as efficient as possible. The 
session window bug was killing us. Much better with the fix Damian provided!
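
A rough sketch of the shape of such a session-windowed aggregation (every name
here is hypothetical -- CallRecord/CallGroup, the serde, topic, and gap are
placeholders, and the 0.10.2 aggregate overload taking an Initializer,
Aggregator, Merger, SessionWindows, and value Serde is assumed):

KStream<String, CallRecord> callRecords = builder.stream("call-records");   // key = reversed phone number
KTable<Windowed<String>, CallGroup> grouped = callRecords
    .groupByKey()
    .aggregate(
        CallGroup::new,                                         // initializer: empty avro group
        (phone, record, group) -> group.add(record),            // aggregator: append record, returns the group
        (phone, left, right) -> left.merge(right),              // merger for sessions that get joined
        SessionWindows.with(TimeUnit.MINUTES.toMillis(30)),     // placeholder inactivity gap
        callGroupSerde);                                        // placeholder serde for the avro group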

Ara.

On Mar 27, 2017, at 2:41 PM, Matthias J. Sax <matth...@confluent.io> wrote:

From: "Matthias J. Sax" <matth...@confluent.io>
Subject: Re: more uniform task assignment across kafka stream nodes
Date: March 27, 2017 at 2:41:06 PM PDT
To: users@kafka.apache.org
Reply-To: <users@kafka.apache.org>


Ara,

I assume your performance issue is most likely related to the fix Damian
pointed out already.

Couple of follow up comments:

critical part of our pipeline involves grouping relevant records together

Can you explain this a little better? The abstraction of a task does
group data together already. Furthermore, if you get multiple tasks,
those are independent units with regard to grouping as consecutive tasks
are "connected" via repartitioning steps. Thus, even if we apply the
task assignment as you ask for, I am not sure if this would change
anything? Maybe you can give a small data example what
behavior/data-co-location you need and what Streams provides for you.

And for hot keys this can lead to sometimes 100s of records to get grouped 
together

This is independent of task placement -- it is a partitioning issue. If you
want to work on that, you can provide a custom `Partitioner` for the
Kafka producers used (note, your external Producer writing to your Streams
input topic might already generate "hot" keys, so you might need to use
a custom partitioner there, too).

Also "100s of records" does not sound much to me. Streams can process
multiple hundredths of thousandth records per thread. That is the
reason, why I think that the fix Damian pointed out will most likely fix
your problem.



-Matthias


On 3/27/17 1:56 PM, Ara Ebrahimi wrote:
Thanks for the response Matthias!

The reason we want this exact task assignment to happen is that a critical part 
of our pipeline involves grouping relevant records together (that’s what the 
aggregate function in the topology is for). And for hot keys this can lead to 
sometimes 100s of records to get grouped together. Even worse, these records 
are session bound, we use session windows. Hence we see lots of activity around 
the store backing the aggregate function and even though we use SSD drives 
we’re not seeing the kind of performance we want to see. It seems like the 
aggregate function leads to lots of updates to these hot keys which lead to 
lots of rocksdb activity.

Now there are many ways to fix this problem:
- just don’t aggregate, create an algorithm which is not reliant on 
grouping/aggregating records. Not what we can do with our tight schedule right 
now.
- do grouping/aggregating but employ n instances and rely on uniform 
distribution of these tasks. This is the easiest solution and what we expected 
to work but didn’t work as you can tell from this thread. We threw 4 instances 
at it but only 2 got used.
- tune rocksdb? I tried this actually but it didn’t really help us much, aside 
from the fact that tuning rocksdb is very tricky.
- use in-memory store instead? Unfortunately we have to use session windows for 
this aggregate function and apparently there’s no in-memory session store impl? 
I tried to create one but soon realized it’s too much work :) I looked at 
default PartitionAssigner code too, but that ain’t trivial either.

So I’m a bit hopeless :(

Ara.

On Mar 27, 2017

Re: more uniform task assignment across kafka stream nodes

2017-03-27 Thread Ara Ebrahimi
Holy s...!

This led to ~4-5x better performance. I’m gonna try this with more nodes and 
if performance improves almost linearly then we are good for now.

Thanks!
Ara.

> On Mar 27, 2017, at 2:10 PM, Damian Guy <damian@gmail.com> wrote:
>
> Hi Ara,
>
> There is a performance issue in the 0.10.2 release of session windows. It
> is fixed with this PR: https://github.com/apache/kafka/pull/2645
> You can work around this on 0.10.2 by calling the aggregate(..), reduce(..)
> etc methods and supplying StateStoreSupplier with caching
> disabled, i.e, by doing something like:
>
> final StateStoreSupplier sessionStore =
> Stores.create(*"session-store-name"*)
>.withKeys(Serdes.String())
>.withValues(Serdes.String())
>.persistent()
>.sessionWindowed(TimeUnit.MINUTES.toMillis(7))
>.build();
>
>
> The fix has also been cherry-picked to the 0.10.2 branch, so you could
> build from source and not have to create the StateStoreSupplier.
>
> Thanks,
> Damian
>
> On Mon, 27 Mar 2017 at 21:56 Ara Ebrahimi <ara.ebrah...@argyledata.com>
> wrote:
>
> Thanks for the response Mathias!
>
> The reason we want this exact task assignment to happen is that a critical
> part of our pipeline involves grouping relevant records together (that’s
> what the aggregate function in the topology is for). And for hot keys this
> can lead to sometimes 100s of records to get grouped together. Even worse,
> these records are session bound, we use session windows. Hence we see lots
> of activity around the store backing the aggregate function and even though
> we use SSD drives we’re not seeing the kind of performance we want to see.
> It seems like the aggregate function leads to lots of updates to these hot
> keys which lead to lots of rocksdb activity.
>
> Now there are many ways to fix this problem:
> - just don’t aggregate, create an algorithm which is not reliant on
> grouping/aggregating records. Not what we can do with our tight schedule
> right now.
> - do grouping/aggregating but employ n instances and rely on uniform
> distribution of these tasks. This is the easiest solution and what we
> expected to work but didn’t work as you can tell from this thread. We threw
> 4 instances at it but only 2 got used.
> - tune rocksdb? I tried this actually but it didn’t really help us much,
> aside from the fact that tuning rocksdb is very tricky.
> - use in-memory store instead? Unfortunately we have to use session windows
> for this aggregate function and apparently there’s no in-memory session
> store impl? I tried to create one but soon realized it’s too much work :) I
> looked at default PartitionAssigner code too, but that ain’t trivial either.
>
> So I’m a bit hopeless :(
>
> Ara.
>
> On Mar 27, 2017, at 1:35 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>
> From: "Matthias J. Sax" <matth...@confluent.io>
> Subject: Re: more uniform task assignment across kafka stream nodes
> Date: March 27, 2017 at 1:35:30 PM PDT
> To: users@kafka.apache.org
> Reply-To: <users@kafka.apache.org>
>
>
> Ara,
>
> thanks for the detailed information.
>
> If I parse this correctly, both instances run the same number of tasks
> (12 each). That is all Streams promises.
>
> To come back to your initial question:
>
> Is there a way to tell kafka streams to uniformly assign partitions across
> instances? If I have n kafka streams instances running, I want each to
> handle EXACTLY 1/nth number of partitions. No dynamic task assignment
> logic. Just dumb 1/n assignment.
>
> That is exactly what you get: each of you two instances get 24/2 = 12
> tasks assigned. That is dump 1/n assignment, isn't it? So my original
> response was correct.
>
> However, I now understand better what you are actually meaning by your
> question. Note that Streams does not distinguish "type" of tasks -- it
> only sees 24 tasks and assigns those in a balanced way.
>
> Thus, currently there is no easy way to get the assignment you want to
> have, except, you implement you own `PartitionAssignor`.
>
> This is the current implementation for 0.10.2
>

Re: more uniform task assignment across kafka stream nodes

2017-03-27 Thread Ara Ebrahimi
Thanks for the response Matthias!

The reason we want this exact task assignment to happen is that a critical part 
of our pipeline involves grouping relevant records together (that’s what the 
aggregate function in the topology is for). And for hot keys this can lead to 
sometimes 100s of records to get grouped together. Even worse, these records 
are session bound, we use session windows. Hence we see lots of activity around 
the store backing the aggregate function and even though we use SSD drives 
we’re not seeing the kind of performance we want to see. It seems like the 
aggregate function leads to lots of updates to these hot keys which lead to 
lots of rocksdb activity.

Now there are many ways to fix this problem:
- just don’t aggregate, create an algorithm which is not reliant on 
grouping/aggregating records. Not what we can do with our tight schedule right 
now.
- do grouping/aggregating but employ n instances and rely on uniform 
distribution of these tasks. This is the easiest solution and what we expected 
to work but didn’t work as you can tell from this thread. We threw 4 instances 
at it but only 2 got used.
- tune rocksdb? I tried this actually but it didn’t really help us much, aside 
from the fact that tuning rocksdb is very tricky (a config-setter sketch follows after this list).
- use in-memory store instead? Unfortunately we have to use session windows for 
this aggregate function and apparently there’s no in-memory session store impl? 
I tried to create one but soon realized it’s too much work :) I looked at 
default PartitionAssigner code too, but that ain’t trivial either.
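
On the "tune rocksdb" point, the hook Streams exposes for this is a
RocksDBConfigSetter; a minimal sketch (class name and values are only
placeholders, not settings that were tried in this thread):

import java.util.Map;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.Options;

public class CustomRocksDBConfig implements RocksDBConfigSetter {      // hypothetical name

    @Override
    public void setConfig(String storeName, Options options, Map<String, Object> configs) {
        // example knobs only; the right values depend entirely on the workload
        options.setWriteBufferSize(64 * 1024 * 1024L);
        options.setMaxWriteBufferNumber(4);
    }
}

// wired in through the streams properties:
// props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, CustomRocksDBConfig.class);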

So I’m a bit hopeless :(

Ara.

On Mar 27, 2017, at 1:35 PM, Matthias J. Sax <matth...@confluent.io> wrote:

From: "Matthias J. Sax" <matth...@confluent.io>
Subject: Re: more uniform task assignment across kafka stream nodes
Date: March 27, 2017 at 1:35:30 PM PDT
To: users@kafka.apache.org
Reply-To: <users@kafka.apache.org>


Ara,

thanks for the detailed information.

If I parse this correctly, both instances run the same number of tasks
(12 each). That is all Streams promises.

To come back to your initial question:

Is there a way to tell kafka streams to uniformly assign partitions across 
instances? If I have n kafka streams instances running, I want each to handle 
EXACTLY 1/nth number of partitions. No dynamic task assignment logic. Just dumb 
1/n assignment.

That is exactly what you get: each of your two instances gets 24/2 = 12
tasks assigned. That is dumb 1/n assignment, isn't it? So my original
response was correct.

However, I now understand better what you actually mean by your
question. Note that Streams does not distinguish "types" of tasks -- it
only sees 24 tasks and assigns those in a balanced way.

Thus, currently there is no easy way to get the assignment you want to
have, except to implement your own `PartitionAssignor`.

This is the current implementation for 0.10.2
https://github.com/apache/kafka/blob/0.10.2/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java

You can, if you wish, write your own assignor and set it via
StreamsConfig. However, be aware that this might be tricky to get right
and also might have runtime implications with regard to rebalancing and
state store recovery. We recently improved the current implementation to
avoid costly task movements:
https://issues.apache.org/jira/browse/KAFKA-4677

Thus, I would not recommend implementing your own `PartitionAssignor`.


However, the root question is, why do you need this exact assignment you
are looking for in the first place? Why is it "bad" if "types" of tasks
are not distinguished? I would like to understand your requirement
better -- it might be worth to improve Streams here.


-Matthias


On 3/27/17 12:57 PM, Ara Ebrahimi wrote:
Hi,

So, I simplified the topology by making sure we have only 1 source topic. Now I 
have 1 source topic, 8 partitions, 2 instances. And here’s what the topology 
looks like:

instance 1:

KafkaStreams processID: 48b58bc0-f600-4ec8-bc92-8cb3ea081aac
StreamsThread appId: mar-23-modular
StreamsThread clientId: mar-23-modular
StreamsThread threadId: StreamThread-1
Active tasks:
StreamsTask taskId: 0_3
ProcessorTopology:
KSTREAM-SOURCE-00:
topics: [activities-avro-or]
children: [KSTREAM-FILTER-01]
KSTREAM-FILTER-01:
children: [KSTREAM-MAP-02]
KSTREAM-MAP-02:
children: [KS

Re: more uniform task assignment across kafka stream nodes

2017-03-27 Thread Ara Ebrahimi
:

StreamsThread appId: mar-23-modular
StreamsThread clientId: mar-23-modular
StreamsThread threadId: StreamThread-8
Active tasks:
StreamsTask taskId: 0_5
ProcessorTopology:
KSTREAM-SOURCE-00:
topics: [activities-avro-or]
children: [KSTREAM-FILTER-01]
KSTREAM-FILTER-01:
children: [KSTREAM-MAP-02]
KSTREAM-MAP-02:
children: [KSTREAM-BRANCH-03]
KSTREAM-BRANCH-03:
children: [KSTREAM-BRANCHCHILD-04, KSTREAM-BRANCHCHILD-05]
KSTREAM-BRANCHCHILD-04:
children: [KSTREAM-MAPVALUES-06]
KSTREAM-MAPVALUES-06:
children: [KSTREAM-FLATMAPVALUES-07]
KSTREAM-FLATMAPVALUES-07:
children: [KSTREAM-MAP-08]
KSTREAM-MAP-08:
children: [KSTREAM-FILTER-11]
KSTREAM-FILTER-11:
children: [KSTREAM-SINK-10]
KSTREAM-SINK-10:
topic: activities-by-phone-store-or-repartition
KSTREAM-BRANCHCHILD-05:
Partitions [activities-avro-or-5]
Standby tasks:


activities-avro-or is input topic. ml-features-avro-or is output topic. In the 
middle we have an aggregate (activities-by-phone-store-or-repartition).

On instance 1 I see 3 tasks for activities-avro-or and on instance 2 I see 5. 
Bad.

On instance 1 I see 4 tasks for ml-features-avro-or. And 4 on instance 2. Good.

On instance 1 I see 5 tasks for activities-by-phone-store-or-repartition. And 3 
on instance 2. Bad.

As I said in terms of offsets for all these partitions I see uniform 
distribution, so we’re not dealing with a bad key scenario.

Ara.

On Mar 25, 2017, at 6:43 PM, Matthias J. Sax <matth...@confluent.io> wrote:

From: "Matthias J. Sax" <matth...@confluent.io>
Subject: Re: more uniform task assignment across kafka stream nodes
Date: March 25, 2017 at 6:43:12 PM PDT
To: users@kafka.apache.org
Reply-To: <users@kafka.apache.org>


Please share the rest of your topology code (without any UDFs / business
logic). Otherwise, I cannot give further advice.

-Matthias


On 3/25/17 6:08 PM, Ara Ebrahimi wrote:
Via:

builder.stream("topic1");
builder.stream("topic2");
builder.stream("topic3”);

These are different kinds of topics consuming different avro objects.

Ara.

On Mar 25, 2017, at 6:04 PM, Matthias J. Sax <matth...@confluent.io> wrote:

From: "Matthias J. Sax" <matth...@confluent.io>
Subject: Re: more uniform task assignment across kafka stream nodes
Date: March 25, 2017 at 6:04:30 PM PDT
To: users@kafka.apache.org
Reply-To: <users@kafka.apache.org>


Ara,

How do you consume your topics? Via

builder.stream("topic1", "topic2", "topic3);

or via

builder.stream("topic1");
builder.stream("topic2");
builder.stream("topic3");

Both are handled differently with regard to creating tasks (partition-to-task
assignment also depends on your downstream code though).

If this does not help, can you maybe share the structure of processing?
To dig deeper, we would need to know the topology DAG.


-Matthias


On 3/25/17 5:56 PM, Ara Ebrahimi wrote:
Matthias,

This apparently happens because we have more than 1 source topic. We have 3 
source topics in the same application. So it seems like the task assignment 
algorithm creates topologies not for one specific topic at a time but the total 
partitions across all source topics consumed in an application instance. 
Because we have some code dependencies between these 3 source topics we can’t 
separate them into 3 applications at this time. Hence the reason I want to get 
the task assignment algorithm basically do a uniform and simple task assignment 
PER source topic.

Ara.

On Mar 25, 2017, at 5:21 PM, Matthias J. Sax 
<matth...@confluent.io<

Re: more uniform task assignment across kafka stream nodes

2017-03-25 Thread Ara Ebrahimi
Via:

builder.stream("topic1");
builder.stream("topic2");
builder.stream("topic3”);

These are different kinds of topics consuming different avro objects.

Ara.

On Mar 25, 2017, at 6:04 PM, Matthias J. Sax <matth...@confluent.io> wrote:

From: "Matthias J. Sax" <matth...@confluent.io>
Subject: Re: more uniform task assignment across kafka stream nodes
Date: March 25, 2017 at 6:04:30 PM PDT
To: users@kafka.apache.org
Reply-To: <users@kafka.apache.org>


Ara,

How do you consume your topics? Via

builder.stream("topic1", "topic2", "topic3);

or via

builder.stream("topic1");
builder.stream("topic2");
builder.stream("topic3");

Both are handled differently with regard to creating tasks (partition-to-task
assignment also depends on your downstream code though).

If this does not help, can you maybe share the structure of processing?
To dig deeper, we would need to know the topology DAG.


-Matthias


On 3/25/17 5:56 PM, Ara Ebrahimi wrote:
Matthias,

This apparently happens because we have more than 1 source topic. We have 3 
source topics in the same application. So it seems like the task assignment 
algorithm creates topologies not for one specific topic at a time but the total 
partitions across all source topics consumed in an application instance. 
Because we have some code dependencies between these 3 source topics we can’t 
separate them into 3 applications at this time. Hence the reason I want to get 
the task assignment algorithm basically do a uniform and simple task assignment 
PER source topic.

Ara.

On Mar 25, 2017, at 5:21 PM, Matthias J. Sax <matth...@confluent.io> wrote:

From: "Matthias J. Sax" <matth...@confluent.io>
Subject: Re: more uniform task assignment across kafka stream nodes
Date: March 25, 2017 at 5:21:47 PM PDT
To: users@kafka.apache.org
Reply-To: <users@kafka.apache.org>


Hi,

I am wondering why this happens in the first place. Streams load-balances
over all running instances, and each instance should have the same number
of tasks (and thus partitions) assigned.

What is the overall assignment? Do you have standby tasks configured?
What version do you use?


-Matthias


On 3/24/17 8:09 PM, Ara Ebrahimi wrote:
Hi,

Is there a way to tell kafka streams to uniformly assign partitions across 
instances? If I have n kafka streams instances running, I want each to handle 
EXACTLY 1/nth number of partitions. No dynamic task assignment logic. Just dumb 
1/n assignment.

Here’s our scenario. Let’s say we have a “source” topic with 8 partitions. We 
also have 2 kafka streams instances. Each instance gets assigned to handle 4 
“source” topic partitions. BUT then we do a few maps and an aggregate. So data 
gets shuffled around. The map function uniformly distributes these across all 
partitions (I can verify that by looking at the partition offsets). After the 
map what I notice by looking at the topology is that one kafka streams instance 
gets assigned to handle say 2 aggregate repartition topics and the other one 
gets assigned 6. Even worse, on bigger clusters (say 4 instances) we see say 2 
nodes get assigned downstream aggregate repartition topics and 2 other nodes 
assigned NOTHING to handle.

Ara.






Re: more uniform task assignment across kafka stream nodes

2017-03-25 Thread Ara Ebrahimi
Matthias,

This apparently happens because we have more than 1 source topic. We have 3 
source topics in the same application. So it seems like the task assignment 
algorithm creates topologies not for one specific topic at a time but the total 
partitions across all source topics consumed in an application instance. 
Because we have some code dependencies between these 3 source topics we can’t 
separate them into 3 applications at this time. Hence the reason I want to get 
the task assignment algorithm basically do a uniform and simple task assignment 
PER source topic.

Ara.

On Mar 25, 2017, at 5:21 PM, Matthias J. Sax <matth...@confluent.io> wrote:

From: "Matthias J. Sax" <matth...@confluent.io>
Subject: Re: more uniform task assignment across kafka stream nodes
Date: March 25, 2017 at 5:21:47 PM PDT
To: users@kafka.apache.org
Reply-To: <users@kafka.apache.org>


Hi,

I am wondering why this happens in the first place. Streams load-balances
over all running instances, and each instance should have the same number
of tasks (and thus partitions) assigned.

What is the overall assignment? Do you have standby tasks configured?
What version do you use?


-Matthias


On 3/24/17 8:09 PM, Ara Ebrahimi wrote:
Hi,

Is there a way to tell kafka streams to uniformly assign partitions across 
instances? If I have n kafka streams instances running, I want each to handle 
EXACTLY 1/nth number of partitions. No dynamic task assignment logic. Just dumb 
1/n assignment.

Here’s our scenario. Let’s say we have a “source” topic with 8 partitions. We 
also have 2 kafka streams instances. Each instance gets assigned to handle 4 
“source” topic partitions. BUT then we do a few maps and an aggregate. So data 
gets shuffled around. The map function uniformly distributes these across all 
partitions (I can verify that by looking at the partition offsets). After the 
map what I notice by looking at the topology is that one kafka streams instance 
gets assigned to handle say 2 aggregate repartition topics and the other one 
gets assigned 6. Even worse, on bigger clusters (say 4 instances) we see say 2 
nodes get assigned downstream aggregate repartition topics and 2 other nodes 
assigned NOTHING to handle.

Ara.









Re: kafka streams locking issue in 0.10.2.0

2017-02-25 Thread Ara Ebrahimi
Hi,

Thanks for the reply.

Let me give you more information:

- we have a group by + aggregate. The hopping time window is 10 minutes but the 
maintainMs is 180 days (we’re trying to reprocess the entire data set of 
billions of records).

- Over time, after just a few hours, the aggregate slows down considerably. 
Initially it keeps up with the ingest rate just fine, but then after a few 
hours we see that a counter we have in the body of the aggregate block stops 
incrementing completely and resumes after a few minutes! It seems to me it’s 
trying to roll the rocksdb files and pauses the whole StreamThread? Is that 
possible? Is that the root cause of this issue? What’s the best way to monitor 
rocksdb rolling? Can’t find much in the LOG file. I don’t see any stalling.

- If above case is possible then what’s the solution? Much smaller maintainMs? 
Or tuning rocksdb? This is on SSD. Our records have windowed keys of 28 bytes 
and 200 bytes average value size.

Ara.
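
For reference, the max.poll.interval.ms / max.poll.records knobs Damian
mentions below can be handed to the embedded consumer through the StreamsConfig
consumer prefix; a minimal sketch with placeholder values:

props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), 600000);  // placeholder: 10 minutes
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 500);         // placeholder: smaller batches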

> On Feb 24, 2017, at 3:15 AM, Damian Guy <damian@gmail.com> wrote:
>
> Hi Ara,
>
> This usually means that one, or more, of your StreamThreads is taking a
> long time during recovery or processing. So the rebalance times out and you
> get another rebalance.
> The second exception, i.e.,
> 2017-02-22 20:12:48 WARN  StreamThread:1184 - Could not create task 3_0.
> Will retry.
> org.apache.kafka.streams.errors.LockException: task [3_0] Failed to lock
> the state directory: /kafka/1/kafka-streams/argyle-streams/3_0
>
> Is because you have at least one thread that is still processing and hasn't
> given up its tasks that have been revoked yet.
> You probably need to get a thread dump to see what the threads are doing at
> the time. As the first exception suggests, you may need to increase the
> max.poll.interval.ms
>
> Thanks,
> Damian
>
> On Thu, 23 Feb 2017 at 18:26 Ara Ebrahimi <ara.ebrah...@argyledata.com>
> wrote:
>
>> Hi,
>>
>> After upgrading to 0.10.20.0 I got this:
>>
>> Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Commit
>> cannot be completed since the group has already rebalanced and assigned the
>> partitions to another member. This means that the time between subsequent
>> calls to poll() was longer than the configured max.poll.interval.ms,
>> which typically implies that the poll loop is spending too much time
>> message processing. You can address this either by increasing the session
>> timeout or by reducing the maximum size of batches returned in poll() with
>> max.poll.records.
>> at
>> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:698)
>> at
>> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:577)
>> at
>> org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1125)
>> at
>> org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:296)
>> at
>> org.apache.kafka.streams.processor.internals.StreamThread$3.apply(StreamThread.java:535)
>> at
>> org.apache.kafka.streams.processor.internals.StreamThread.performOnAllTasks(StreamThread.java:503)
>> at
>> org.apache.kafka.streams.processor.internals.StreamThread.commitOffsets(StreamThread.java:531)
>> at
>> org.apache.kafka.streams.processor.internals.StreamThread.suspendTasksAndState(StreamThread.java:480)
>> ... 10 more
>>
>> Which lead to this:
>>
>> 2017-02-22 20:12:16 ERROR StreamThread:505 - stream-thread
>> [StreamThread-9] Failed while executing StreamTask 4_6 due to commit
>> consumer offsets:
>> org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be
>> completed since the group has already rebalanced and assigned the
>> partitions to another member. This means that the time between subsequent
>> calls to poll() was longer than the configured max.poll.interval.ms,
>> which typically implies that the poll loop is spending too much time
>> message processing. You can address this either by increasing the session
>> timeout or by reducing the maximum size of batches returned in poll() with
>> max.poll.records.
>> at
>> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:698)
>> at
>> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:577)
>> at
>> org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1125)
>> at
>> org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:296)
>> at
>> org.a

kafka streams locking issue in 0.10.2.0

2017-02-23 Thread Ara Ebrahimi
Hi,

After upgrading to 0.10.2.0 I got this:

Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Commit 
cannot be completed since the group has already rebalanced and assigned the 
partitions to another member. This means that the time between subsequent calls 
to poll() was longer than the configured max.poll.interval.ms, which typically 
implies that the poll loop is spending too much time message processing. You 
can address this either by increasing the session timeout or by reducing the 
maximum size of batches returned in poll() with max.poll.records.
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:698)
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:577)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1125)
at 
org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:296)
at 
org.apache.kafka.streams.processor.internals.StreamThread$3.apply(StreamThread.java:535)
at 
org.apache.kafka.streams.processor.internals.StreamThread.performOnAllTasks(StreamThread.java:503)
at 
org.apache.kafka.streams.processor.internals.StreamThread.commitOffsets(StreamThread.java:531)
at 
org.apache.kafka.streams.processor.internals.StreamThread.suspendTasksAndState(StreamThread.java:480)
... 10 more

Which lead to this:

2017-02-22 20:12:16 ERROR StreamThread:505 - stream-thread [StreamThread-9] 
Failed while executing StreamTask 4_6 due to commit consumer offsets:
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be 
completed since the group has already rebalanced and assigned the partitions to 
another member. This means that the time between subsequent calls to poll() was 
longer than the configured max.poll.interval.ms, which typically implies that 
the poll loop is spending too much time message processing. You can address 
this either by increasing the session timeout or by reducing the maximum size 
of batches returned in poll() with max.poll.records.
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:698)
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:577)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1125)
at 
org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:296)
at 
org.apache.kafka.streams.processor.internals.StreamThread$3.apply(StreamThread.java:535)
at 
org.apache.kafka.streams.processor.internals.StreamThread.performOnAllTasks(StreamThread.java:503)
at 
org.apache.kafka.streams.processor.internals.StreamThread.commitOffsets(StreamThread.java:531)
at 
org.apache.kafka.streams.processor.internals.StreamThread.suspendTasksAndState(StreamThread.java:480)
at 
org.apache.kafka.streams.processor.internals.StreamThread.access$1200(StreamThread.java:69)
at 
org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsRevoked(StreamThread.java:259)
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:396)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:329)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582)
at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
2017-02-22 20:12:16 ERROR ConsumerCoordinator:400 - User provided listener 
org.apache.kafka.streams.processor.internals.StreamThread$1 for group 
argyle-streams failed on partition revocation
org.apache.kafka.streams.errors.StreamsException: stream-thread 
[StreamThread-9] failed to suspend stream tasks
at 
org.apache.kafka.streams.processor.internals.StreamThread.suspendTasksAndState(StreamThread.java:488)
at 
org.apache.kafka.streams.processor.internals.StreamThread.access$1200(StreamThread.java:69)
at 
org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsRevoked(StreamThread.java:259)
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:396)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:329)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
at 
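
For reference, the remedy named in these errors maps to two consumer settings that can be overridden from the streams configuration: allow a longer gap between poll() calls, or hand back fewer records per poll so that processing a batch fits inside the interval. A minimal sketch, assuming the application id, broker address and the concrete values are placeholders rather than recommendations:

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.streams.StreamsConfig;

    public class PollTuningSketch {
        static Properties streamsProps() {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "argyle-streams");      // placeholder
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");     // placeholder
            // More headroom between poll() calls, and smaller batches per poll.
            props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600000);         // 10 minutes
            props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
            return props;
        }
    }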

KTable TTL

2017-02-13 Thread Ara Ebrahimi
Hi,

I have a ktable and I want to keep entries in it only for the past 24 hours. 
How can I do that? I understand rocksdb has support for ttl. Should I set that? 
How? Should I use kafka-streams window functionality? Would it remove data from 
old windows?

I want to do this because I’m seeing a huge lag forming in the repartition 
topic backing the ktable after a while and the whole thing slows down. In jmx I 
see values such as these for rocksdb metrics:

all-get-avg-latency-ms=60123
all-put-avg-latency-ms=40456
all-put-qps=421
all-get-qps=430

What’s the best way to check to see how rocksdb is performing and where the 
bottleneck is? This is on an ssd.

Thanks,
Ara.
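
For reference, the DSL does not expose rocksdb's TTL directly, but a windowed aggregation with a bounded retention has a similar effect: windows older than the retention period are eventually dropped from the store. A minimal sketch against the 0.10.x DSL, with the topic, value types and store name as placeholder assumptions:

    import java.util.concurrent.TimeUnit;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KStreamBuilder;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.TimeWindows;
    import org.apache.kafka.streams.kstream.Windowed;

    public class Ttl24hSketch {
        static void build(KStreamBuilder builder) {
            KStream<String, Long> events = builder.stream("events");   // placeholder topic

            // 24h windows, retained for ~48h so late records can still be applied;
            // segments older than the retention are removed from the store.
            KTable<Windowed<String>, Long> counts = events
                    .groupByKey()
                    .count(TimeWindows.of(TimeUnit.HOURS.toMillis(24))
                                      .until(TimeUnit.HOURS.toMillis(48)),
                           "events-24h-store");                        // placeholder store name
            // counts can then feed the rest of the topology, keyed by window.
        }
    }

Whether this fits depends on the downstream steps being able to work with windowed keys.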









Re: kafka-consumer-offset-checker complaining about NoNode for X in zk

2017-02-02 Thread Ara Ebrahimi
Thanks! Worked like a charm.

For some partitions, which do have data, I see “unknown” reported as offset. 
Any idea what unknown means?

Also, what's the new command for setting offsets? Specifically, to move them back to 
offset 0, AND also to move them to the end.

Ara.
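
For reference, offsets stored in Kafka can also be repositioned programmatically with the new consumer, as long as the application is stopped so the group is empty. A minimal sketch, with the broker address, group id and topic as placeholders:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.PartitionInfo;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    public class OffsetResetSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");   // placeholder
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "argyle-streams");          // app id acts as group id
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

            try (KafkaConsumer<byte[], byte[]> consumer =
                     new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
                List<TopicPartition> partitions = new ArrayList<>();
                for (PartitionInfo info : consumer.partitionsFor("topic_name")) {  // placeholder topic
                    partitions.add(new TopicPartition(info.topic(), info.partition()));
                }
                consumer.assign(partitions);
                consumer.seekToBeginning(partitions);   // or seekToEnd(partitions) to jump to the end
                for (TopicPartition tp : partitions) {
                    consumer.position(tp);              // resolve the seek to a concrete offset
                }
                consumer.commitSync();                  // store the new positions for the group
            }
        }
    }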

> On Feb 2, 2017, at 11:21 AM, Jan Filipiak <jan.filip...@trivago.com> wrote:
>
> Hi,
>
> sorry and using the consumer group tool, instead of the offset checker
>
>
> On 02.02.2017 20:08, Jan Filipiak wrote:
>> Hi,
>>
>> if its a kafka stream app, its most likely going to store its offsets
>> in kafka rather than zookeeper.
>> You can use the --new-consumer option to check for kafka stored offsets.
>>
>> Best Jan
>>
>>
>> On 01.02.2017 21:14, Ara Ebrahimi wrote:
>>> Hi,
>>>
>>> For a subset of our topics we get this error:
>>>
>>> $KAFKA_HOME/bin/kafka-consumer-offset-checker.sh --group
>>> argyle-streams --topic topic_name --zookeeper $ZOOKEEPERS
>>> [2017-02-01 12:08:56,115] WARN WARNING: ConsumerOffsetChecker is
>>> deprecated and will be dropped in releases following 0.9.0. Use
>>> ConsumerGroupCommand instead. (kafka.tools.ConsumerOffsetChecker$)
>>> Exiting due to: org.apache.zookeeper.KeeperException$NoNodeException:
>>> KeeperErrorCode = NoNode for
>>> /consumers/streams-app/offsets/topic_name/2.
>>>
>>> All topics are created via the same process but a few of them report
>>> this error.
>>>
>>> When I check in zk there's nothing under /consumers:
>>>
>>> zookeeper-client -server $ZOOKEEPERS -cmd ls /consumers
>>>
>>> WATCHER::
>>>
>>> WatchedEvent state:SyncConnected type:None path:null
>>> []
>>>
>>> But the kafka-consumer-offset-checker does work for many topics
>>> anyway, and it fails for a few.
>>>
>>> Why does this happen? How can I fix it?
>>>
>>> Thanks,
>>> Ara.
>>>
>>>
>>>
>>
>
>
>
>










kafka-consumer-offset-checker complaining about NoNode for X in zk

2017-02-01 Thread Ara Ebrahimi
Hi,

For a subset of our topics we get this error:

$KAFKA_HOME/bin/kafka-consumer-offset-checker.sh --group argyle-streams --topic 
topic_name --zookeeper $ZOOKEEPERS
[2017-02-01 12:08:56,115] WARN WARNING: ConsumerOffsetChecker is deprecated and 
will be dropped in releases following 0.9.0. Use ConsumerGroupCommand instead. 
(kafka.tools.ConsumerOffsetChecker$)
Exiting due to: org.apache.zookeeper.KeeperException$NoNodeException: 
KeeperErrorCode = NoNode for /consumers/streams-app/offsets/topic_name/2.

All topics are created via the same process but a few of them report this error.

When I check in zk there's nothing under /consumers:

zookeeper-client -server $ZOOKEEPERS -cmd ls /consumers

WATCHER::

WatchedEvent state:SyncConnected type:None path:null
[]

But the kafka-consumer-offset-checker does work for many topics anyway, and it 
fails for a few.

Why does this happen? How can I fix it?

Thanks,
Ara.









kafka-streams ktable recovery after rebalance crash

2017-02-01 Thread Ara Ebrahimi
Hi,

My kafka-streams application crashed due to a rebalance event (it seems I need 
to increase max.poll.interval.ms even more!). When I restarted the app I noticed 
the existing rocksdb files were gone, and while the rest of the pipeline was 
processing, the part dealing with the ktable sat there doing nothing for 
minutes, until it crashed again complaining about this:

2017-02-01 11:48:35 ERROR StreamPipeline:160 - An exception has occurred. 
Shutting down the pipeline...
org.apache.kafka.streams.errors.StreamsException: stream-thread 
[StreamThread-10] Failed to rebalance
at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:410)
at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
Caused by: org.apache.kafka.streams.errors.ProcessorStateException: task [2_2] 
Error while creating the state manager
at 
org.apache.kafka.streams.processor.internals.AbstractTask.(AbstractTask.java:72)
at 
org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:89)
at 
org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:633)
at 
org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:660)
at 
org.apache.kafka.streams.processor.internals.StreamThread.access$100(StreamThread.java:69)
at 
org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:124)
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:228)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:313)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277)
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:407)
... 1 more
Caused by: java.io.IOException: task [2_2] Failed to lock the state directory: 
/kafka/1/kafka-streams/streams-app/2_2
at 
org.apache.kafka.streams.processor.internals.ProcessorStateManager.(ProcessorStateManager.java:101)
at 
org.apache.kafka.streams.processor.internals.AbstractTask.(AbstractTask.java:69)
... 13 more


It seems to me that it was trying to recreate the rocksdb files from scratch, right? 
How can I fix this situation and make it recover?

Which topic is used for recreating rocksdb files? The changelog or repartition? 
How can I know how much progress has been made recreating these files? Which 
offsets are meaningful?

Thanks,
Ara.
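
For reference, the state is rebuilt from the store's changelog topic, not the repartition topic, and two settings influence how painful that is: keeping the state directory on a persistent disk (the default lives under /tmp) so the local rocksdb files survive restarts, and standby replicas, which keep a warm copy of each store on another instance. A minimal sketch, with the path and replica count as placeholder assumptions:

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    public class RecoverySketch {
        static Properties recoveryProps() {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-app");        // placeholder
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");    // placeholder
            props.put(StreamsConfig.STATE_DIR_CONFIG, "/kafka/1/kafka-streams");  // persistent disk, not /tmp
            props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);              // warm standby per store
            return props;
        }
    }

One rough way to gauge restoration progress is to compare the offsets in the store's .checkpoint file with the end offsets of the corresponding -changelog topic.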









Re: kafka streams consumer partition assignment is uneven

2017-01-09 Thread Ara Ebrahimi
I meant I have 7 topics and each has 12 partitions. Considering that I have 4 
streaming threads per node, I was expecting to see each thread process 1 
partition from each topic and 7 partitions total per streaming thread. But 
that’s not the case. Or perhaps you are saying the number of streaming threads 
should follow the total number of partitions across all 7 topics?!

Ara.

> On Jan 9, 2017, at 11:48 AM, Michael Noll <mich...@confluent.io> wrote:
>
> What does the processing topology of your Kafka Streams application look
> like, and what's the exact topic and partition configuration?  You say you
> have 12 partitions in your cluster, presumably across 7 topics -- that
> means that most topics have just a single partition.  Depending on your
> topology (e.g. if you have defined that single-partition topics A, B, C
> must be joined), Kafka Streams is forced to let one of your three Streams
> nodes process "more" topics/partitions than the other two nodes.
>
> -Michael
>
>
>
> On Mon, Jan 9, 2017 at 6:52 PM, Ara Ebrahimi <ara.ebrah...@argyledata.com>
> wrote:
>
>> Hi,
>>
>> I have 3 kafka brokers, each with 4 disks. I have 12 partitions. I have 3
>> kafka streams nodes. Each is configured to have 4 streaming threads. My
>> topology is quite complex and I have 7 topics and lots of joins and states.
>>
>> What I have noticed is that each of the 3 kafka streams nodes gets
>> configured to process a variable number of partitions of a topic. One node
>> is assigned to process 2 partitions of topic a and another one gets
>> assigned 5. Hence I end up with nonuniform throughput across these nodes.
>> One node ends up processing more data than the other.
>>
>> What’s going on? How can I make sure partitions assignment to kafka
>> streams nodes is uniform?
>>
>> On a similar topic, is there a way to make sure partition assignment to
>> disks across kafka brokers is also uniform? Even if I use a round-robin one
>> to pin partitions to broker, but there doesn’t seem to be a way to
>> uniformly pin partitions to disks. Or maybe I’m missing something here? I
>> end up with 2 partitions of topic a on disk 1 and 3 partitions of topic a
>> on disk 2. It’s a bit variable. Not totally random, but it’s not uniformly
>> distributed either.
>>
>> Ara.
>>
>>
>>
>>
>
>
>










kafka streams consumer partition assignment is uneven

2017-01-09 Thread Ara Ebrahimi
Hi,

I have 3 kafka brokers, each with 4 disks. I have 12 partitions. I have 3 kafka 
streams nodes. Each is configured to have 4 streaming threads. My topology is 
quite complex and I have 7 topics and lots of joins and states.

What I have noticed is that each of the 3 kafka streams nodes gets configured 
to process a variable number of partitions of a topic. One node is assigned to 
process 2 partitions of topic a and another one gets assigned 5. Hence I end up 
with nonuniform throughput across these nodes. One node ends up processing more 
data than the other.

What's going on? How can I make sure partition assignment to kafka streams 
nodes is uniform?

On a similar topic, is there a way to make sure partition assignment to disks 
across kafka brokers is also uniform? Even if I use a round-robin assignment to 
pin partitions to brokers, there doesn't seem to be a way to uniformly pin 
partitions to disks. Or maybe I’m missing something here? I end up with 2 
partitions of topic a on disk 1 and 3 partitions of topic a on disk 2. It’s a 
bit variable. Not totally random, but it’s not uniformly distributed either.

Ara.
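
For reference, from 0.10.1 onwards the assignment can at least be inspected at runtime through the streams metadata API, which shows which partitions and state stores ended up on which instance. A minimal sketch, assuming application.server has been configured so that host and port are meaningful:

    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.state.StreamsMetadata;

    public class AssignmentSketch {
        static void printAssignment(KafkaStreams streams) {
            // One StreamsMetadata entry per running instance of the application.
            for (StreamsMetadata metadata : streams.allMetadata()) {
                System.out.printf("%s:%d -> partitions=%s stores=%s%n",
                        metadata.host(), metadata.port(),
                        metadata.topicPartitions(), metadata.stateStoreNames());
            }
        }
    }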









kafka streams passes org.apache.kafka.streams.kstream.internals.Change to my app!!

2016-12-08 Thread Ara Ebrahimi
Hi,

Once in a while and quite randomly this happens, but it does happen every few 
hundred thousand messages:

2016-12-03 11:48:05 ERROR StreamThread:249 - stream-thread [StreamThread-4] 
Streams application error during processing:
java.lang.ClassCastException: org.apache.kafka.streams.kstream.internals.Change 
cannot be cast to com.argyledata.streams.entity.Activity
at com.argyledata.streams.StreamPipeline$$Lambda$14/33419717.apply(Unknown 
Source)
at 
org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:42)
at 
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
at 
org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply(ForwardingCacheFlushListener.java:35)
at 
org.apache.kafka.streams.state.internals.CachingKeyValueStore.maybeForward(CachingKeyValueStore.java:97)
at 
org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$000(CachingKeyValueStore.java:34)
at 
org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(CachingKeyValueStore.java:84)
at 
org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:117)
at 
org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:199)
at 
org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:190)
at 
org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:121)
at 
org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:147)
at 
org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:134)
at 
org.apache.kafka.streams.kstream.internals.KStreamAggregate$KStreamAggregateValueGetter.get(KStreamAggregate.java:112)
at 
org.apache.kafka.streams.kstream.internals.KStreamKTableLeftJoin$KStreamKTableLeftJoinProcessor.process(KStreamKTableLeftJoin.java:61)
at 
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
at 
org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66)
at 
org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:181)
at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:436)
at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)

Has anyone else seen this weird problem?

Ara.
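
For reference, the trace goes through the record-cache flush path (ForwardingCacheFlushListener), so one blunt way to sidestep it while the root cause is tracked down is to disable the record cache; that trades the exception for more downstream traffic. A minimal sketch of the workaround:

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    public class NoCacheSketch {
        static Properties noCacheProps() {
            Properties props = new Properties();
            // 0 disables the record cache entirely; every update is forwarded immediately.
            props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
            return props;
        }
    }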









Re: Initializing StateStores takes *really* long for large datasets

2016-11-30 Thread Ara Ebrahimi
+1 on this.

Ara.

> On Nov 30, 2016, at 5:18 AM, Mathieu Fenniak  
> wrote:
>
> I'd like to quickly reinforce Frank's opinion regarding the rocksdb memory
> usage.  I was also surprised by the amount of non-JVM-heap memory being
> used and had to tune the 100 MB default down considerably.  It's also
> unfortunate that it's hard to estimate the memory requirements for a KS app
> because of this.  If you have ten stores, and assuming the default config,
> you'd need a GB of memory for the rocksdb cache if you run 1 app, but only
> half a GB if you run two app instances because the stores will be
> distributed.
>
> It would be much nicer to be able to give KS a fixed amount of memory in a
> config that it divided among the active stores on a node.  Configure it
> with N GB; if a rebalance adds more tasks and stores, they each get less
> RAM; if a rebalance removes tasks and stores, the remaining stores get more
> RAM.  It seems like it'd be hard to do this with the RocksDBConfigSetter
> interface because it doesn't get any state about the KS topology to make
> decisions; which are arguably not config, but tuning / performance
> decisions.
>
> Mathieu
>
>
>
> On Mon, Nov 28, 2016 at 3:45 PM, Frank Lyaruu  wrote:
>
>> I'll write an update on where I am now.
>>
>> I've got about 40 'primary' topics, some small, some up to about 10M
>> messages,
>> and about 30 internal topics, divided over 6 stream instances, all running
>> in a single
>> app, talking to a 3 node Kafka cluster.
>>
>> I use a single thread per stream instance, as my prime concern is now to
>> get it
>> to run stable, rather than optimizing performance.
>>
>> My biggest issue was that after a few hours my application started to slow
>> down
>> to ultimately freeze up or crash. It turned out that RocksDb consumed all
>> my
>> memory, which I overlooked as it was off-heap.
>>
>> I was fooling around with RocksDb settings a bit but I had missed the most
>> important
>> one:
>>
>>BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
>>tableConfig.setBlockCacheSize(BLOCK_CACHE_SIZE);
>>tableConfig.setBlockSize(BLOCK_SIZE);
>>options.setTableFormatConfig(tableConfig);
>>
>> The block cache size defaults to a whopping 100Mb per store, and that gets
>> expensive
>> fast. I reduced it to a few megabytes. My data size is so big that I doubt
>> it is very effective
>> anyway. Now it seems more stable.
>>
>> I'd say that a smaller default makes sense, especially because the failure
>> case is
>> so opaque (running all tests just fine but with a serious dataset it dies
>> slowly)
>>
>> Another thing I see is that while starting all my instances, some are quick
>> and some take
>> time (makes sense as the data size varies greatly), but as more instances
>> start up, they
>> start to use more and more CPU I/O and network, that the initialization of
>> the bigger ones
>> takes even longer, increasing the chance that one of them takes longer than
>> the
>> MAX_POLL_INTERVAL_MS_CONFIG, and then all hell breaks loose. Maybe we can
>> separate the 'initialize' and 'start' step somehow.
>>
>> In this case we could log better: If initialization is taking longer than
>> the timeout, it ends up
>> being reassigned (in my case to the same instance) and then it errors out
>> on being unable
>> to lock the state dir. That message isn't too informative as the timeout is
>> the actual problem.
>>
>> regards, Frank
>>
>>
>> On Mon, Nov 28, 2016 at 8:01 PM, Guozhang Wang  wrote:
>>
>>> Hello Frank,
>>>
>>> How many instances do you have in your apps and how many threads did you
>>> use per instance? Note that besides the topology complexity (i.e. number of
>>> state stores, number of internal topics etc) the (re-)initialization
>>> process is depending on the underlying consumer's membership protocol,
>> and
>>> hence its rebalance latency could be longer with larger groups.
>>>
>>> We have been optimizing our rebalance latency due to state store
>> migration
>>> and restoration in the latest release, but for now the re-initialization
>>> latency still largely depends on 1) topology complexity with regard to
>>> state stores and 2) number of input topic partitions and instance /
>> threads
>>> in the application.
>>>
>>>
>>> Guozhang
>>>
>>>
>>> On Sat, Nov 26, 2016 at 12:57 AM, Damian Guy 
>> wrote:
>>>
 Hi Frank,

 If you are running on a single node then the RocksDB state should be
 re-used by your app. However, it relies on the app being cleanly
>> shutdown
 and the existence of ".checkpoint" files in the state directory for the
 store, .i.e, /tmp/kafka-streams/application-id/0_0/.checkpoint. If the
 file
 doesn't exist then the entire state will be restored from the
>> changelog -
 which could take some time. I suspect this is what is happening?

 As for the RocksDB memory settings, yes the off 

Re: Hang while close() on KafkaStream

2016-11-17 Thread Ara Ebrahimi
This happens for me too, on 0.10.1.0. It seems like it just sits there waiting in 
the streamThread.join() call.

Ara.
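
For reference, later releases (0.10.2 onwards) add an overload of close() that takes a timeout and reports whether shutdown completed, so a SIGTERM handler does not block forever while the broker is unreachable. A minimal sketch:

    import java.util.concurrent.TimeUnit;
    import org.apache.kafka.streams.KafkaStreams;

    public class ShutdownSketch {
        static void addShutdownHook(final KafkaStreams streams) {
            Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
                @Override
                public void run() {
                    // Returns false if the threads did not stop within the timeout,
                    // instead of joining them indefinitely like the no-arg close().
                    if (!streams.close(30, TimeUnit.SECONDS)) {
                        System.err.println("streams did not shut down within the timeout");
                    }
                }
            }));
        }
    }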

> On Nov 17, 2016, at 6:02 PM, mordac2k  wrote:
>
> Hello all,
>
> I have a Java application in which I use an instance of KafkaStreams that
> is working splendidly, under normal circumstances.
>
> However, during development testing, I am using a single kafka instance
> (0.10.1.0) as the broker. When this single broker is not running, sending
> SIGTERM to my application causes it to hang on streams.close().
>
> This hang appears to be indefinite, it does not appear to ever time out.
> Funny enough, if, during this hang, the broker comes back online, the close
> will continue and the application will terminate properly as a result of
> the original SIGTERM.
>
> I've tried this with broker configurations where the partition count is set
> to 1 and 2, with the same result in both scenarios.
>
> (FWIW: I scanned the archive of this ML looking for previous instances of
> this issue, and did not find one concerning streams specifically)
>
> Any thoughts?
>
> Thanks.
>
>
>










Re: kafka streams rocksdb tuning (or memory leak?)

2016-11-16 Thread Ara Ebrahimi
This indeed was the issue and cp2 did fix it! Thank you!

Ara.

> On Nov 16, 2016, at 4:11 PM, Damian Guy <damian@gmail.com> wrote:
>
> Hi Ara,
>
> Are you running Kafka Streams v0.10.1? If so, there is a resource leak in
> the window store. See: https://github.com/apache/kafka/pull/2122
> You can either try checking out the Apache Kafka 0.10.1 branch (which has
> the fix) and building it manually, or you can try using the latest
> confluent packaged version of the library, which you can get from
> confluents maven repository:
>
> <repositories>
>     <repository>
>         <id>confluent</id>
>         <url>http://packages.confluent.io/maven/</url>
>     </repository>
> </repositories>
>
> <dependency>
>     <groupId>org.apache.kafka</groupId>
>     <artifactId>kafka-streams</artifactId>
>     <version>0.10.1.0-cp2</version>
> </dependency>
> <dependency>
>     <groupId>org.apache.kafka</groupId>
>     <artifactId>kafka-clients</artifactId>
>     <version>0.10.1.0-cp2</version>
> </dependency>
>
> Thanks,
>
> Damian
>
>
> On Wed, 16 Nov 2016 at 14:52 Ara Ebrahimi <ara.ebrah...@argyledata.com>
> wrote:
>
>> Hi,
>>
>> I have a few KTables in my application. Some of them have unlimited
>> windows. If I leave the application to run for a few hours, I see the java
>> process consume more and more memory, way above the -Xmx limit. I
>> understand this is due to the rocksdb native lib used by kafka streams.
>> What I don’t understand is how I can control how much rocksdb data should
>> be kept in memory. I know I can use the RocksDBConfigSetter class, but I
>> need more information on why this unbounded memory allocation happens and
>> how to tune it.
>>
>> Thanks,
>> Ara.
>>
>>
>>
>>
>
>
>










kafka streams rocksdb tuning (or memory leak?)

2016-11-16 Thread Ara Ebrahimi
Hi,

I have a few KTables in my application. Some of them have unlimited windows. If 
I leave the application to run for a few hours, I see the java process consume 
more and more memory, way above the -Xmx limit. I understand this is due to the 
rocksdb native lib used by kafka streams. What I don’t understand is how I can 
control how much rocksdb data should be kept in memory. I know I can use the 
RocksDBConfigSetter class, but I need more information on why this unbounded 
memory allocation happens and how to tune it.

Thanks,
Ara.
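
For reference, a minimal sketch of a RocksDBConfigSetter that bounds the per-store block cache; every store (and, for windowed stores, every segment) gets its own cache, so the off-heap footprint scales with the number of stores. The sizes below are placeholders, not recommendations:

    import java.util.Map;
    import org.apache.kafka.streams.state.RocksDBConfigSetter;
    import org.rocksdb.BlockBasedTableConfig;
    import org.rocksdb.Options;

    public class BoundedRocksDBConfig implements RocksDBConfigSetter {
        @Override
        public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
            BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
            tableConfig.setBlockCacheSize(8 * 1024 * 1024L);  // a few MB per store instead of the much larger default
            tableConfig.setBlockSize(4 * 1024L);
            options.setTableFormatConfig(tableConfig);
        }
    }

    // registered via:
    // props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, BoundedRocksDBConfig.class);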









Re: kafka streaming rocks db lock bug?

2016-10-25 Thread Ara Ebrahimi
This was in 0.10.1.0. What happened was that a kafka broker went down and then 
this happened on the kafka streaming instance that had a connection to this 
broker. I can send you all the logs I got.

Ara.

On Oct 24, 2016, at 10:41 PM, Guozhang Wang 
<wangg...@gmail.com<mailto:wangg...@gmail.com>> wrote:

Hello Ara,

Your encountered issue seems to be KAFKA-3812
<https://issues.apache.org/jira/browse/KAFKA-3812>, and KAFKA-3938
<https://issues.apache.org/jira/browse/KAFKA-3938>. Could you try to
upgrade to the newly released 0.10.1.0 version and see if this issue goes
away? If not I would love to investigate this issue further with you.


Guozhang



Guozhang


On Sun, Oct 23, 2016 at 1:45 PM, Ara Ebrahimi 
<ara.ebrah...@argyledata.com<mailto:ara.ebrah...@argyledata.com>>
wrote:

And then this on a different node:

2016-10-23 13:43:57 INFO  StreamThread:286 - stream-thread
[StreamThread-3] Stream thread shutdown complete
2016-10-23 13:43:57 ERROR StreamPipeline:169 - An exception has occurred
org.apache.kafka.streams.errors.StreamsException: stream-thread
[StreamThread-3] Failed to rebalance
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
StreamThread.java:401)
at org.apache.kafka.streams.processor.internals.
StreamThread.run(StreamThread.java:235)
Caused by: org.apache.kafka.streams.errors.ProcessorStateException: Error
while creating the state manager
at org.apache.kafka.streams.processor.internals.AbstractTask.(
AbstractTask.java:72)
at org.apache.kafka.streams.processor.internals.
StreamTask.(StreamTask.java:90)
at org.apache.kafka.streams.processor.internals.
StreamThread.createStreamTask(StreamThread.java:622)
at org.apache.kafka.streams.processor.internals.
StreamThread.addStreamTasks(StreamThread.java:649)
at org.apache.kafka.streams.processor.internals.StreamThread.access$000(
StreamThread.java:69)
at org.apache.kafka.streams.processor.internals.StreamThread$1.
onPartitionsAssigned(StreamThread.java:120)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.
onJoinComplete(ConsumerCoordinator.java:228)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.
joinGroupIfNeeded(AbstractCoordinator.java:313)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.
ensureActiveGroup(AbstractCoordinator.java:277)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(
ConsumerCoordinator.java:259)
at org.apache.kafka.clients.consumer.KafkaConsumer.
pollOnce(KafkaConsumer.java:1013)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(
KafkaConsumer.java:979)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
StreamThread.java:398)
... 1 more
Caused by: java.io.IOException: task [7_1] Failed to lock the state
directory: /tmp/kafka-streams/argyle-streams/7_1
at org.apache.kafka.streams.processor.internals.
ProcessorStateManager.(ProcessorStateManager.java:98)
at org.apache.kafka.streams.processor.internals.AbstractTask.(
AbstractTask.java:69)
... 13 more

Ara.

On Oct 23, 2016, at 1:24 PM, Ara Ebrahimi 
<ara.ebrah...@argyledata.com<mailto:ara.ebrah...@argyledata.com><
mailto:ara.ebrah...@argyledata.com>> wrote:

Hi,

This happens when I hammer our 5 kafka streaming nodes (each with 4
streaming threads) hard enough for an hour or so:

2016-10-23 13:04:17 ERROR StreamThread:324 - stream-thread
[StreamThread-2] Failed to flush state for StreamTask 3_8:
org.apache.kafka.streams.errors.ProcessorStateException: task [3_8]
Failed to flush state store streams-data-record-stats-avro-br-store
at org.apache.kafka.streams.processor.internals.
ProcessorStateManager.flush(ProcessorStateManager.java:322)
at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(
AbstractTask.java:181)
at org.apache.kafka.streams.processor.internals.StreamThread$4.apply(
StreamThread.java:360)
at org.apache.kafka.streams.processor.internals.StreamThread.
performOnAllTasks(StreamThread.java:322)
at org.apache.kafka.streams.processor.internals.
StreamThread.flushAllState(StreamThread.java:357)
at org.apache.kafka.streams.processor.internals.StreamThread.
shutdownTasksAndState(StreamThread.java:295)
at org.apache.kafka.streams.processor.internals.StreamThread.shutdown(
StreamThread.java:262)
at org.apache.kafka.streams.processor.internals.
StreamThread.run(StreamThread.java:245)
Caused by: org.apache.kafka.streams.errors.ProcessorStateException: Error
opening store streams-data-record-stats-avro-br-store-20150516 at
location /tmp/kafka-streams/argyle-streams/3_8/streams-data-
record-stats-avro-br-store/streams-data-record-stats-
avro-br-store-20150516
at org.apache.kafka.streams.state.internals.RocksDBStore.
openDB(RocksDBStore.java:196)
at org.apache.kafka.streams.state.internals.RocksDBStore.
openDB(RocksDBStore.java:158)
at org.apache.kafka.streams.state.internals.RocksDBWindowStore$Segment.
openDB(RocksDBWindowStore.java:72)
at org.apache.kafka.streams.

Re: kafka streaming rocks db lock bug?

2016-10-23 Thread Ara Ebrahimi
And then this on a different node:

2016-10-23 13:43:57 INFO  StreamThread:286 - stream-thread [StreamThread-3] 
Stream thread shutdown complete
2016-10-23 13:43:57 ERROR StreamPipeline:169 - An exception has occurred
org.apache.kafka.streams.errors.StreamsException: stream-thread 
[StreamThread-3] Failed to rebalance
at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:401)
at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:235)
Caused by: org.apache.kafka.streams.errors.ProcessorStateException: Error while 
creating the state manager
at 
org.apache.kafka.streams.processor.internals.AbstractTask.(AbstractTask.java:72)
at 
org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:90)
at 
org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:622)
at 
org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:649)
at 
org.apache.kafka.streams.processor.internals.StreamThread.access$000(StreamThread.java:69)
at 
org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:120)
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:228)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:313)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277)
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:398)
... 1 more
Caused by: java.io.IOException: task [7_1] Failed to lock the state directory: 
/tmp/kafka-streams/argyle-streams/7_1
at 
org.apache.kafka.streams.processor.internals.ProcessorStateManager.(ProcessorStateManager.java:98)
at 
org.apache.kafka.streams.processor.internals.AbstractTask.(AbstractTask.java:69)
... 13 more

Ara.

On Oct 23, 2016, at 1:24 PM, Ara Ebrahimi 
<ara.ebrah...@argyledata.com<mailto:ara.ebrah...@argyledata.com>> wrote:

Hi,

This happens when I hammer our 5 kafka streaming nodes (each with 4 streaming 
threads) hard enough for an hour or so:

2016-10-23 13:04:17 ERROR StreamThread:324 - stream-thread [StreamThread-2] 
Failed to flush state for StreamTask 3_8:
org.apache.kafka.streams.errors.ProcessorStateException: task [3_8] Failed to 
flush state store streams-data-record-stats-avro-br-store
at 
org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:322)
at 
org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:181)
at 
org.apache.kafka.streams.processor.internals.StreamThread$4.apply(StreamThread.java:360)
at 
org.apache.kafka.streams.processor.internals.StreamThread.performOnAllTasks(StreamThread.java:322)
at 
org.apache.kafka.streams.processor.internals.StreamThread.flushAllState(StreamThread.java:357)
at 
org.apache.kafka.streams.processor.internals.StreamThread.shutdownTasksAndState(StreamThread.java:295)
at 
org.apache.kafka.streams.processor.internals.StreamThread.shutdown(StreamThread.java:262)
at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:245)
Caused by: org.apache.kafka.streams.errors.ProcessorStateException: Error 
opening store streams-data-record-stats-avro-br-store-20150516 at location 
/tmp/kafka-streams/argyle-streams/3_8/streams-data-record-stats-avro-br-store/streams-data-record-stats-avro-br-store-20150516
at 
org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:196)
at 
org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:158)
at 
org.apache.kafka.streams.state.internals.RocksDBWindowStore$Segment.openDB(RocksDBWindowStore.java:72)
at 
org.apache.kafka.streams.state.internals.RocksDBWindowStore.getOrCreateSegment(RocksDBWindowStore.java:402)
at 
org.apache.kafka.streams.state.internals.RocksDBWindowStore.putAndReturnInternalKey(RocksDBWindowStore.java:310)
at 
org.apache.kafka.streams.state.internals.RocksDBWindowStore.put(RocksDBWindowStore.java:292)
at 
org.apache.kafka.streams.state.internals.MeteredWindowStore.put(MeteredWindowStore.java:101)
at 
org.apache.kafka.streams.state.internals.CachingWindowStore$1.apply(CachingWindowStore.java:87)
at 
org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:117)
at 
org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:100)
at 
org.apache.kafka.streams.state.internals.CachingWindowStore.flush(CachingWindowStore.java:118)
at 
org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:320)
... 7

kafka streaming rocks db lock bug?

2016-10-23 Thread Ara Ebrahimi
Hi,

This happens when I hammer our 5 kafka streaming nodes (each with 4 streaming 
threads) hard enough for an hour or so:

2016-10-23 13:04:17 ERROR StreamThread:324 - stream-thread [StreamThread-2] 
Failed to flush state for StreamTask 3_8:
org.apache.kafka.streams.errors.ProcessorStateException: task [3_8] Failed to 
flush state store streams-data-record-stats-avro-br-store
at 
org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:322)
at 
org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:181)
at 
org.apache.kafka.streams.processor.internals.StreamThread$4.apply(StreamThread.java:360)
at 
org.apache.kafka.streams.processor.internals.StreamThread.performOnAllTasks(StreamThread.java:322)
at 
org.apache.kafka.streams.processor.internals.StreamThread.flushAllState(StreamThread.java:357)
at 
org.apache.kafka.streams.processor.internals.StreamThread.shutdownTasksAndState(StreamThread.java:295)
at 
org.apache.kafka.streams.processor.internals.StreamThread.shutdown(StreamThread.java:262)
at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:245)
Caused by: org.apache.kafka.streams.errors.ProcessorStateException: Error 
opening store streams-data-record-stats-avro-br-store-20150516 at location 
/tmp/kafka-streams/argyle-streams/3_8/streams-data-record-stats-avro-br-store/streams-data-record-stats-avro-br-store-20150516
at 
org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:196)
at 
org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:158)
at 
org.apache.kafka.streams.state.internals.RocksDBWindowStore$Segment.openDB(RocksDBWindowStore.java:72)
at 
org.apache.kafka.streams.state.internals.RocksDBWindowStore.getOrCreateSegment(RocksDBWindowStore.java:402)
at 
org.apache.kafka.streams.state.internals.RocksDBWindowStore.putAndReturnInternalKey(RocksDBWindowStore.java:310)
at 
org.apache.kafka.streams.state.internals.RocksDBWindowStore.put(RocksDBWindowStore.java:292)
at 
org.apache.kafka.streams.state.internals.MeteredWindowStore.put(MeteredWindowStore.java:101)
at 
org.apache.kafka.streams.state.internals.CachingWindowStore$1.apply(CachingWindowStore.java:87)
at 
org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:117)
at 
org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:100)
at 
org.apache.kafka.streams.state.internals.CachingWindowStore.flush(CachingWindowStore.java:118)
at 
org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:320)
... 7 more
Caused by: org.rocksdb.RocksDBException: IO error: lock 
/tmp/kafka-streams/argyle-streams/3_8/streams-data-record-stats-avro-br-store/streams-data-record-stats-avro-br-store-20150516/LOCK:
 No locks available
at org.rocksdb.RocksDB.open(Native Method)
at org.rocksdb.RocksDB.open(RocksDB.java:184)
at 
org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:189)
... 18 more

Some sort of a locking bug?

Note that when this happens this node stops processing anything and the other 
nodes seem to want to pick up the load, which brings the whole streaming 
cluster to a standstill. That's very worrying. Is there a document somewhere 
describing *in detail* how failover for streaming works?

Ara.









Re: micro-batching in kafka streams

2016-09-28 Thread Ara Ebrahimi
Awesome! Thanks.

Ara.

On Sep 28, 2016, at 3:20 PM, Guozhang Wang 
<wangg...@gmail.com<mailto:wangg...@gmail.com>> wrote:

Ara,

I'd recommend you using the interactive queries feature, available in the
up coming 0.10.1 in a couple of weeks, to query the current snapshot of the
state store.

We are going to write a blog post about step-by-step instructions to
leverage this feature for use cases just like yours soon.

Guozhang


On Wed, Sep 28, 2016 at 2:19 PM, Ara Ebrahimi 
<ara.ebrah...@argyledata.com<mailto:ara.ebrah...@argyledata.com>>
wrote:

I need this ReadOnlyKeyValueStore.

In my use case, I do an aggregateByKey(), so a KTable is formed, backed by
a state store. This is then used by the next steps of the pipeline. Now
using the word count sample, I try to read the state store. Hence I end up
sharing it with the actual pipeline. And Kafka Streams doesn’t like that
because apparently state stores are assumed to be writable (indeed there’s
a put() in there) and deep in the rocksdb state store code it tries to
create a lock file (and indeed there is a file named LOCK in there).

Ara.

On Sep 28, 2016, at 2:03 PM, Guozhang Wang 
<wangg...@gmail.com<mailto:wangg...@gmail.com><mailto:wan
gg...@gmail.com<mailto:gg...@gmail.com>>> wrote:

Ara,

Are you using the interactive queries feature but encountered issue due to
locking file conflicts?

https://cwiki.apache.org/confluence/display/KAFKA/KIP-
67%3A+Queryable+state+for+Kafka+Streams

This is not expected to happen, if you are indeed using this feature I'd
like to learn more of your error scenario.

Guozhang


On Tue, Sep 27, 2016 at 9:41 AM, Ara Ebrahimi 
<ara.ebrah...@argyledata.com<mailto:ara.ebrah...@argyledata.com>
<mailto:ara.ebrah...@argyledata.com>>
wrote:

One more thing:

Guozhang pointed me towards this sample for micro-batching:
https://github.com/apache/kafka/blob/177b2d0bea76f270ec087ebe734313
07c1aef5a1/streams/examples/src/main/java/org/apache/
kafka/streams/examples/wordcount/WordCountProcessorDemo.java

This is a good example and I successfully got it adapted for my use case.
BUT the main problem is that even if my use case deals with writing of
hourly windows of data and hence the data is already in a rocksdb file but
I need to create a duplicate of the same file just to be able to
periodically do range scans on it and write to the external database. I did
try to see if I could get StateStore to read the same rocksdb file used by
the aggregateByKey which is happening before this step but it complained
about not being able to lock the file. Would be great to be able to share
the same underlying file between aggregateByKey (or any other such
KTable-producing operation) and such periodic triggers.

Ara.

On Sep 26, 2016, at 10:40 AM, Ara Ebrahimi 
<ara.ebrah...@argyledata.com<mailto:ara.ebrah...@argyledata.com><
mailto:ara.ebrah...@argyledata.com><
mailto:ara.ebrah...@argyledata.com>> wrote:

Hi,

So, here’s the situation:

- for classic batching of writes to external systems, right now I simply
hack it. This specific case is writing of records to an Accumulo database, and
I simply use the batch writer to batch writes, and it flushes every second
or so. I’ve added a shutdown hook to the jvm to flush upon graceful exit
too. This is good enough for me, but obviously it’s not perfect. I wish
Kafka Streams had some sort of a trigger (based on x number of records
processed, or y window of time passed). Which brings me to the next use
case.

- I have some logic for calculating hourly statistics. So I’m dealing with
Windowed data already. These stats then need to be written to an external
database for use by user facing systems. Obviously I need to write the
final result for each hourly window after we’re past that window of time
(or I can write as often as it gets updated but the problem is that the
external database is not as fast as Kafka). I do understand that I need to
take into account the fact that events may arrive out of order and there
may be some records arriving a little bit after I’ve considered the
previous window over and have moved to the next one. I’d like to have some
sort of an hourly trigger (not just pure x milliseconds trigger, but also
support for cron style timing) and then also have the option to update the
stats I’ve already written for a window a set amount of time after the
trigger got triggered so that I can deal with events which arrive after the
write for that window. And then there’s a cut-off point after which
updating the stats for a very old window is just not worth it. Something
like this DSL:

kstream.trigger(/* when to trigger */ Cron.of(“0 * * * *”), /* update
every hour afterwards */ Hours.toMillis(1), /* discard changes older than
this */ Hours.toMillis(24), /* lambda */ (windowStartTime, windowedKey,
record) -> { /* write */ } );

The tricky part is reconciling event source time and event processing
time. Clearly t

Re: micro-batching in kafka streams

2016-09-28 Thread Ara Ebrahimi
I need this ReadOnlyKeyValueStore.

In my use case, I do an aggregateByKey(), so a KTable is formed, backed by a 
state store. This is then used by the next steps of the pipeline. Now using the 
word count sample, I try to read the state store. Hence I end up sharing it 
with the actual pipeline. And Kafka Streams doesn’t like that because 
apparently state stores are assumed to be writable (indeed there’s a put() in 
there) and deep in the rocksdb state store code it tries to create a lock file 
(and indeed there is a file named LOCK in there).

Ara.
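
For reference, the interactive queries feature mentioned below reads the same store through a read-only handle instead of opening the rocksdb files a second time, which is where the LOCK conflict comes from. A minimal sketch against 0.10.1, assuming "agg-store" stands in for whatever store name was given to aggregateByKey():

    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.state.KeyValueIterator;
    import org.apache.kafka.streams.state.QueryableStoreTypes;
    import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

    public class StoreQuerySketch {
        static void dumpStore(KafkaStreams streams) {
            ReadOnlyKeyValueStore<String, Long> store =
                    streams.store("agg-store", QueryableStoreTypes.<String, Long>keyValueStore());
            try (KeyValueIterator<String, Long> it = store.all()) {
                while (it.hasNext()) {
                    KeyValue<String, Long> entry = it.next();
                    System.out.println(entry.key + " = " + entry.value);  // or write to the external DB
                }
            }
        }
    }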

On Sep 28, 2016, at 2:03 PM, Guozhang Wang 
<wangg...@gmail.com<mailto:wangg...@gmail.com>> wrote:

Ara,

Are you using the interactive queries feature but encountered issue due to
locking file conflicts?

https://cwiki.apache.org/confluence/display/KAFKA/KIP-67%3A+Queryable+state+for+Kafka+Streams

This is not expected to happen, if you are indeed using this feature I'd
like to learn more of your error scenario.

Guozhang


On Tue, Sep 27, 2016 at 9:41 AM, Ara Ebrahimi 
<ara.ebrah...@argyledata.com<mailto:ara.ebrah...@argyledata.com>>
wrote:

One more thing:

Guozhang pointed me towards this sample for micro-batching:
https://github.com/apache/kafka/blob/177b2d0bea76f270ec087ebe734313
07c1aef5a1/streams/examples/src/main/java/org/apache/
kafka/streams/examples/wordcount/WordCountProcessorDemo.java

This is a good example and I successfully got it adapted for my use case.
BUT the main problem is that even if my use case deals with writing of
hourly windows of data and hence the data is already in a rocksdb file but
I need to create a duplicate of the same file just to be able to
periodically do range scans on it and write to the external database. I did
try to see if I could get StateStore to read the same rocksdb file used by
the aggregateByKey which is happening before this step but it complained
about not being able to lock the file. Would be great to be able to share
the same underlying file between aggregateByKey (or any other such
KTable-producing operation) and such periodic triggers.

Ara.

On Sep 26, 2016, at 10:40 AM, Ara Ebrahimi 
<ara.ebrah...@argyledata.com<mailto:ara.ebrah...@argyledata.com><
mailto:ara.ebrah...@argyledata.com>> wrote:

Hi,

So, here’s the situation:

- for classic batching of writes to external systems, right now I simply
hack it. This specific case is writing of records to an Accumulo database, and
I simply use the batch writer to batch writes, and it flushes every second
or so. I’ve added a shutdown hook to the jvm to flush upon graceful exit
too. This is good enough for me, but obviously it’s not perfect. I wish
Kafka Streams had some sort of a trigger (based on x number of records
processed, or y window of time passed). Which brings me to the next use
case.

- I have some logic for calculating hourly statistics. So I’m dealing with
Windowed data already. These stats then need to be written to an external
database for use by user facing systems. Obviously I need to write the
final result for each hourly window after we’re past that window of time
(or I can write as often as it gets updated but the problem is that the
external database is not as fast as Kafka). I do understand that I need to
take into account the fact that events may arrive out of order and there
may be some records arriving a little bit after I’ve considered the
previous window over and have moved to the next one. I’d like to have some
sort of an hourly trigger (not just pure x milliseconds trigger, but also
support for cron style timing) and then also have the option to update the
stats I’ve already written for a window a set amount of time after the
trigger got triggered so that I can deal with events which arrive after the
write for that window. And then there’s a cut-off point after which
updating the stats for a very old window is just not worth it. Something
like this DSL:

kstream.trigger(/* when to trigger */ Cron.of(“0 * * * *”), /* update
every hour afterwards */ Hours.toMillis(1), /* discard changes older than
this */ Hours.toMillis(24), /* lambda */ (windowStartTime, windowedKey,
record) -> { /* write */ } );

The tricky part is reconciling event source time and event processing
time. Clearly this trigger is in the event processing time whereas the data
is in the event source time most probably.

Something like that :)

Ara.

On Sep 26, 2016, at 1:59 AM, Michael Noll 
<mich...@confluent.io<mailto:mich...@confluent.io>mailto:ich...@confluent.io>>> wrote:

Ara,

may I ask why you need to use micro-batching in the first place?

Reason why I am asking: Typically, when people talk about micro-batching,
they are refer to the way some originally batch-based stream processing
tools "bolt on" real-time processing by making their batch sizes really
small.  Here, micro-batching belongs to the realm of the inner workings of
the stream processing tool.

Orthogonally to that, you have f

Re: micro-batching in kafka streams

2016-09-27 Thread Ara Ebrahimi
One more thing:

Guozhang pointed me towards this sample for micro-batching: 
https://github.com/apache/kafka/blob/177b2d0bea76f270ec087ebe73431307c1aef5a1/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java

This is a good example and I successfully got it adapted for my use case. BUT 
the main problem is that even though my use case deals with writing hourly 
windows of data, and hence the data is already in a rocksdb file, I need to 
create a duplicate of the same file just to be able to periodically do range 
scans on it and write to the external database. I did try to see if I could get 
the StateStore to read the same rocksdb file used by the aggregateByKey which 
happens before this step, but it complained about not being able to lock the 
file. It would be great to be able to share the same underlying file between 
aggregateByKey (or any other such KTable-producing operation) and such periodic 
triggers.

Ara.
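
For reference, the shape of that adapted demo is roughly the following: a Processor buffers updates in a state store, asks for a periodic callback via schedule(), and flushes a batch on each punctuate() call (which in the 0.10.x API fires on stream time, not wall-clock time). A minimal sketch, with the store name, types and the external write as placeholder assumptions:

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.processor.Processor;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.state.KeyValueIterator;
    import org.apache.kafka.streams.state.KeyValueStore;

    public class BatchingProcessorSketch implements Processor<String, Long> {
        private KeyValueStore<String, Long> buffer;

        @Override
        @SuppressWarnings("unchecked")
        public void init(final ProcessorContext context) {
            // "batch-buffer" must be added to the topology and connected to this processor.
            this.buffer = (KeyValueStore<String, Long>) context.getStateStore("batch-buffer");
            context.schedule(60000L);  // request punctuate() roughly once per minute of stream time
        }

        @Override
        public void process(final String key, final Long value) {
            buffer.put(key, value);    // only accumulate; the external write happens in punctuate()
        }

        @Override
        public void punctuate(final long timestamp) {
            List<KeyValue<String, Long>> batch = new ArrayList<>();
            try (KeyValueIterator<String, Long> it = buffer.all()) {
                while (it.hasNext()) {
                    batch.add(it.next());
                }
            }
            for (KeyValue<String, Long> entry : batch) {
                // write entry to the external database here, then drop it from the buffer
                buffer.delete(entry.key);
            }
        }

        @Override
        public void close() { }
    }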

On Sep 26, 2016, at 10:40 AM, Ara Ebrahimi 
<ara.ebrah...@argyledata.com<mailto:ara.ebrah...@argyledata.com>> wrote:

Hi,

So, here’s the situation:

- for classic batching of writes to external systems, right now I simply hack 
it. This specific case is writing of records to an Accumulo database, and I simply
use the batch writer to batch writes, and it flushes every second or so. I’ve 
added a shutdown hook to the jvm to flush upon graceful exit too. This is good 
enough for me, but obviously it’s not perfect. I wish Kafka Streams had some 
sort of a trigger (based on x number of records processed, or y window of time 
passed). Which brings me to the next use case.

- I have some logic for calculating hourly statistics, so I’m dealing with 
Windowed data already. These stats then need to be written to an external 
database for use by user-facing systems. Obviously I need to write the final 
result for each hourly window after we’re past that window of time (or I could 
write as often as it gets updated, but the problem is that the external database 
is not as fast as Kafka). I do understand that events may arrive out of order, 
and that some records may arrive a little bit after I’ve considered the previous 
window closed and have moved on to the next one. I’d like to have some sort of 
hourly trigger (not just a plain every-x-milliseconds trigger, but also support 
for cron-style timing), and then also have the option to keep updating the stats 
I’ve already written for a window for a set amount of time after the trigger 
fires, so that I can deal with events which arrive after the write for that 
window. And then there’s a cut-off point after which updating the stats for a 
very old window is just not worth it. 
Something like this DSL:

kstream.trigger(
    /* when to trigger */                 Cron.of("0 * * * *"),
    /* update every hour afterwards */    Hours.toMillis(1),
    /* discard changes older than this */ Hours.toMillis(24),
    /* lambda */ (windowStartTime, windowedKey, record) -> { /* write */ }
);

The tricky part is reconciling event-source time and event-processing time. 
This trigger clearly operates in event-processing time, whereas the data is most 
probably in event-source time.

Something like that :)
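
[Editor's note: no such trigger operator exists in the DSL today; the closest approximation appears to be doing the window-completion and cut-off checks inside punctuate() yourself. Note that in the 0.10.x API punctuate() is itself driven by stream time rather than wall-clock time, which is part of the reconciliation problem mentioned above. A rough sketch of just that decision logic follows; the window size, cut-off, and method name are assumptions for illustration.]

// Decision logic that could be called from punctuate(long punctuateTime) for each
// window held in a state store. Constants are illustrative assumptions.
static final long WINDOW_SIZE_MS = 60 * 60 * 1000L;       // hourly windows
static final long CUTOFF_MS = 24 * 60 * 60 * 1000L;       // stop rewriting very old windows

static boolean shouldWrite(long punctuateTime, long windowStartTime) {
    long windowEnd = windowStartTime + WINDOW_SIZE_MS;
    if (punctuateTime < windowEnd) {
        return false;                                     // window not past its end yet
    }
    return punctuateTime - windowEnd <= CUTOFF_MS;        // within the cut-off: (re)write it
}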

Ara.

On Sep 26, 2016, at 1:59 AM, Michael Noll <mich...@confluent.io> wrote:

Ara,

may I ask why you need to use micro-batching in the first place?

Reason why I am asking: Typically, when people talk about micro-batching,
they are referring to the way some originally batch-based stream processing
tools "bolt on" real-time processing by making their batch sizes really
small.  Here, micro-batching belongs to the realm of the inner workings of
the stream processing tool.

Orthogonally to that, you have features/operations such as windowing,
triggers, etc. that -- unlike micro-batching -- allow you as the user of
the stream processing tool to define which exact computation logic you
need.  Whether or not, say, windowing is or is not computed via
micro-batching behind the scenes should (at least in an ideal world) be of
no concern to the user.

-Michael





On Mon, Sep 5, 2016 at 9:10 PM, Ara Ebrahimi <ara.ebrah...@argyledata.com>
wrote:

Hi,

What’s the best way to do micro-batching in Kafka Streams? Any plans for a
built-in mechanism? Perhaps StateStore could act as the buffer? What
exactly are ProcessorContext.schedule()/punctuate() for? They don’t seem
to be used anywhere?

http://hortonworks.com/blog/apache-storm-design-pattern-micro-batching/

Ara.






Re: micro-batching in kafka streams

2016-09-26 Thread Ara Ebrahimi
Hi,

So, here’s the situation:

- for classic batching of writes to external systems, right now I simply hack 
it. This specific case is writing records to an Accumulo database, and I simply 
use the batch writer to batch writes; it flushes every second or so. I’ve 
added a shutdown hook to the JVM to flush upon graceful exit too (a minimal 
flush-on-exit sketch follows at the end of this message). This is good 
enough for me, but obviously it’s not perfect. I wish Kafka Streams had some 
sort of trigger (based on x number of records processed, or a y window of time 
passed). Which brings me to the next use case.

- I have some logic for calculating hourly statistics, so I’m dealing with 
Windowed data already. These stats then need to be written to an external 
database for use by user-facing systems. Obviously I need to write the final 
result for each hourly window after we’re past that window of time (or I could 
write as often as it gets updated, but the problem is that the external database 
is not as fast as Kafka). I do understand that events may arrive out of order, 
and that some records may arrive a little bit after I’ve considered the previous 
window closed and have moved on to the next one. I’d like to have some sort of 
hourly trigger (not just a plain every-x-milliseconds trigger, but also support 
for cron-style timing), and then also have the option to keep updating the stats 
I’ve already written for a window for a set amount of time after the trigger 
fires, so that I can deal with events which arrive after the write for that 
window. And then there’s a cut-off point after which updating the stats for a 
very old window is just not worth it. 
Something like this DSL:

kstream.trigger(
    /* when to trigger */                 Cron.of("0 * * * *"),
    /* update every hour afterwards */    Hours.toMillis(1),
    /* discard changes older than this */ Hours.toMillis(24),
    /* lambda */ (windowStartTime, windowedKey, record) -> { /* write */ }
);

The tricky part is reconciling event-source time and event-processing time. 
This trigger clearly operates in event-processing time, whereas the data is most 
probably in event-source time.

Something like that :)
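
[Editor's note: for completeness, a minimal sketch of the flush-on-exit hack mentioned in the first case above. The BatchingWriter interface is an invented stand-in for the real Accumulo batch writer; only the shutdown-hook idea is the point here.]

// Invented stand-in for an external batch writer (e.g. Accumulo's); names are assumptions.
interface BatchingWriter extends AutoCloseable {
    void add(String key, String value);   // buffered write; the writer flushes every second or so
    void flush();                         // force out the current batch
    @Override
    void close();
}

final class FlushOnExit {
    static BatchingWriter withShutdownFlush(BatchingWriter writer) {
        // Flush pending writes on graceful JVM exit so the last partial batch isn't lost.
        Runtime.getRuntime().addShutdownHook(new Thread(writer::flush));
        return writer;
    }
}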

Ara.

> On Sep 26, 2016, at 1:59 AM, Michael Noll <mich...@confluent.io> wrote:
>
> Ara,
>
> may I ask why you need to use micro-batching in the first place?
>
> Reason why I am asking: Typically, when people talk about micro-batching,
> they are referring to the way some originally batch-based stream processing
> tools "bolt on" real-time processing by making their batch sizes really
> small.  Here, micro-batching belongs to the realm of the inner workings of
> the stream processing tool.
>
> Orthogonally to that, you have features/operations such as windowing,
> triggers, etc. that -- unlike micro-batching -- allow you as the user of
> the stream processing tool to define which exact computation logic you
> need.  Whether or not, say, windowing is or is not computed via
> micro-batching behind the scenes should (at least in an ideal world) be of
> no concern to the user.
>
> -Michael
>
>
>
>
>
> On Mon, Sep 5, 2016 at 9:10 PM, Ara Ebrahimi <ara.ebrah...@argyledata.com>
> wrote:
>
>> Hi,
>>
>> What’s the best way to do micro-batching in Kafka Streams? Any plans for a
>> built-in mechanism? Perhaps StateStore could act as the buffer? What
>> exactly are ProcessorContext.schedule()/punctuate() for? They don’t seem
>> to be used anywhere?
>>
>> http://hortonworks.com/blog/apache-storm-design-pattern-micro-batching/
>>
>> Ara.
>>
>>
>>
>
>
>










Re: micro-batching in kafka streams

2016-09-12 Thread Ara Ebrahimi
Thanks.

+1 on KIP-63 story. I need all of that :)

Ara.

> On Sep 11, 2016, at 8:19 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> Hello Ara,
>
> On the Processor API, users have the flexibility to do micro-batching with
> their own implementation patterns. For example, like you mentioned already:
>
> 1. Use a state store to bookkeep recently received records, and in the
> process() function simply put the record into the store.
> 2. Use the punctuate() function to periodically process the bookkept batch
> in the state store by iterating over it, and send the results
> downstream.
>
> You can find a simple example in WordCount demo:
>
> https://github.com/apache/kafka/blob/177b2d0bea76f270ec087ebe73431307c1aef5a1/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
>
> Note that it does not bookkeep the original records as micro-batches, but
> computes the running aggregate results. The general coding pattern is
> the same, though.
>
> On the higher-level Streams DSL, there is a proposed KIP for using caching
> for aggregate operators as an implicit "trigger" mechanism. This
> is not exactly the same as micro-batching, but it also reduces IO
> costs as well as data traffic:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-63%3A+Unify+store+and+downstream+caching+in+streams
>
>
> Let me know if these references are helpful to you.
>
> Guozhang
>
>
>
>
>
>
> On Mon, Sep 5, 2016 at 12:10 PM, Ara Ebrahimi <ara.ebrah...@argyledata.com>
> wrote:
>
>> Hi,
>>
>> What’s the best way to do micro-batching in Kafka Streams? Any plans for a
>> built-in mechanism? Perhaps StateStore could act as the buffer? What
>> exactly are ProcessorContext.schedule()/punctuate() for? They don’t seem
>> to be used anywhere?
>>
>> http://hortonworks.com/blog/apache-storm-design-pattern-micro-batching/
>>
>> Ara.
>>
>>
>>
>
>
>
> --
> -- Guozhang
>
>
>










Re: Performance issue with KafkaStreams

2016-09-10 Thread Ara Ebrahimi
Hi Eno,

Could you elaborate more on tuning Kafka Streams applications? What are the 
relationships between partitions, num.stream.threads, num.consumer.fetchers, 
and other such parameters? On a single-node setup with x partitions, what’s the 
best way to make sure these partitions are consumed and processed by Kafka 
Streams optimally?
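
[Editor's note: for context, a minimal sketch of the Streams-side knobs in question. The values are placeholders, not recommendations, and as far as I know num.consumer.fetchers is an old consumer-side setting rather than a Streams config.]

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class StreamsThreadingConfig {
    public static Properties props() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");   // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");  // placeholder
        // Stream threads per instance; tasks (roughly one per input partition) are spread
        // across all threads of all instances, so threads beyond the task count sit idle.
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);
        return props;
    }
}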

Ara.

> On Sep 10, 2016, at 3:18 AM, Eno Thereska  wrote:
>
> Hi Caleb,
>
> We have a benchmark that we run nightly to keep track of performance. The 
> numbers we have do indicate that consuming through Streams is indeed slower 
> than just a pure consumer; however, the performance difference is not as large 
> as you are observing. Would it be possible for you to run this benchmark in 
> your environment and report the numbers? The benchmark is included with Kafka 
> Streams and you run it like this:
>
> export INCLUDE_TEST_JARS=true; ./bin/kafka-run-class.sh 
> org.apache.kafka.streams.perf.SimpleBenchmark
>
> You'll need a Kafka broker to be running. The benchmark will report various 
> numbers, but one of them is the performance of the consumer and the 
> performance of streams reading.
>
> Thanks
> Eno
>
>> On 9 Sep 2016, at 18:19, Caleb Welton  wrote:
>>
>> Same in both cases:
>> client.id=Test-Prototype
>> application.id=test-prototype
>>
>> group.id=test-consumer-group
>> bootstrap.servers=broker1:9092,broker2:9092
>> zookeeper.connect=zk1:2181
>> replication.factor=2
>>
>> auto.offset.reset=earliest
>>
>>
>> On Friday, September 9, 2016 8:48 AM, Eno Thereska  
>> wrote:
>>
>>
>>
>> Hi Caleb,
>>
>> Could you share your Kafka Streams configuration (i.e., StreamsConfig
>> properties you might have set before the test)?
>>
>> Thanks
>> Eno
>>
>>
>> On Thu, Sep 8, 2016 at 12:46 AM, Caleb Welton  wrote:
>>
>>> I have a question with respect to the KafkaStreams API.
>>>
>>> I noticed during my prototyping work that my KafkaStreams application was
>>> not able to keep up with the input on the stream, so I dug into it a bit and
>>> found that it was spending an inordinate amount of time in
>>> org.apache.kafka.common.network.Selector.select().  Not exactly a smoking
>>> gun itself, so I dropped the implementation down to a single processor
>>> reading off a source.
>>>
>>> public class TestProcessor extends AbstractProcessor<String, String> {
>>>     static long start = -1;
>>>     static long count = 0;
>>>
>>>     @Override
>>>     public void process(String key, String value) {
>>>         if (start < 0) {
>>>             start = System.currentTimeMillis();
>>>         }
>>>         count += 1;
>>>         if (count > 1000000) {
>>>             long end = System.currentTimeMillis();
>>>             double time = (end - start) / 1000.0;
>>>             System.out.printf("Processed %d records in %f seconds (%f records/s)\n",
>>>                     count, time, count / time);
>>>             start = -1;
>>>             count = 0;
>>>         }
>>>     }
>>> }
>>>
>>> ...
>>>
>>>
>>> TopologyBuilder topologyBuilder = new TopologyBuilder();
>>> topologyBuilder
>>>     .addSource("SOURCE", stringDeserializer, stringDeserializer, "input")
>>>     .addProcessor("PROCESS", TestProcessor::new, "SOURCE");
>>>
>>>
>>>
>>> Which I then ran through the KafkaStreams API, and then repeated with
>>> the KafkaConsumer API.
>>>
>>> Using the KafkaConsumer API:
>>> Processed 1000001 records in 1.790000 seconds (558659.776536 records/s)
>>> Processed 1000001 records in 1.229000 seconds (813670.463792 records/s)
>>> Processed 1000001 records in 1.106000 seconds (904160.036166 records/s)
>>> Processed 1000001 records in 1.190000 seconds (840336.974790 records/s)
>>>
>>> Using the KafkaStreams API:
>>> Processed 1000001 records in 6.407000 seconds (156079.444358 records/s)
>>> Processed 1000001 records in 5.256000 seconds (190258.942161 records/s)
>>> Processed 1000001 records in 5.141000 seconds (194514.880373 records/s)
>>> Processed 1000001 records in 5.111000 seconds (195656.622970 records/s)
>>>
>>>
>>> The profile on the KafkaStreams consisted of:
>>>
>>> 89.2% org.apache.kafka.common.network.Selector.select()
>>> 7.6% org.apache.kafka.clients.producer.internals.
>>> ProduceRequestResult.await()
>>> 0.8% org.apache.kafka.common.network.PlaintextTransportLayer.read()
>>>
>>>
>>> Is a 5X performance difference between the Kafka Consumer and the
>>> KafkaStreams API expected?
>>>
>>> Are there specific things I can do to diagnose/tune the system?
>>>
>>> Thanks,
>>> Caleb
>>>
>
>
>
>







Re: enhancing KStream DSL

2016-09-09 Thread Ara Ebrahimi
Ah, that works! Thanks! I was under the impression that these are sequentially 
chained in the DSL. I didn’t realize I could still use allRecords in parallel with 
the branches.

Ara.

> On Sep 9, 2016, at 5:27 AM, Michael Noll <mich...@confluent.io> wrote:
>
> Oh, my bad.
>
> Updating the third predicate in `branch()` may not even be needed.
>
> You could simply do:
>
> KStream<String, CallRecord>[] branches = allRecords
>     .branch(
>         (imsi, callRecord) -> "VOICE".equalsIgnoreCase(callRecord.getCallCommType()),
>         (imsi, callRecord) -> "DATA".equalsIgnoreCase(callRecord.getCallCommType())
>         // Any callRecords that aren't matching any of the two
>         // predicates above will be dropped.
>     );
>
> This would give you two branched streams instead of three:
>
>KStream<String, CallRecord> voiceRecords = branches[0];
>KStream<String, CallRecord> dataRecords = branches[1];
>// No third branched stream like before.
>
> Then, to count "everything" (VOICE + DATA + everything else), simply reuse
> the original `allRecords` stream.
>
>
>
> On Fri, Sep 9, 2016 at 2:23 PM, Michael Noll <mich...@confluent.io> wrote:
>
>> Ara,
>>
>> you have shared this code snippet:
>>
>>>   allRecords.branch(
>>>       (imsi, callRecord) -> "VOICE".equalsIgnoreCase(callRecord.getCallCommType()),
>>>       (imsi, callRecord) -> "DATA".equalsIgnoreCase(callRecord.getCallCommType()),
>>>       (imsi, callRecord) -> true
>>>   );
>>
>> The branch() operation partitions the allRecords KStream into three
>> disjoint streams.
>>
>> I'd suggest the following.
>>
>> First, update the third predicate in your `branch()` step to be "everything
>> but VOICE and DATA", i.e. the remainder of allRecords once VOICE and DATA
>> records are removed:
>>
>>
>>    KStream<String, CallRecord>[] branches = allRecords
>>        .branch(
>>            (imsi, callRecord) -> "VOICE".equalsIgnoreCase(callRecord.getCallCommType()),
>>            (imsi, callRecord) -> "DATA".equalsIgnoreCase(callRecord.getCallCommType()),
>>            (imsi, callRecord) -> !(callRecord.getCallCommType().equalsIgnoreCase("VOICE")
>>                || callRecord.getCallCommType().equalsIgnoreCase("DATA"))
>>        );
>>
>> This would give you:
>>
>>KStream<String, CallRecord> voiceRecords = branches[0];
>>KStream<String, CallRecord> dataRecords = branches[1];
>>KStream<String, CallRecord> recordsThatAreNeitherVoiceNorData =
>> branches[2];
>>
>> Then, to count "everything" (VOICE + DATA + everything else), simply
>> reuse the original `allRecords` stream.
>>
>> -Michael
>>
>>
>>
>>
>>
>> On Thu, Sep 8, 2016 at 10:20 PM, Ara Ebrahimi <ara.ebrah...@argyledata.com
>>> wrote:
>>
>>> Let’s say I have this:
>>>
>>>
>>> KStream<String, CallRecord>[] branches = allRecords
>>>     .branch(
>>>         (imsi, callRecord) -> "VOICE".equalsIgnoreCase(callRecord.getCallCommType()),
>>>         (imsi, callRecord) -> "DATA".equalsIgnoreCase(callRecord.getCallCommType()),
>>>         (imsi, callRecord) -> true
>>>     );
>>> KStream<String, CallRecord> callRecords = branches[0];
>>> KStream<String, CallRecord> dataRecords = branches[1];
>>> KStream<String, CallRecord> callRecordCounter = branches[2];
>>>
>>> callRecordCounter
>>>.map((imsi, callRecord) -> new KeyValue<>("", ""))
>>>.countByKey(
>>>UnlimitedWindows.of("counter-window"),
>>>stringSerde
>>>)
>>>.print();
>>>
>>> Here I have 3 branches. Branch 0 is triggered if data is VOICE, branch 1
>>> if data is DATA. Branch 2 is supposed to get triggered regardless of type,
>>> all the time, so that I can then count stuff for a time window. BUT the
>>> problem is branch() is implemented like this:
>>>
>>> private class KStreamBranchProcessor extends AbstractProcessor<K, V> {
>>>@Override
>>>public void process(K key, V value) {
>>>for (int i = 0; i < predicates.length; i++) {
>>>   

enhancing KStream DSL

2016-09-08 Thread Ara Ebrahimi
Let’s say I have this:


KStream<String, CallRecord>[] branches = allRecords
    .branch(
        (imsi, callRecord) -> "VOICE".equalsIgnoreCase(callRecord.getCallCommType()),
        (imsi, callRecord) -> "DATA".equalsIgnoreCase(callRecord.getCallCommType()),
        (imsi, callRecord) -> true
    );
KStream<String, CallRecord> callRecords = branches[0];
KStream<String, CallRecord> dataRecords = branches[1];
KStream<String, CallRecord> callRecordCounter = branches[2];

callRecordCounter
.map((imsi, callRecord) -> new KeyValue<>("", ""))
.countByKey(
UnlimitedWindows.of("counter-window"),
stringSerde
)
.print();

Here I have 3 branches. Branch 0 is triggered if data is VOICE, branch 1 if data 
is DATA. Branch 2 is supposed to get triggered regardless of type, all the time, 
so that I can then count stuff for a time window. BUT the problem is branch() is 
implemented like this:

private class KStreamBranchProcessor extends AbstractProcessor<K, V> {
    @Override
    public void process(K key, V value) {
        for (int i = 0; i < predicates.length; i++) {
            if (predicates[i].test(key, value)) {
                // use forward with childIndex here and then break the loop
                // so that no record is going to be piped to multiple streams
                context().forward(key, value, i);
                break;
            }
        }
    }
}

Note the break, so the counter branch is never reached. I’d like to change the 
behavior of branch() so that all predicates are checked and no break happens, 
in, say, a branchAll() method. What’s the easiest way to add this functionality 
to the DSL? I tried process(), but it doesn’t return a KStream.
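
[Editor's note: a minimal workaround that doesn't require changing branch(). Independent filter() calls each see every record, so the predicates are no longer mutually exclusive, and the counter can consume allRecords directly. The identifiers are the same as in the snippet above.]

// Non-exclusive "branchAll" via independent filters; allRecords feeds all of them.
KStream<String, CallRecord> voiceRecords =
        allRecords.filter((imsi, callRecord) -> "VOICE".equalsIgnoreCase(callRecord.getCallCommType()));
KStream<String, CallRecord> dataRecords =
        allRecords.filter((imsi, callRecord) -> "DATA".equalsIgnoreCase(callRecord.getCallCommType()));

// The counter doesn't need its own branch at all:
allRecords
        .map((imsi, callRecord) -> new KeyValue<>("", ""))
        .countByKey(UnlimitedWindows.of("counter-window"), stringSerde)
        .print();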

Ara.










micro-batching in kafka streams

2016-09-05 Thread Ara Ebrahimi
Hi,

What’s the best way to do micro-batching in Kafka Streams? Any plans for a 
built-in mechanism? Perhaps StateStore could act as the buffer? What exactly 
are ProcessorContext.schedule()/punctuate() for? They don’t seem to be used 
anywhere?

http://hortonworks.com/blog/apache-storm-design-pattern-micro-batching/

Ara.









kafka streams: join 2 Windowed tables

2016-09-01 Thread Ara Ebrahimi
Hi,

Is joining 2 streams based on Windowed keys supposed to work?

I have 2 KTables:
- KTable events: I process events and aggregate the events 
that share a common criterion, using aggregateByKey and UnlimitedWindows as the 
window (for now)
- KTable hourlyStats: I calculate some stats using 
aggregateByKey for hourly windows, TimeWindows.of(“window name”, hourly)

Since both use aggregateByKey(), they are both KTables and both have 
Windowed keys.

I need to leftJoin the first one (events) with the second one (hourlyStats), BUT 
I need to join event x, which occurred at time t0, with the hourlyStats of the t0 
window. In other words, I need to join using 
JoinWindows.of("JoinWindow").within(60 * 60 * 1000). Since both are KTables, 
this is not possible. But if I turn them both into KStreams using toStream(), 
then I can use the leftJoin() variants which support JoinWindows. They would 
both be KStreams.
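
[Editor's note: roughly, the attempted join would look like the sketch below. The value types are shown as String just to keep it short; events and hourlyStats are the two windowed KTables described above, and depending on the version the leftJoin() overload taking explicit serdes may be needed instead of relying on the configured defaults.]

KStream<Windowed<String>, String> eventStream = events.toStream();
KStream<Windowed<String>, String> statsStream = hourlyStats.toStream();

KStream<Windowed<String>, String> joined = eventStream.leftJoin(
        statsStream,
        (event, stats) -> event + " | " + stats,              // placeholder ValueJoiner
        JoinWindows.of("JoinWindow").within(60 * 60 * 1000));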

The problem is the join doesn’t really happen. No hourlyStats is actually found 
for any row of events. The TimestampExtractor for both is correct.

So, is joining 2 streams based on Windowed keys supposed to work? If 
not, then how can I accomplish the above task?

Thanks,
Ara.




