kafka-streams 0.11.0.1 rocksdb bug?
Hi,

We just upgraded to kafka-streams 0.11.0.1 and noticed that in the cluster deployment reduce() never gets called. The funny thing is that it does get called in the unit tests. And no, it's not a data issue. What I have noticed is that all rocksdb state folders (1_0 and so on) are empty. I do not see any file permission errors, or any errors at all, on startup of the streaming application. Has anyone else run into this issue?

Ara.

This message is for the designated recipient only and may contain privileged, proprietary, or otherwise confidential information. If you have received it in error, please notify the sender immediately and delete the original. Any other use of the e-mail by you is prohibited. Thank you in advance for your cooperation.
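[A quick way to confirm the empty-store symptom described above is to walk the Streams state directory. This is a sketch only; it assumes the default layout `<state.dir>/<application.id>/<task_id>/rocksdb/...` with task folders named like 1_0 -- adjust the path for your own config.]

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.stream.Stream;

public class StateDirCheck {

    // Returns true if the given task directory contains no regular files at any
    // depth, i.e. the RocksDB store under it was never written to (or is missing).
    static boolean isTaskDirEmpty(Path taskDir) {
        if (!Files.isDirectory(taskDir)) return true;
        try (Stream<Path> s = Files.walk(taskDir)) {
            return s.noneMatch(Files::isRegularFile);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        // Default Streams state dir is /tmp/kafka-streams/<application.id>;
        // "my-app" here is a placeholder application id.
        Path appDir = Paths.get(args.length > 0 ? args[0] : "/tmp/kafka-streams/my-app");
        try (Stream<Path> tasks = Files.list(appDir)) {
            tasks.filter(Files::isDirectory)
                 .forEach(t -> System.out.println(t.getFileName() + " empty=" + isTaskDirEmpty(t)));
        }
    }
}
```

[If every task folder reports empty=true while records are flowing, the stores really are not being written, which matches what Ara reports.]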
Re: session window bug not fixed in 0.10.2.1?
Well, I can tell you this: performance is good initially, but then it degrades massively after processing a few billion records in a few hours. It becomes quite "choppy": a little bit of data is processed, then absolute silence, then another small chunk is processed, and so on. I did enable the rocksdb logs and there's no write stalling happening. Note that this is different from what we saw in the previous version; there, turning on caching would slow the whole thing down 5x right from the beginning. So it seems to me KAFKA-5172 could cause such behavior?

Ara.

On May 5, 2017, at 5:22 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Could KAFKA-5172 cause similar observations?
>
> Guozhang

On Thu, May 4, 2017 at 1:30 AM, Damian Guy <damian@gmail.com> wrote:

> It is odd, as the person that originally reported the problem has verified that it is fixed.

On Thu, 4 May 2017 at 08:31 Guozhang Wang <wangg...@gmail.com> wrote:

> Ara,
>
> That is a bit weird. I double checked and agreed with Eno that this commit is in both trunk and 0.10.2, so I suspect the same issue still persists in trunk; hence there might be another issue that is not fixed in 2645. Could you help verify if that is the case? In which case we can re-open https://issues.apache.org/jira/browse/KAFKA-4851 and investigate further.
>
> Guozhang

On Tue, May 2, 2017 at 1:02 PM, Ara Ebrahimi <ara.ebrah...@argyledata.com> wrote:

> No errors. But if I enable caching I see performance drop considerably. The workaround was to disable caching. The same thing is still true in 0.10.2.1.
>
> Ara.

On May 2, 2017, at 12:55 PM, Eno Thereska <eno.there...@gmail.com> wrote:

> Hi Ara,
>
> The PR https://github.com/apache/kafka/pull/2645 has gone into both trunk and 0.10.2.1, I just checked. What error are you seeing, could you give us an update?
> Thanks,
> Eno

On Fri, Apr 28, 2017 at 7:10 PM, Ara Ebrahimi <ara.ebrah...@argyledata.com> wrote:

> Hi,
>
> I upgraded to 0.10.2.1 yesterday, enabled caching for session windows and tested again. It doesn't seem to be fixed?
>
> Ara.

On Mar 27, 2017, at 2:10 PM, Damian Guy <damian@gmail.com> wrote:

> Hi Ara,
>
> There is a performance issue in the 0.10.2 release of session windows. It is fixed by this PR: https://github.com/apache/kafka/pull/2645
> You can work around it on 0.10.2 by calling the aggregate(..), reduce(..), etc. methods and supplying a StateStoreSupplier with caching disabled, i.e. by doing something like:
>
> final StateStoreSupplier sessionStore =
>     Stores.create("session-store-name")
>           .withKeys(Serdes.String())
>           .withValues(Serdes.String())
>           .persistent()
>           .sessionWindowed(TimeUnit.MINUTES.toMillis(7))
>           .build();
>
> The fix has also been cherry-picked to the 0.10.2 branch, so you could build from source and not have to create the StateStoreSupplier.
>
> Thanks,
> Damian

On Mon, 27 Mar 2017 at 21:56 Ara Ebrahimi <ara.ebrah...@argyledata.com> wrote:

> Thanks for the response, Matthias!
>
> The reason we want this exact task assignment to happen is that a critical part of our pipeline involves grouping relevant records together (that's what the aggregate function in the topology is for). And for hot keys this can lead to sometimes 100s of records getting grouped together. Even worse, these records are session bound; we use session windows. Hence we see lots of activity around the store backing the aggregate function, and even though we use SSD drives we're not seeing the kind of performance we want to see. It seems like the aggregate function leads to lots of updates to these hot keys, which leads to lots of rocksdb activity.
>
> Now there are many ways to fix this problem:
> - just don't aggregate; create an algorithm that is not reliant on grouping/aggregating records. Not something we can do with our tight schedule right now.
> - do grouping/aggregating but employ n instances and rely on uniform distribution of these tasks. This is the easiest solution and what we expected to work, but it didn't, as you can tell from this thread. We threw 4 instances at it but only 2 got used.
> - tune rocksdb? I tried this, but it didn't really help us much, aside from the fact that tuning rocksdb is very tricky.
> - use an in-memory store instead? Unfortunately we have to use session windows for this aggregate function, and apparently there's no in-memory session store impl? I tried to create one but soon realized it's too much work :) I looked at the default PartitionAssignor code too, but that ain't trivial either.
>
> So I'm a bit hopeless :(
>
> Ara.
Re: session window bug not fixed in 0.10.2.1?
No errors. But if I enable caching I see performance drop considerably. The workaround was to disable caching. The same thing is still true in 0.10.2.1.

Ara.

> On May 2, 2017, at 12:55 PM, Eno Thereska <eno.there...@gmail.com> wrote:
>
> Hi Ara,
>
> The PR https://github.com/apache/kafka/pull/2645 has gone into both trunk and 0.10.2.1, I just checked. What error are you seeing, could you give us an update?
>
> Thanks
> Eno
session window bug not fixed in 0.10.2.1?
Hi,

I upgraded to 0.10.2.1 yesterday, enabled caching for session windows and tested again. It doesn't seem to be fixed?

Ara.

> On Mar 27, 2017, at 2:10 PM, Damian Guy <damian@gmail.com> wrote:
>
> Hi Ara,
>
> There is a performance issue in the 0.10.2 release of session windows. It is fixed by this PR: https://github.com/apache/kafka/pull/2645
> You can work around it on 0.10.2 by calling the aggregate(..), reduce(..), etc. methods and supplying a StateStoreSupplier with caching disabled. The fix has also been cherry-picked to the 0.10.2 branch, so you could build from source and not have to create the StateStoreSupplier.
>
> Thanks,
> Damian

> From: "Matthias J. Sax" <matth...@confluent.io>
> Subject: Re: more uniform task assignment across kafka stream nodes
> Date: March 27, 2017 at 1:35:30 PM PDT
> To: users@kafka.apache.org
>
> Ara,
>
> thanks for the detailed information.
>
> If I parse this correctly, both instances run the same number of tasks (12 each). That is all Streams promises.
>
> To come back to your initial question:
>
>> Is there a way to tell kafka streams to uniformly assign partitions across instances? If I have n kafka streams instances running, I want each to handle EXACTLY 1/nth number of partitions. No dynamic task assignment logic. Just dumb 1/n assignment.
>
> That is exactly what you get: each of your two instances gets 24/2 = 12 tasks assigned. That is dumb 1/n assignment, isn't it? So my original response was correct.
>
> However, I now understand better what you actually mean by your question. Note that Streams does not distinguish the "type" of tasks -- it only sees 24 tasks and assigns them in a balanced way.
>
> Thus, currently there is no easy way to get the assignment you want, except to implement your own `PartitionAssignor`.
>
> This is the current implementation for 0.10.2:
> https://github.com/apache/kafka/blob/
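[Matthias's "dumb 1/n" point can be seen with a toy round-robin assignment. This is a sketch of the balancing behavior only, not Streams' actual assignor, which is sticky and state-aware; class and instance names here are made up.]

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RoundRobinAssign {

    // Deal taskIds to instances round-robin: each instance ends up with either
    // floor(tasks/n) or ceil(tasks/n) tasks, regardless of what the tasks do.
    static Map<String, List<Integer>> assign(List<Integer> taskIds, List<String> instances) {
        Map<String, List<Integer>> out = new LinkedHashMap<>();
        for (String inst : instances) out.put(inst, new ArrayList<>());
        for (int t = 0; t < taskIds.size(); t++) {
            out.get(instances.get(t % instances.size())).add(taskIds.get(t));
        }
        return out;
    }
}
```

[With 24 tasks and 2 instances this yields 12 tasks each, matching the "24/2 = 12" observation. It also shows Ara's complaint: the assignment is blind to task "type", so all the heavy aggregation tasks can still land together.]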
Re: more uniform task assignment across kafka stream nodes
Awesome! Thanks. Can't wait to try it out.

Ara.

On Mar 28, 2017, at 2:13 PM, Matthias J. Sax <matth...@confluent.io> wrote:

> Created a JIRA: https://issues.apache.org/jira/browse/KAFKA-4969
>
> -Matthias
Re: more uniform task assignment across kafka stream nodes
Well, even with 4-5x better performance thanks to the session window fix, I'd expect to get ~10x better performance if I throw 10x more nodes at the problem. That won't be the case, due to task assignment, unfortunately. I may end up with, say, 5-6 nodes with aggregation assigned to them and 4-5 nodes sitting there doing nothing. So it is a problem.

Ara.

On Mar 27, 2017, at 4:15 PM, Matthias J. Sax <matth...@confluent.io> wrote:

> Great! So overall, the issue is not related to task assignment. Also, the description below does not indicate that a different task assignment would change anything.
>
> -Matthias
Re: more uniform task assignment across kafka stream nodes
Let me clarify, because I think we're using different terminologies:

- the message key is the phone number, reversed
- all call records for a phone number land on the same partition
- then we apply a session window to them and aggregate+reduce
- so we end up with a group of records per phone number. This group is literally an avro object with an array of records in it for the session.
- since records arrive chronologically, and since we have a session window, all call records for a phone number end up in the same partition and session (intended behavior). We can easily have many such phone call record groups with 100s of call records in them. The aggregate object (the avro object with the array of records in it) can get updated 100s of times for the same phone number in the course of an hour or so.
- we process billions of such call records a day
- we can't expect our clients to install massive clusters of 10s or 100s of nodes. We do need to make sure this processing is as efficient as possible. The session window bug was killing us. Much better with the fix Damian provided!

Ara.

On Mar 27, 2017, at 2:41 PM, Matthias J. Sax <matth...@confluent.io> wrote:

> Ara,
>
> I assume your performance issue is most likely related to the fix Damian pointed out already. A couple of follow-up comments:
>
>> critical part of our pipeline involves grouping relevant records together
>
> Can you explain this a little better? The abstraction of a task groups data together already. Furthermore, if you get multiple tasks, those are independent units with regard to grouping, as consecutive tasks are "connected" via repartitioning steps. Thus, even if we applied the task assignment you ask for, I am not sure it would change anything. Maybe you can give a small data example of what behavior/data-co-location you need and what Streams provides for you.
>
>> And for hot keys this can lead to sometimes 100s of records to get grouped together
>
> This is independent of task placement -- it's a partitioning issue. If you want to work on that, you can provide a custom `Partitioner` for the Kafka producers used (note, your external producer writing to your Streams input topic might already generate "hot" keys, so you might need to use a custom partitioner there, too).
>
> Also, "100s of records" does not sound like much to me. Streams can process many hundreds of thousands of records per thread. That is the reason why I think the fix Damian pointed out will most likely fix your problem.
>
> -Matthias
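[Matthias's custom-partitioner suggestion, as a minimal sketch. Only the key-to-partition mapping is shown; a real implementation would implement org.apache.kafka.clients.producer.Partitioner and wrap this logic in its partition(...) method, and could additionally salt hot keys. The class name and example key are made up.]

```java
public class KeyPartitioner {

    // Deterministically map a key (e.g. a reversed phone number) to a partition,
    // so all records for one phone number land on the same partition.
    static int partitionFor(String key, int numPartitions) {
        // Mask the sign bit instead of Math.abs, which breaks for Integer.MIN_VALUE.
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        // Same key always maps to the same partition out of 24.
        System.out.println(partitionFor("5551234567", 24));
        System.out.println(partitionFor("5551234567", 24));
    }
}
```

[Note this only controls placement; a genuinely hot key still concentrates load on one partition, which is why Matthias calls it a partitioning issue rather than a task-placement issue.]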
Re: more uniform task assignment across kafka stream nodes
Holy s...! This lead to ~4-5x better performance. I’m gonna try this with more nodes and if performance improves almost linearly then we are good for now. Thanks! Ara. > On Mar 27, 2017, at 2:10 PM, Damian Guy <damian@gmail.com> wrote: > > Hi Ara, > > There is a performance issue in the 0.10.2 release of session windows. It > is fixed with this PR: https://github.com/apache/kafka/pull/2645 > You can work around this on 0.10.2 by calling the aggregate(..), reduce(..) > etc methods and supplying StateStoreSupplier with caching > disabled, i.e, by doing something like: > > final StateStoreSupplier sessionStore = > Stores.create(*"session-store-name"*) >.withKeys(Serdes.String()) >.withValues(Serdes.String()) >.persistent() >.sessionWindowed(TimeUnit.MINUTES.toMillis(7)) >.build(); > > > The fix has also been cherry-picked to the 0.10.2 branch, so you could > build from source and not have to create the StateStoreSupplier. > > Thanks, > Damian > > On Mon, 27 Mar 2017 at 21:56 Ara Ebrahimi <ara.ebrah...@argyledata.com> > wrote: > > Thanks for the response Mathias! > > The reason we want this exact task assignment to happen is that a critical > part of our pipeline involves grouping relevant records together (that’s > what the aggregate function in the topology is for). And for hot keys this > can lead to sometimes 100s of records to get grouped together. Even worse, > these records are session bound, we use session windows. Hence we see lots > of activity around the store backing the aggregate function and even though > we use SSD drives we’re not seeing the kind of performance we want to see. > It seems like the aggregate function leads to lots of updates to these hot > keys which lead to lots of rocksdb activity. > > Now there are many ways to fix this problem: > - just don’t aggregate, create an algorithm which is not reliant on > grouping/aggregating records. Not what we can do with our tight schedule > right now. 
> - do grouping/aggregating but employ n instances and rely on uniform > distribution of these tasks. This is the easiest solution and what we > expected to work but didn’t work as you can tell from this thread. We threw > 4 instances at it but only 2 got used. > - tune rocksdb? I tried this actually but it didn’t really help us much, > aside from the fact that tuning rocksdb is very tricky. > - use in-memory store instead? Unfortunately we have to use session windows > for this aggregate function and apparently there’s no in-memory session > store impl? I tried to create one but soon realized it’s too much work :) I > looked at default PartitionAssigner code too, but that ain’t trivial either. > > So I’m a bit hopeless :( > > Ara. > > On Mar 27, 2017, at 1:35 PM, Matthias J. Sax <matth...@confluent.io matth...@confluent.io>> wrote: > > > > > > > This message is for the designated recipient only and may contain > privileged, proprietary, or otherwise confidential information. If you have > received it in error, please notify the sender immediately and delete the > original. Any other use of the e-mail by you is prohibited. Thank you in > advance for your cooperation. > > > > From: "Matthias J. Sax" <matth...@confluent.io<mailto:matth...@confluent.io >>> > Subject: Re: more uniform task assignment across kafka stream nodes > Date: March 27, 2017 at 1:35:30 PM PDT > To: users@kafka.apache.org<mailto:users@kafka.apache.org> > Reply-To: <users@kafka.apache.org<mailto:users@kafka.apache.org>> > > > Ara, > > thanks for the detailed information. > > If I parse this correctly, both instances run the same number of tasks > (12 each). That is all Streams promises. > > To come back to your initial question: > > Is there a way to tell kafka streams to uniformly assign partitions across > instances? If I have n kafka streams instances running, I want each to > handle EXACTLY 1/nth number of partitions. No dynamic task assignment > logic. Just dumb 1/n assignment. 
>
> That is exactly what you get: each of your two instances gets 24/2 = 12
> tasks assigned. That is dumb 1/n assignment, isn't it? So my original
> response was correct.
>
> However, I now understand better what you actually mean by your
> question. Note that Streams does not distinguish the "type" of a task -- it
> only sees 24 tasks and assigns those in a balanced way.
>
> Thus, currently there is no easy way to get the assignment you want,
> except to implement your own `PartitionAssignor`.
>
> This is the current implementation for 0.10.2
>
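Matthias's "24/2 = 12" arithmetic amounts to plain round-robin over the pooled task list, blind to which topic each task reads from. A minimal sketch of that balancing (a toy model for illustration only, not the actual StreamPartitionAssignor logic; the class and method names are made up):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RoundRobinSketch {

    // Assign task IDs round-robin across instances: task i goes to instance i % n.
    // Streams sees one flat pool of tasks, so "type" (source vs. repartition
    // topic) plays no role in this split.
    static Map<Integer, List<Integer>> assign(int numTasks, int numInstances) {
        Map<Integer, List<Integer>> assignment = new HashMap<>();
        for (int i = 0; i < numInstances; i++) {
            assignment.put(i, new ArrayList<>());
        }
        for (int task = 0; task < numTasks; task++) {
            assignment.get(task % numInstances).add(task);
        }
        return assignment;
    }

    public static void main(String[] args) {
        Map<Integer, List<Integer>> a = assign(24, 2);
        // Each of the 2 instances ends up with 12 tasks, regardless of which
        // source or repartition topic those tasks happen to read from.
        System.out.println(a.get(0).size() + " " + a.get(1).size());
    }
}
```

This is why the assignment is "balanced" in total count while still lumping, say, five repartition-topic tasks onto one instance and three onto another.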
Re: more uniform task assignment across kafka stream nodes
Thanks for the response, Matthias!

The reason we want this exact task assignment to happen is that a critical part of our pipeline involves grouping relevant records together (that’s what the aggregate function in the topology is for). And for hot keys this can lead to sometimes 100s of records getting grouped together. Even worse, these records are session bound; we use session windows. Hence we see lots of activity around the store backing the aggregate function, and even though we use SSD drives we’re not seeing the kind of performance we want to see. It seems like the aggregate function leads to lots of updates to these hot keys, which leads to lots of RocksDB activity.

Now there are many ways to fix this problem:
- just don’t aggregate; create an algorithm which is not reliant on grouping/aggregating records. Not what we can do with our tight schedule right now.
- do grouping/aggregating but employ n instances and rely on uniform distribution of these tasks. This is the easiest solution and what we expected to work, but didn’t, as you can tell from this thread. We threw 4 instances at it but only 2 got used.
- tune RocksDB? I tried this actually but it didn’t really help us much, aside from the fact that tuning RocksDB is very tricky.
- use an in-memory store instead? Unfortunately we have to use session windows for this aggregate function and apparently there’s no in-memory session store impl? I tried to create one but soon realized it’s too much work :) I looked at the default PartitionAssignor code too, but that ain’t trivial either.

So I’m a bit hopeless :(

Ara.

On Mar 27, 2017, at 1:35 PM, Matthias J. Sax <matth...@confluent.io> wrote:

Ara,

thanks for the detailed information.

If I parse this correctly, both instances run the same number of tasks (12 each). That is all Streams promises.

To come back to your initial question:

Is there a way to tell kafka streams to uniformly assign partitions across instances? If I have n kafka streams instances running, I want each to handle EXACTLY 1/nth number of partitions. No dynamic task assignment logic. Just dumb 1/n assignment.

That is exactly what you get: each of your two instances gets 24/2 = 12 tasks assigned. That is dumb 1/n assignment, isn't it? So my original response was correct.

However, I now understand better what you actually mean by your question. Note that Streams does not distinguish the "type" of a task -- it only sees 24 tasks and assigns those in a balanced way.

Thus, currently there is no easy way to get the assignment you want, except to implement your own `PartitionAssignor`.

This is the current implementation for 0.10.2:
https://github.com/apache/kafka/blob/0.10.2/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java

You can, if you wish, write your own assignor and set it via StreamsConfig. However, be aware that this might be tricky to get right and might also have runtime implications with regard to rebalancing and state store recovery. We recently improved the current implementation to avoid costly task movements: https://issues.apache.org/jira/browse/KAFKA-4677

Thus, I would not recommend implementing your own `PartitionAssignor`. However, the root question is: why do you need this exact assignment in the first place?
Why is it "bad" if "types" of tasks are not distinguished? I would like to understand your requirement better -- it might be worth to improve Streams here. -Matthias On 3/27/17 12:57 PM, Ara Ebrahimi wrote: Hi, So, I simplified the topology by making sure we have only 1 source topic. Now I have 1 source topic, 8 partitions, 2 instances. And here’s how the topology looks like: instance 1: KafkaStreams processID: 48b58bc0-f600-4ec8-bc92-8cb3ea081aac StreamsThread appId: mar-23-modular StreamsThread clientId: mar-23-modular StreamsThread threadId: StreamThread-1 Active tasks: StreamsTask taskId: 0_3 ProcessorTopology: KSTREAM-SOURCE-00: topics: [activities-avro-or] children: [KSTREAM-FILTER-01] KSTREAM-FILTER-01: children: [KSTREAM-MAP-02] KSTREAM-MAP-02: children: [KS
Re: more uniform task assignment across kafka stream nodes
: StreamsThread appId: mar-23-modular
StreamsThread clientId: mar-23-modular
StreamsThread threadId: StreamThread-8
Active tasks:
StreamsTask taskId: 0_5
ProcessorTopology:
KSTREAM-SOURCE-00: topics: [activities-avro-or] children: [KSTREAM-FILTER-01]
KSTREAM-FILTER-01: children: [KSTREAM-MAP-02]
KSTREAM-MAP-02: children: [KSTREAM-BRANCH-03]
KSTREAM-BRANCH-03: children: [KSTREAM-BRANCHCHILD-04, KSTREAM-BRANCHCHILD-05]
KSTREAM-BRANCHCHILD-04: children: [KSTREAM-MAPVALUES-06]
KSTREAM-MAPVALUES-06: children: [KSTREAM-FLATMAPVALUES-07]
KSTREAM-FLATMAPVALUES-07: children: [KSTREAM-MAP-08]
KSTREAM-MAP-08: children: [KSTREAM-FILTER-11]
KSTREAM-FILTER-11: children: [KSTREAM-SINK-10]
KSTREAM-SINK-10: topic: activities-by-phone-store-or-repartition
KSTREAM-BRANCHCHILD-05:
Partitions [activities-avro-or-5]
Standby tasks:

activities-avro-or is the input topic. ml-features-avro-or is the output topic. In the middle we have an aggregate (activities-by-phone-store-or-repartition).

On instance 1 I see 3 tasks for activities-avro-or and on instance 2 I see 5. Bad. On instance 1 I see 4 tasks for ml-features-avro-or, and 4 on instance 2. Good. On instance 1 I see 5 tasks for activities-by-phone-store-or-repartition, and 3 on instance 2. Bad.

As I said, in terms of offsets for all these partitions I see uniform distribution, so we’re not dealing with a bad-key scenario.

Ara.

On Mar 25, 2017, at 6:43 PM, Matthias J. Sax <matth...@confluent.io> wrote:

Please share the rest of your topology code (without any UDFs / business logic). Otherwise, I cannot give further advice.

-Matthias

On 3/25/17 6:08 PM, Ara Ebrahimi wrote:

Via:
builder.stream("topic1");
builder.stream("topic2");
builder.stream("topic3");

These are different kinds of topics consuming different avro objects.

Ara.

On Mar 25, 2017, at 6:04 PM, Matthias J. Sax <matth...@confluent.io> wrote:

Ara,

How do you consume your topics? Via

builder.stream("topic1", "topic2", "topic3");

or via

builder.stream("topic1");
builder.stream("topic2");
builder.stream("topic3");

Both are handled differently with regard to creating tasks (partition-to-task assignment also depends on your downstream code, though). If this does not help, can you maybe share the structure of the processing? To dig deeper, we would need to know the topology DAG.

-Matthias

On 3/25/17 5:56 PM, Ara Ebrahimi wrote:

Matthias,

This apparently happens because we have more than 1 source topic. We have 3 source topics in the same application. So it seems like the task assignment algorithm creates topologies not for one specific topic at a time but for the total partitions across all source topics consumed in an application instance. Because we have some code dependencies between these 3 source topics we can’t separate them into 3 applications at this time. Hence I want the task assignment algorithm to basically do a uniform and simple task assignment PER source topic.

Ara.

On Mar 25, 2017, at 5:21 PM, Matthias J. Sax <matth...@confluent.io<
Re: more uniform task assignment across kafka stream nodes
Via:
builder.stream("topic1");
builder.stream("topic2");
builder.stream("topic3");

These are different kinds of topics consuming different avro objects.

Ara.

On Mar 25, 2017, at 6:04 PM, Matthias J. Sax <matth...@confluent.io> wrote:

Ara,

How do you consume your topics? Via

builder.stream("topic1", "topic2", "topic3");

or via

builder.stream("topic1");
builder.stream("topic2");
builder.stream("topic3");

Both are handled differently with regard to creating tasks (partition-to-task assignment also depends on your downstream code, though). If this does not help, can you maybe share the structure of the processing? To dig deeper, we would need to know the topology DAG.

-Matthias

On 3/25/17 5:56 PM, Ara Ebrahimi wrote:

Matthias,

This apparently happens because we have more than 1 source topic. We have 3 source topics in the same application. So it seems like the task assignment algorithm creates topologies not for one specific topic at a time but for the total partitions across all source topics consumed in an application instance. Because we have some code dependencies between these 3 source topics we can’t separate them into 3 applications at this time. Hence I want the task assignment algorithm to basically do a uniform and simple task assignment PER source topic.

Ara.

On Mar 25, 2017, at 5:21 PM, Matthias J. Sax <matth...@confluent.io> wrote:

Hi,

I am wondering why this happens in the first place. Streams load-balances over all running instances, and each instance should have the same number of tasks (and thus partitions) assigned. What is the overall assignment? Do you have standby tasks configured? What version do you use?

-Matthias

On 3/24/17 8:09 PM, Ara Ebrahimi wrote:

Hi,

Is there a way to tell kafka streams to uniformly assign partitions across instances? If I have n kafka streams instances running, I want each to handle EXACTLY 1/nth number of partitions. No dynamic task assignment logic. Just dumb 1/n assignment.

Here’s our scenario. Let’s say we have a “source” topic with 8 partitions. We also have 2 kafka streams instances. Each instance gets assigned to handle 4 “source” topic partitions. BUT then we do a few maps and an aggregate, so data gets shuffled around. The map function uniformly distributes these across all partitions (I can verify that by looking at the partition offsets). After the map, what I notice by looking at the topology is that one kafka streams instance gets assigned to handle say 2 aggregate repartition topics and the other one gets assigned 6.
Even worse, on bigger clusters (say 4 instances) we see say 2 nodes get assigned downstream aggregate repartition topics and 2 other nodes assigned NOTHING to handle.

Ara.
Re: more uniform task assignment across kafka stream nodes
Matthias,

This apparently happens because we have more than 1 source topic. We have 3 source topics in the same application. So it seems like the task assignment algorithm creates topologies not for one specific topic at a time but for the total partitions across all source topics consumed in an application instance. Because we have some code dependencies between these 3 source topics we can’t separate them into 3 applications at this time. Hence I want the task assignment algorithm to basically do a uniform and simple task assignment PER source topic.

Ara.

On Mar 25, 2017, at 5:21 PM, Matthias J. Sax <matth...@confluent.io> wrote:

Hi,

I am wondering why this happens in the first place. Streams load-balances over all running instances, and each instance should have the same number of tasks (and thus partitions) assigned. What is the overall assignment? Do you have standby tasks configured? What version do you use?

-Matthias

On 3/24/17 8:09 PM, Ara Ebrahimi wrote:

Hi,

Is there a way to tell kafka streams to uniformly assign partitions across instances? If I have n kafka streams instances running, I want each to handle EXACTLY 1/nth number of partitions. No dynamic task assignment logic. Just dumb 1/n assignment.

Here’s our scenario. Let’s say we have a “source” topic with 8 partitions. We also have 2 kafka streams instances. Each instance gets assigned to handle 4 “source” topic partitions. BUT then we do a few maps and an aggregate, so data gets shuffled around. The map function uniformly distributes these across all partitions (I can verify that by looking at the partition offsets). After the map, what I notice by looking at the topology is that one kafka streams instance gets assigned to handle say 2 aggregate repartition topics and the other one gets assigned 6. Even worse, on bigger clusters (say 4 instances) we see say 2 nodes get assigned downstream aggregate repartition topics and 2 other nodes assigned NOTHING to handle.

Ara.
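The "dumb 1/n assignment PER source topic" Ara is asking for could be sketched as round-robin applied to each topic's partitions independently, so every instance gets ~1/n of every topic instead of 1/n of the pooled task list. This is a hypothetical illustration (the helper class and method names are made up, not Streams API):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class PerTopicAssignmentSketch {

    // Hypothetical per-topic assignment: round-robin each topic's partitions
    // independently, so every instance ends up with ~1/n of EVERY topic,
    // rather than an arbitrary mix like 3/5 on one topic and 5/3 on another.
    static Map<Integer, List<String>> assignPerTopic(Map<String, Integer> topicPartitions,
                                                     int numInstances) {
        Map<Integer, List<String>> assignment = new TreeMap<>();
        for (int i = 0; i < numInstances; i++) {
            assignment.put(i, new ArrayList<>());
        }
        for (Map.Entry<String, Integer> e : topicPartitions.entrySet()) {
            for (int p = 0; p < e.getValue(); p++) {
                assignment.get(p % numInstances).add(e.getKey() + "-" + p);
            }
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Topic names taken from the thread; partition counts illustrative.
        Map<String, Integer> topics = new LinkedHashMap<>();
        topics.put("activities-avro-or", 8);
        topics.put("activities-by-phone-store-or-repartition", 8);
        // With 2 instances, each gets 4 partitions of each topic.
        System.out.println(assignPerTopic(topics, 2).get(0));
    }
}
```

Streams' actual assignor balances the flat task pool instead; getting this per-topic behavior would require the custom `PartitionAssignor` route Matthias cautions against.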
Re: kafka streams locking issue in 0.10.2.0
Hi,

Thanks for the reply. Let me give you more information:

- we have a group by + aggregate. The hopping time window is 10 minutes but the maintainMs is 180 days (we’re trying to reprocess the entire data set of billions of records).
- Over time, after just a few hours, the aggregate slows down considerably. Initially it keeps up with the ingest rate just fine, but then after a few hours we see that a counter we have in the body of the aggregate block stops incrementing completely and resumes after a few minutes! It seems to me it’s trying to roll the RocksDB files and pauses the whole StreamThread? Is that possible? Is that the root cause of this issue? What’s the best way to monitor RocksDB rolling? Can’t find much in the LOG file. I don’t see any stalling.
- If the above is possible, then what’s the solution? A much smaller maintainMs? Or tuning RocksDB? This is on SSD. Our records have windowed keys of 28 bytes and a 200-byte average value size.

Ara.

> On Feb 24, 2017, at 3:15 AM, Damian Guy <damian@gmail.com> wrote:
>
> Hi Ara,
>
> This usually means that one, or more, of your StreamThreads is taking a
> long time during recovery or processing, so the rebalance times out and you
> get another rebalance.
> The second exception, i.e.,
> 2017-02-22 20:12:48 WARN StreamThread:1184 - Could not create task 3_0. Will retry.
> org.apache.kafka.streams.errors.LockException: task [3_0] Failed to lock
> the state directory: /kafka/1/kafka-streams/argyle-streams/3_0
>
> is because you have at least one thread that is still processing and hasn't
> given up its tasks that have been revoked yet.
> You probably need to get a thread dump to see what the threads are doing at
> the time.
> As the first exception suggests, you may need to increase
> max.poll.interval.ms
>
> Thanks,
> Damian
>
> On Thu, 23 Feb 2017 at 18:26 Ara Ebrahimi <ara.ebrah...@argyledata.com> wrote:
>
>> Hi,
>>
>> After upgrading to 0.10.2.0 I got this:
>>
>> Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Commit
>> cannot be completed since the group has already rebalanced and assigned the
>> partitions to another member. This means that the time between subsequent
>> calls to poll() was longer than the configured max.poll.interval.ms,
>> which typically implies that the poll loop is spending too much time
>> message processing. You can address this either by increasing the session
>> timeout or by reducing the maximum size of batches returned in poll() with
>> max.poll.records.
>> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:698)
>> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:577)
>> at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1125)
>> at org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:296)
>> at org.apache.kafka.streams.processor.internals.StreamThread$3.apply(StreamThread.java:535)
>> at org.apache.kafka.streams.processor.internals.StreamThread.performOnAllTasks(StreamThread.java:503)
>> at org.apache.kafka.streams.processor.internals.StreamThread.commitOffsets(StreamThread.java:531)
>> at org.apache.kafka.streams.processor.internals.StreamThread.suspendTasksAndState(StreamThread.java:480)
>> ... 10 more
>>
>> Which led to this:
>>
>> 2017-02-22 20:12:16 ERROR StreamThread:505 - stream-thread
>> [StreamThread-9] Failed while executing StreamTask 4_6 due to commit
>> consumer offsets:
>> org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be
>> completed since the group has already rebalanced and assigned the
>> partitions to another member. This means that the time between subsequent
>> calls to poll() was longer than the configured max.poll.interval.ms,
>> which typically implies that the poll loop is spending too much time
>> message processing. You can address this either by increasing the session
>> timeout or by reducing the maximum size of batches returned in poll() with
>> max.poll.records.
>> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:698)
>> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:577)
>> at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1125)
>> at org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:296)
>> at org.a
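Damian's advice translates to two settings: raise `max.poll.interval.ms` and shrink `max.poll.records`. Both are plain consumer configs set through the streams properties; the values below are illustrative placeholders, not recommendations:

```java
import java.util.Properties;

public class StreamsTuningSketch {

    // Build the config map; only the application.id is taken from the thread,
    // the broker address and numeric values are illustrative.
    static Properties tunedProps() {
        Properties props = new Properties();
        props.put("application.id", "argyle-streams");
        props.put("bootstrap.servers", "broker1:9092"); // placeholder
        // Give slow work (e.g. state-store recovery, heavy aggregates) more
        // time between poll() calls before the group considers the member dead...
        props.put("max.poll.interval.ms", "600000");    // 10 minutes, illustrative
        // ...and shrink the batch each poll() returns, so processing a batch
        // reliably fits inside that window.
        props.put("max.poll.records", "100");           // illustrative
        return props;
    }

    public static void main(String[] args) {
        System.out.println(tunedProps().getProperty("max.poll.interval.ms"));
    }
}
```

The same Properties object would then be handed to the KafkaStreams constructor (via StreamsConfig) in a real application.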
kafka streams locking issue in 0.10.2.0
Hi,

After upgrading to 0.10.2.0 I got this:

Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:698)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:577)
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1125)
at org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:296)
at org.apache.kafka.streams.processor.internals.StreamThread$3.apply(StreamThread.java:535)
at org.apache.kafka.streams.processor.internals.StreamThread.performOnAllTasks(StreamThread.java:503)
at org.apache.kafka.streams.processor.internals.StreamThread.commitOffsets(StreamThread.java:531)
at org.apache.kafka.streams.processor.internals.StreamThread.suspendTasksAndState(StreamThread.java:480)
... 10 more

Which led to this:

2017-02-22 20:12:16 ERROR StreamThread:505 - stream-thread [StreamThread-9] Failed while executing StreamTask 4_6 due to commit consumer offsets:
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:698)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:577)
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1125)
at org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:296)
at org.apache.kafka.streams.processor.internals.StreamThread$3.apply(StreamThread.java:535)
at org.apache.kafka.streams.processor.internals.StreamThread.performOnAllTasks(StreamThread.java:503)
at org.apache.kafka.streams.processor.internals.StreamThread.commitOffsets(StreamThread.java:531)
at org.apache.kafka.streams.processor.internals.StreamThread.suspendTasksAndState(StreamThread.java:480)
at org.apache.kafka.streams.processor.internals.StreamThread.access$1200(StreamThread.java:69)
at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsRevoked(StreamThread.java:259)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:396)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:329)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)

2017-02-22 20:12:16 ERROR ConsumerCoordinator:400 - User provided listener org.apache.kafka.streams.processor.internals.StreamThread$1 for group argyle-streams failed on partition revocation
org.apache.kafka.streams.errors.StreamsException: stream-thread [StreamThread-9] failed to suspend stream tasks
at org.apache.kafka.streams.processor.internals.StreamThread.suspendTasksAndState(StreamThread.java:488)
at org.apache.kafka.streams.processor.internals.StreamThread.access$1200(StreamThread.java:69)
at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsRevoked(StreamThread.java:259)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:396)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:329)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286) at
KTable TTL
Hi,

I have a KTable and I want to keep entries in it only for the past 24 hours. How can I do that? I understand RocksDB has support for TTL. Should I set that? How? Should I use kafka-streams window functionality? Would it remove data from old windows?

I want to do this because I’m seeing a huge lag forming in the repartition topic backing the KTable after a while, and the whole thing slows down. In JMX I see values such as these for the RocksDB metrics:

all-get-avg-latency-ms=60123
all-put-avg-latency-ms=40456
all-put-qps=421
all-get-qps=430

What’s the best way to check to see how RocksDB is performing and where the bottleneck is? This is on an SSD.

Thanks,
Ara.
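A plain KTable keeps every key forever; the windowed stores in Streams instead expire data once it falls outside the retention period. The behavior being asked for ("keep entries only for the past 24 hours") works conceptually like the following plain-Java model (an illustration of per-entry TTL semantics only, not the RocksDB or Streams implementation):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

public class TtlMapSketch {
    private final long ttlMs;
    // key -> {insertTimestampMs, value}
    private final Map<String, long[]> entries = new HashMap<>();

    TtlMapSketch(long ttlMs) {
        this.ttlMs = ttlMs;
    }

    void put(String key, long value, long nowMs) {
        entries.put(key, new long[]{nowMs, value});
    }

    // Read path: drop the entry if it is older than the TTL, mimicking how
    // an expired window stops being served once retention has passed.
    Long get(String key, long nowMs) {
        long[] e = entries.get(key);
        if (e == null) return null;
        if (nowMs - e[0] > ttlMs) {
            entries.remove(key);
            return null;
        }
        return e[1];
    }

    public static void main(String[] args) {
        long day = TimeUnit.HOURS.toMillis(24);
        TtlMapSketch table = new TtlMapSketch(day);
        table.put("phone-1", 42L, 0L);
        System.out.println(table.get("phone-1", day - 1)); // still within 24h -> 42
        System.out.println(table.get("phone-1", day + 1)); // past 24h -> null
    }
}
```

Real windowed stores expire whole time segments at once rather than individual keys, which is cheaper but coarser than the per-entry eviction sketched here.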
Re: kafka-consumer-offset-checker complaining about NoNode for X in zk
Thanks! Worked like a charm. For some partitions, which do have data, I see “unknown” reported as the offset. Any idea what unknown means? Also, what’s the new command for setting offsets? Specifically, to move it back to point 0, AND also to move it to the end.

Ara.

> On Feb 2, 2017, at 11:21 AM, Jan Filipiak <jan.filip...@trivago.com> wrote:
>
> Hi,
>
> sorry, and use the consumer group tool instead of the offset checker
>
> On 02.02.2017 20:08, Jan Filipiak wrote:
>> Hi,
>>
>> if it's a kafka streams app, it's most likely going to store its offsets
>> in kafka rather than zookeeper.
>> You can use the --new-consumer option to check for kafka-stored offsets.
>>
>> Best,
>> Jan
>>
>> On 01.02.2017 21:14, Ara Ebrahimi wrote:
>>> Hi,
>>>
>>> For a subset of our topics we get this error:
>>>
>>> $KAFKA_HOME/bin/kafka-consumer-offset-checker.sh --group
>>> argyle-streams --topic topic_name --zookeeper $ZOOKEEPERS
>>> [2017-02-01 12:08:56,115] WARN WARNING: ConsumerOffsetChecker is
>>> deprecated and will be dropped in releases following 0.9.0. Use
>>> ConsumerGroupCommand instead. (kafka.tools.ConsumerOffsetChecker$)
>>> Exiting due to: org.apache.zookeeper.KeeperException$NoNodeException:
>>> KeeperErrorCode = NoNode for
>>> /consumers/streams-app/offsets/topic_name/2.
>>>
>>> All topics are created via the same process, but a few of them report
>>> this error.
>>>
>>> When I check in ZK there’s nothing under /consumers:
>>>
>>> zookeeper-client -server $ZOOKEEPERS -cmd ls /consumers
>>>
>>> WATCHER::
>>>
>>> WatchedEvent state:SyncConnected type:None path:null
>>> []
>>>
>>> But kafka-consumer-offset-checker does work for many topics
>>> anyway, and it fails for a few.
>>>
>>> Why does this happen? How can I fix it?
>>>
>>> Thanks,
>>> Ara.
kafka-consumer-offset-checker complaining about NoNode for X in zk
Hi,

For a subset of our topics we get this error:

$KAFKA_HOME/bin/kafka-consumer-offset-checker.sh --group argyle-streams --topic topic_name --zookeeper $ZOOKEEPERS
[2017-02-01 12:08:56,115] WARN WARNING: ConsumerOffsetChecker is deprecated and will be dropped in releases following 0.9.0. Use ConsumerGroupCommand instead. (kafka.tools.ConsumerOffsetChecker$)
Exiting due to: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /consumers/streams-app/offsets/topic_name/2.

All topics are created via the same process, but a few of them report this error.

When I check in ZK there’s nothing under /consumers:

zookeeper-client -server $ZOOKEEPERS -cmd ls /consumers

WATCHER::

WatchedEvent state:SyncConnected type:None path:null
[]

But kafka-consumer-offset-checker does work for many topics anyway, and it fails for a few.

Why does this happen? How can I fix it?

Thanks,
Ara.
kafka-streams ktable recovery after rebalance crash
Hi,

My kafka-streams application crashed due to a rebalance event (seems like I need to increase max.poll.interval.ms even more!). When I restarted the app I noticed the existing rocksdb files were gone, and while the rest of the pipeline was processing, the part dealing with the ktable sat there doing nothing for minutes, until it crashed again complaining about this:

2017-02-01 11:48:35 ERROR StreamPipeline:160 - An exception has occurred. Shutting down the pipeline...
org.apache.kafka.streams.errors.StreamsException: stream-thread [StreamThread-10] Failed to rebalance
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:410)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
Caused by: org.apache.kafka.streams.errors.ProcessorStateException: task [2_2] Error while creating the state manager
    at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:72)
    at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:89)
    at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:633)
    at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:660)
    at org.apache.kafka.streams.processor.internals.StreamThread.access$100(StreamThread.java:69)
    at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:124)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:228)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:313)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:407)
    ... 1 more
Caused by: java.io.IOException: task [2_2] Failed to lock the state directory: /kafka/1/kafka-streams/streams-app/2_2
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:101)
    at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:69)
    ... 13 more

It seems it was trying to recreate the rocksdb files from scratch, right? How can I fix this situation and make it recover? Which topic is used for recreating the rocksdb files - the changelog or the repartition topic? How can I tell how much progress has been made recreating these files? Which offsets are meaningful?

Thanks,
Ara.
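On the restoration questions above: state stores are rebuilt from the store's changelog topic (repartition topics only re-key records between sub-topologies), and the changelog name follows the pattern `<application.id>-<store name>-changelog`. A minimal sketch of that naming rule, with hypothetical app and store names; restore progress could then be gauged by comparing a consumer's position on that topic against its end offsets (e.g. `KafkaConsumer#endOffsets` on 0.10.1+):

```java
// Sketch: derive the changelog topic a state store restores from.
// Pattern: "<application.id>-<storeName>-changelog".
public class ChangelogTopic {
    static String changelogFor(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    public static void main(String[] args) {
        // Hypothetical names for illustration:
        System.out.println(changelogFor("streams-app", "my-ktable-store"));
        // -> streams-app-my-ktable-store-changelog
    }
}
```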
Re: kafka streams consumer partition assignment is uneven
I meant I have 7 topics and each has 12 partitions. Considering that I have 4 streaming threads per node, I was expecting each thread to process 1 partition from each topic - 7 partitions total per streaming thread. But that's not the case. Or perhaps you are saying the number of streaming threads should follow the total number of partitions across all 7 topics?!

Ara.

> On Jan 9, 2017, at 11:48 AM, Michael Noll <mich...@confluent.io> wrote:
>
> What does the processing topology of your Kafka Streams application look
> like, and what's the exact topic and partition configuration? You say you
> have 12 partitions in your cluster, presumably across 7 topics -- that
> means that most topics have just a single partition. Depending on your
> topology (e.g. if you have defined that single-partition topics A, B, C
> must be joined), Kafka Streams is forced to let one of your three Streams
> nodes process "more" topics/partitions than the other two nodes.
>
> -Michael
>
> On Mon, Jan 9, 2017 at 6:52 PM, Ara Ebrahimi <ara.ebrah...@argyledata.com> wrote:
>
>> Hi,
>>
>> I have 3 kafka brokers, each with 4 disks. I have 12 partitions. I have 3
>> kafka streams nodes. Each is configured to have 4 streaming threads. My
>> topology is quite complex and I have 7 topics and lots of joins and states.
>>
>> What I have noticed is that each of the 3 kafka streams nodes gets
>> assigned a variable number of partitions of a topic. One node is assigned
>> 2 partitions of topic a and another one gets assigned 5. Hence I end up
>> with nonuniform throughput across these nodes. One node ends up processing
>> more data than the others.
>>
>> What's going on? How can I make sure partition assignment to kafka
>> streams nodes is uniform?
>>
>> On a similar topic, is there a way to make sure partition assignment to
>> disks across kafka brokers is also uniform? Even if I use a round-robin
>> assignment to pin partitions to brokers, there doesn't seem to be a way to
>> uniformly pin partitions to disks. Or maybe I'm missing something here? I
>> end up with 2 partitions of topic a on disk 1 and 3 partitions of topic a
>> on disk 2. It's a bit variable. Not totally random, but it's not uniformly
>> distributed either.
>>
>> Ara.
kafka streams consumer partition assignment is uneven
Hi,

I have 3 kafka brokers, each with 4 disks. I have 12 partitions. I have 3 kafka streams nodes. Each is configured to have 4 streaming threads. My topology is quite complex and I have 7 topics and lots of joins and states.

What I have noticed is that each of the 3 kafka streams nodes gets assigned a variable number of partitions of a topic. One node is assigned 2 partitions of topic a and another one gets assigned 5. Hence I end up with nonuniform throughput across these nodes. One node ends up processing more data than the others.

What's going on? How can I make sure partition assignment to kafka streams nodes is uniform?

On a similar topic, is there a way to make sure partition assignment to disks across kafka brokers is also uniform? Even if I use a round-robin assignment to pin partitions to brokers, there doesn't seem to be a way to uniformly pin partitions to disks. Or maybe I'm missing something here? I end up with 2 partitions of topic a on disk 1 and 3 partitions of topic a on disk 2. It's a bit variable. Not totally random, but it's not uniformly distributed either.

Ara.
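The arithmetic behind the expectation in this thread is worth making explicit. With 7 topics of 12 partitions each and 3 nodes running 4 threads each, a naive per-partition spread would indeed be uniform; but Streams balances *tasks* - one per (sub-topology, partition group) pair - so joins that group topics into shared sub-topologies reduce the number of schedulable units, and the spread per topic can then look lopsided even when tasks are balanced. A back-of-envelope sketch (the sub-topology count is hypothetical, chosen only to illustrate the effect):

```java
public class AssignmentMath {
    public static void main(String[] args) {
        int topics = 7, partitionsPerTopic = 12;
        int nodes = 3, threadsPerNode = 4;

        int totalPartitions = topics * partitionsPerTopic; // 84
        int totalThreads = nodes * threadsPerNode;         // 12
        // Naive expectation: one partition of every topic on every thread.
        System.out.println("naive partitions/thread = "
                + totalPartitions / totalThreads);         // prints 7

        // Streams actually distributes tasks. If joins collapse the 7 topics
        // into, say, 2 sub-topologies (hypothetical), only 2 * 12 = 24 tasks
        // exist, and balancing 24 tasks over 12 threads says nothing about
        // how many partitions of any *single* topic a given node serves.
        int subTopologies = 2; // hypothetical
        System.out.println("tasks = " + subTopologies * partitionsPerTopic); // prints 24
    }
}
```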
kafka streams passes org.apache.kafka.streams.kstream.internals.Change to my app!!
Hi,

Once in a while, and quite randomly, this happens - but it does happen every few hundred thousand messages:

2016-12-03 11:48:05 ERROR StreamThread:249 - stream-thread [StreamThread-4] Streams application error during processing:
java.lang.ClassCastException: org.apache.kafka.streams.kstream.internals.Change cannot be cast to com.argyledata.streams.entity.Activity
    at com.argyledata.streams.StreamPipeline$$Lambda$14/33419717.apply(Unknown Source)
    at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:42)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
    at org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply(ForwardingCacheFlushListener.java:35)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.maybeForward(CachingKeyValueStore.java:97)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$000(CachingKeyValueStore.java:34)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(CachingKeyValueStore.java:84)
    at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:117)
    at org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:199)
    at org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:190)
    at org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:121)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:147)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:134)
    at org.apache.kafka.streams.kstream.internals.KStreamAggregate$KStreamAggregateValueGetter.get(KStreamAggregate.java:112)
    at org.apache.kafka.streams.kstream.internals.KStreamKTableLeftJoin$KStreamKTableLeftJoinProcessor.process(KStreamKTableLeftJoin.java:61)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:181)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:436)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)

Has anyone else seen this weird problem?

Ara.
Re: Initializing StateStores takes *really* long for large datasets
+1 on this.

Ara.

> On Nov 30, 2016, at 5:18 AM, Mathieu Fenniak wrote:
>
> I'd like to quickly reinforce Frank's opinion regarding the rocksdb memory
> usage. I was also surprised by the amount of non-JVM-heap memory being
> used and had to tune the 100 MB default down considerably. It's also
> unfortunate that it's hard to estimate the memory requirements for a KS app
> because of this. If you have ten stores, and assuming the default config,
> you'd need a GB of memory for the rocksdb cache if you run 1 app, but only
> half a GB if you run two app instances, because the stores will be
> distributed.
>
> It would be much nicer to be able to give KS a fixed amount of memory in a
> config that it divided among the active stores on a node. Configure it
> with N GB; if a rebalance adds more tasks and stores, they each get less
> RAM; if a rebalance removes tasks and stores, the remaining stores get more
> RAM. It seems like it'd be hard to do this with the RocksDBConfigSetter
> interface because it doesn't get any state about the KS topology to make
> decisions; these are arguably not config, but tuning / performance
> decisions.
>
> Mathieu
>
> On Mon, Nov 28, 2016 at 3:45 PM, Frank Lyaruu wrote:
>
>> I'll write an update on where I am now.
>>
>> I've got about 40 'primary' topics, some small, some up to about 10M
>> messages, and about 30 internal topics, divided over 6 stream instances,
>> all running in a single app, talking to a 3 node Kafka cluster.
>>
>> I use a single thread per stream instance, as my prime concern is now to
>> get it to run stable, rather than optimizing performance.
>>
>> My biggest issue was that after a few hours my application started to slow
>> down, to ultimately freeze up or crash. It turned out that RocksDb consumed
>> all my memory, which I had overlooked as it was off-heap.
>>
>> I was fooling around with RocksDb settings a bit but I had missed the most
>> important one:
>>
>>     BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
>>     tableConfig.setBlockCacheSize(BLOCK_CACHE_SIZE);
>>     tableConfig.setBlockSize(BLOCK_SIZE);
>>     options.setTableFormatConfig(tableConfig);
>>
>> The block cache size defaults to a whopping 100Mb per store, and that gets
>> expensive fast. I reduced it to a few megabytes. My data size is so big
>> that I doubt it is very effective anyway. Now it seems more stable.
>>
>> I'd say that a smaller default makes sense, especially because the failure
>> case is so opaque (running all tests just fine, but with a serious dataset
>> it dies slowly).
>>
>> Another thing I see is that while starting all my instances, some are quick
>> and some take time (makes sense as the data size varies greatly), but as
>> more instances start up, they use so much CPU, I/O, and network that the
>> initialization of the bigger ones takes even longer, increasing the chance
>> that one of them takes longer than MAX_POLL_INTERVAL_MS_CONFIG, and then
>> all hell breaks loose. Maybe we can separate the 'initialize' and 'start'
>> steps somehow.
>>
>> In this case we could log better: if initialization takes longer than the
>> timeout, the task ends up being reassigned (in my case to the same
>> instance) and then it errors out on being unable to lock the state dir.
>> That message isn't too informative, as the timeout is the actual problem.
>>
>> regards, Frank
>>
>> On Mon, Nov 28, 2016 at 8:01 PM, Guozhang Wang wrote:
>>
>>> Hello Frank,
>>>
>>> How many instances do you have in your app, and how many threads did you
>>> use per instance? Note that besides the topology complexity (i.e. number
>>> of state stores, number of internal topics, etc.), the (re-)initialization
>>> process depends on the underlying consumer's membership protocol, and
>>> hence its rebalance latency could be longer with larger groups.
>>>
>>> We have been optimizing our rebalance latency due to state store migration
>>> and restoration in the latest release, but for now the re-initialization
>>> latency still largely depends on 1) topology complexity with regard to
>>> state stores and 2) the number of input topic partitions and instances /
>>> threads in the application.
>>>
>>> Guozhang
>>>
>>> On Sat, Nov 26, 2016 at 12:57 AM, Damian Guy wrote:
>>>
>>>> Hi Frank,
>>>>
>>>> If you are running on a single node then the RocksDB state should be
>>>> re-used by your app. However, it relies on the app being cleanly shutdown
>>>> and the existence of ".checkpoint" files in the state directory for the
>>>> store, i.e., /tmp/kafka-streams/application-id/0_0/.checkpoint. If the
>>>> file doesn't exist then the entire state will be restored from the
>>>> changelog - which could take some time. I suspect this is what is
>>>> happening?
>>>>
>>>> As for the RocksDB memory settings, yes the off
Re: Hang while close() on KafkaStream
This happens for me too, on 10.1.0. It seems like it just sits there waiting in the streamThread.join() call.

Ara.

> On Nov 17, 2016, at 6:02 PM, mordac2k wrote:
>
> Hello all,
>
> I have a Java application in which I use an instance of KafkaStreams that
> is working splendidly, under normal circumstances.
>
> However, during development testing, I am using a single kafka instance
> (0.10.1.0) as the broker. When this single broker is not running, sending
> SIGTERM to my application causes it to hang on streams.close().
>
> This hang appears to be indefinite; it does not appear to ever time out.
> Funny enough, if, during this hang, the broker comes back online, the close
> will continue and the application will terminate properly as a result of
> the original SIGTERM.
>
> I've tried this with broker configurations where the partition count is set
> to 1 and 2, with the same result in both scenarios.
>
> (FWIW: I scanned the archive of this ML looking for previous instances of
> this issue, and did not find one concerning streams specifically.)
>
> Any thoughts?
>
> Thanks.
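One common workaround for an indefinitely blocking close() on these versions is to run it on a helper thread and join with a deadline, so a hung shutdown cannot block the SIGTERM handler forever (later Kafka releases added a close variant with a built-in timeout). This is a generic sketch, not Kafka API: a plain Runnable stands in for the blocking `streams::close` call.

```java
// Sketch: bound a potentially-blocking close() with a timeout by running
// it on a daemon thread and joining with a deadline.
public class BoundedClose {
    static boolean closeWithTimeout(Runnable close, long timeoutMs)
            throws InterruptedException {
        Thread t = new Thread(close, "streams-close");
        t.setDaemon(true);   // a hung close must not keep the JVM alive
        t.start();
        t.join(timeoutMs);
        return !t.isAlive(); // true if close() finished within the deadline
    }

    public static void main(String[] args) throws InterruptedException {
        // Simulate a close() that never returns (e.g. broker unreachable):
        boolean done = closeWithTimeout(() -> {
            try { Thread.sleep(60_000); } catch (InterruptedException ignored) {}
        }, 200);
        System.out.println("closed cleanly: " + done); // prints "closed cleanly: false"
    }
}
```

In a real shutdown hook one would call `streams.close()` inside the Runnable and log, rather than wait, when the deadline is missed.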
Re: kafka streams rocksdb tuning (or memory leak?)
This indeed was the issue and cp2 did fix it! Thank you!

Ara.

> On Nov 16, 2016, at 4:11 PM, Damian Guy <damian@gmail.com> wrote:
>
> Hi Ara,
>
> Are you running Kafka Streams v0.10.1? If so, there is a resource leak in
> the window store. See: https://github.com/apache/kafka/pull/2122
> You can either try checking out the Apache Kafka 0.10.1 branch (which has
> the fix) and building it manually, or you can try using the latest
> Confluent-packaged version of the library, which you can get from
> Confluent's maven repository:
>
>     <repository>
>       <id>confluent</id>
>       <url>http://packages.confluent.io/maven/</url>
>     </repository>
>
>     <dependency>
>       <groupId>org.apache.kafka</groupId>
>       <artifactId>kafka-streams</artifactId>
>       <version>0.10.1.0-cp2</version>
>     </dependency>
>     <dependency>
>       <groupId>org.apache.kafka</groupId>
>       <artifactId>kafka-clients</artifactId>
>       <version>0.10.1.0-cp2</version>
>     </dependency>
>
> Thanks,
>
> Damian
>
> On Wed, 16 Nov 2016 at 14:52 Ara Ebrahimi <ara.ebrah...@argyledata.com> wrote:
>
>> Hi,
>>
>> I have a few KTables in my application. Some of them have unlimited
>> windows. If I leave the application to run for a few hours, I see the java
>> process consume more and more memory, way above the -Xmx limit. I
>> understand this is due to the rocksdb native lib used by kafka streams.
>> What I don't understand is how I can control how much rocksdb data should
>> be kept in memory. I know I can use the RocksDBConfigSetter class, but I
>> need more information on why this unbounded memory allocation happens and
>> how to tune it.
>>
>> Thanks,
>> Ara.
kafka streams rocksdb tuning (or memory leak?)
Hi,

I have a few KTables in my application. Some of them have unlimited windows. If I leave the application to run for a few hours, I see the java process consume more and more memory, way above the -Xmx limit. I understand this is due to the rocksdb native lib used by kafka streams. What I don't understand is how I can control how much rocksdb data should be kept in memory. I know I can use the RocksDBConfigSetter class, but I need more information on why this unbounded memory allocation happens and how to tune it.

Thanks,
Ara.
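Tying the thread together: the memory growth is off-heap RocksDB block cache (defaulting to roughly 100 MB per store, multiplied by the number of stores and window segments), and RocksDBConfigSetter is the hook for shrinking it. A configuration sketch under stated assumptions - it requires kafka-streams and rocksdbjni on the classpath, and the 8 MB cache size is an illustrative value, not a recommendation:

```java
import java.util.Map;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Options;
import org.apache.kafka.streams.state.RocksDBConfigSetter;

// Applied to every RocksDB store the Streams app opens.
public class BoundedMemoryConfig implements RocksDBConfigSetter {
    @Override
    public void setConfig(String storeName, Options options, Map<String, Object> configs) {
        BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
        tableConfig.setBlockCacheSize(8 * 1024 * 1024L); // shrink ~100 MB default (illustrative)
        options.setTableFormatConfig(tableConfig);
    }
}

// Registered via Streams properties, e.g.:
//   props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, BoundedMemoryConfig.class);
```

Note the total footprint scales with store count, so the effective budget is (cache size) x (stores x segments per instance), which is why unbounded-window KTables with many partitions blow past -Xmx expectations.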
Re: kafka streaming rocks db lock bug?
This was in 10.1.0. What happened was that a kafka broker went down, and then this happened on the kafka streaming instance which had a connection to that broker. I can send you all the logs I've got.

Ara.

On Oct 24, 2016, at 10:41 PM, Guozhang Wang <wangg...@gmail.com> wrote:

Hello Ara,

Your encountered issue seems to be KAFKA-3812 <https://issues.apache.org/jira/browse/KAFKA-3812> and KAFKA-3938 <https://issues.apache.org/jira/browse/KAFKA-3938>. Could you try to upgrade to the newly released 0.10.1.0 version and see if this issue goes away? If not, I would love to investigate this issue further with you.

Guozhang

On Sun, Oct 23, 2016 at 1:45 PM, Ara Ebrahimi <ara.ebrah...@argyledata.com> wrote:

And then this on a different node:

2016-10-23 13:43:57 INFO StreamThread:286 - stream-thread [StreamThread-3] Stream thread shutdown complete
2016-10-23 13:43:57 ERROR StreamPipeline:169 - An exception has occurred
org.apache.kafka.streams.errors.StreamsException: stream-thread [StreamThread-3] Failed to rebalance
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:401)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:235)
Caused by: org.apache.kafka.streams.errors.ProcessorStateException: Error while creating the state manager
    at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:72)
    at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:90)
    at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:622)
    at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:649)
    at org.apache.kafka.streams.processor.internals.StreamThread.access$000(StreamThread.java:69)
    at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:120)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:228)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:313)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:398)
    ... 1 more
Caused by: java.io.IOException: task [7_1] Failed to lock the state directory: /tmp/kafka-streams/argyle-streams/7_1
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:98)
    at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:69)
    ... 13 more

Ara.

On Oct 23, 2016, at 1:24 PM, Ara Ebrahimi <ara.ebrah...@argyledata.com> wrote:

Hi,

This happens when I hammer our 5 kafka streaming nodes (each with 4 streaming threads) hard enough for an hour or so:

2016-10-23 13:04:17 ERROR StreamThread:324 - stream-thread [StreamThread-2] Failed to flush state for StreamTask 3_8:
org.apache.kafka.streams.errors.ProcessorStateException: task [3_8] Failed to flush state store streams-data-record-stats-avro-br-store
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:322)
    at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:181)
    at org.apache.kafka.streams.processor.internals.StreamThread$4.apply(StreamThread.java:360)
    at org.apache.kafka.streams.processor.internals.StreamThread.performOnAllTasks(StreamThread.java:322)
    at org.apache.kafka.streams.processor.internals.StreamThread.flushAllState(StreamThread.java:357)
    at org.apache.kafka.streams.processor.internals.StreamThread.shutdownTasksAndState(StreamThread.java:295)
    at org.apache.kafka.streams.processor.internals.StreamThread.shutdown(StreamThread.java:262)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:245)
Caused by: org.apache.kafka.streams.errors.ProcessorStateException: Error opening store streams-data-record-stats-avro-br-store-20150516 at location /tmp/kafka-streams/argyle-streams/3_8/streams-data-record-stats-avro-br-store/streams-data-record-stats-avro-br-store-20150516
    at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:196)
    at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:158)
    at org.apache.kafka.streams.state.internals.RocksDBWindowStore$Segment.openDB(RocksDBWindowStore.java:72)
    at org.apache.kafka.streams.
Re: kafka streaming rocks db lock bug?
And then this on a different node:

2016-10-23 13:43:57 INFO StreamThread:286 - stream-thread [StreamThread-3] Stream thread shutdown complete
2016-10-23 13:43:57 ERROR StreamPipeline:169 - An exception has occurred
org.apache.kafka.streams.errors.StreamsException: stream-thread [StreamThread-3] Failed to rebalance
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:401)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:235)
Caused by: org.apache.kafka.streams.errors.ProcessorStateException: Error while creating the state manager
    at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:72)
    at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:90)
    at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:622)
    at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:649)
    at org.apache.kafka.streams.processor.internals.StreamThread.access$000(StreamThread.java:69)
    at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:120)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:228)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:313)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:398)
    ... 1 more
Caused by: java.io.IOException: task [7_1] Failed to lock the state directory: /tmp/kafka-streams/argyle-streams/7_1
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:98)
    at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:69)
    ... 13 more

Ara.

On Oct 23, 2016, at 1:24 PM, Ara Ebrahimi <ara.ebrah...@argyledata.com> wrote:

Hi,

This happens when I hammer our 5 kafka streaming nodes (each with 4 streaming threads) hard enough for an hour or so:

2016-10-23 13:04:17 ERROR StreamThread:324 - stream-thread [StreamThread-2] Failed to flush state for StreamTask 3_8:
org.apache.kafka.streams.errors.ProcessorStateException: task [3_8] Failed to flush state store streams-data-record-stats-avro-br-store
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:322)
    at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:181)
    at org.apache.kafka.streams.processor.internals.StreamThread$4.apply(StreamThread.java:360)
    at org.apache.kafka.streams.processor.internals.StreamThread.performOnAllTasks(StreamThread.java:322)
    at org.apache.kafka.streams.processor.internals.StreamThread.flushAllState(StreamThread.java:357)
    at org.apache.kafka.streams.processor.internals.StreamThread.shutdownTasksAndState(StreamThread.java:295)
    at org.apache.kafka.streams.processor.internals.StreamThread.shutdown(StreamThread.java:262)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:245)
Caused by: org.apache.kafka.streams.errors.ProcessorStateException: Error opening store streams-data-record-stats-avro-br-store-20150516 at location /tmp/kafka-streams/argyle-streams/3_8/streams-data-record-stats-avro-br-store/streams-data-record-stats-avro-br-store-20150516
    at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:196)
    at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:158)
    at org.apache.kafka.streams.state.internals.RocksDBWindowStore$Segment.openDB(RocksDBWindowStore.java:72)
    at org.apache.kafka.streams.state.internals.RocksDBWindowStore.getOrCreateSegment(RocksDBWindowStore.java:402)
    at org.apache.kafka.streams.state.internals.RocksDBWindowStore.putAndReturnInternalKey(RocksDBWindowStore.java:310)
    at org.apache.kafka.streams.state.internals.RocksDBWindowStore.put(RocksDBWindowStore.java:292)
    at org.apache.kafka.streams.state.internals.MeteredWindowStore.put(MeteredWindowStore.java:101)
    at org.apache.kafka.streams.state.internals.CachingWindowStore$1.apply(CachingWindowStore.java:87)
    at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:117)
    at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:100)
    at org.apache.kafka.streams.state.internals.CachingWindowStore.flush(CachingWindowStore.java:118)
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:320)
    ... 7 more
kafka streaming rocks db lock bug?
Hi, This happens when I hammer our 5 kafka streaming nodes (each with 4 streaming threads) hard enough for an hour or so:

2016-10-23 13:04:17 ERROR StreamThread:324 - stream-thread [StreamThread-2] Failed to flush state for StreamTask 3_8:
org.apache.kafka.streams.errors.ProcessorStateException: task [3_8] Failed to flush state store streams-data-record-stats-avro-br-store
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:322)
    at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:181)
    at org.apache.kafka.streams.processor.internals.StreamThread$4.apply(StreamThread.java:360)
    at org.apache.kafka.streams.processor.internals.StreamThread.performOnAllTasks(StreamThread.java:322)
    at org.apache.kafka.streams.processor.internals.StreamThread.flushAllState(StreamThread.java:357)
    at org.apache.kafka.streams.processor.internals.StreamThread.shutdownTasksAndState(StreamThread.java:295)
    at org.apache.kafka.streams.processor.internals.StreamThread.shutdown(StreamThread.java:262)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:245)
Caused by: org.apache.kafka.streams.errors.ProcessorStateException: Error opening store streams-data-record-stats-avro-br-store-20150516 at location /tmp/kafka-streams/argyle-streams/3_8/streams-data-record-stats-avro-br-store/streams-data-record-stats-avro-br-store-20150516
    at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:196)
    at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:158)
    at org.apache.kafka.streams.state.internals.RocksDBWindowStore$Segment.openDB(RocksDBWindowStore.java:72)
    at org.apache.kafka.streams.state.internals.RocksDBWindowStore.getOrCreateSegment(RocksDBWindowStore.java:402)
    at org.apache.kafka.streams.state.internals.RocksDBWindowStore.putAndReturnInternalKey(RocksDBWindowStore.java:310)
    at org.apache.kafka.streams.state.internals.RocksDBWindowStore.put(RocksDBWindowStore.java:292)
    at org.apache.kafka.streams.state.internals.MeteredWindowStore.put(MeteredWindowStore.java:101)
    at org.apache.kafka.streams.state.internals.CachingWindowStore$1.apply(CachingWindowStore.java:87)
    at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:117)
    at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:100)
    at org.apache.kafka.streams.state.internals.CachingWindowStore.flush(CachingWindowStore.java:118)
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:320)
    ... 7 more
Caused by: org.rocksdb.RocksDBException: IO error: lock /tmp/kafka-streams/argyle-streams/3_8/streams-data-record-stats-avro-br-store/streams-data-record-stats-avro-br-store-20150516/LOCK: No locks available
    at org.rocksdb.RocksDB.open(Native Method)
    at org.rocksdb.RocksDB.open(RocksDB.java:184)
    at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:189)
    ... 18 more

Some sort of a locking bug? Note that when this happens, this node stops processing anything and the other nodes seem to want to pick up the load, which brings the whole streaming cluster to a standstill. That's very worrying. Is there a document somewhere describing *in detail* how failover for streaming works? Ara. This message is for the designated recipient only and may contain privileged, proprietary, or otherwise confidential information. If you have received it in error, please notify the sender immediately and delete the original. Any other use of the e-mail by you is prohibited. Thank you in advance for your cooperation.
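[Editor's note] RocksDB's "IO error: lock .../LOCK: No locks available" is the errno text for a failed file lock, which typically points at the state directory living on a filesystem without working POSIX locks (for example an NFS-mounted /tmp) rather than at a Streams bug. One mitigation worth trying is moving the Streams state directory to plain local disk. A minimal sketch, assuming a hypothetical local path and broker address; "state.dir" is the Kafka Streams config key (StreamsConfig.STATE_DIR_CONFIG):

```java
import java.util.Properties;

public class StateDirConfig {
    // Build a Streams configuration whose state directory is moved off /tmp.
    // The path and broker below are illustrative assumptions.
    public static Properties streamsProps() {
        Properties props = new Properties();
        props.put("application.id", "argyle-streams");     // app id seen in the store path above
        props.put("bootstrap.servers", "broker1:9092");    // placeholder broker
        props.put("state.dir", "/var/lib/kafka-streams");  // local disk, not /tmp and not NFS
        return props;
    }

    public static void main(String[] args) {
        System.out.println(streamsProps().getProperty("state.dir"));
    }
}
```

Keeping state off /tmp also avoids tmp-cleaner daemons deleting RocksDB files under a running instance.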
Re: micro-batching in kafka streams
Awesome! Thanks. Ara. On Sep 28, 2016, at 3:20 PM, Guozhang Wang <wangg...@gmail.com> wrote: Ara, I'd recommend using the interactive queries feature, available in the upcoming 0.10.1 in a couple of weeks, to query the current snapshot of the state store. We are going to write a blog post with step-by-step instructions on leveraging this feature for use cases just like yours soon. Guozhang On Wed, Sep 28, 2016 at 2:19 PM, Ara Ebrahimi <ara.ebrah...@argyledata.com> wrote: I need this ReadOnlyKeyValueStore. In my use case, I do an aggregateByKey(), so a KTable is formed, backed by a state store. This is then used by the next steps of the pipeline. Now, following the word count sample, I try to read the state store, and hence I end up sharing it with the actual pipeline. Kafka Streams doesn't like that, because state stores are assumed to be writable (indeed there's a put() in there), and deep in the rocksdb state store code it tries to create a lock file (and indeed there is a file named LOCK in there). Ara. On Sep 28, 2016, at 2:03 PM, Guozhang Wang <wangg...@gmail.com> wrote: Ara, Are you using the interactive queries feature but encountered an issue due to locking file conflicts? https://cwiki.apache.org/confluence/display/KAFKA/KIP-67%3A+Queryable+state+for+Kafka+Streams This is not expected to happen; if you are indeed using this feature I'd like to learn more about your error scenario.
Guozhang On Tue, Sep 27, 2016 at 9:41 AM, Ara Ebrahimi <ara.ebrah...@argyledata.com> wrote: One more thing: Guozhang pointed me towards this sample for micro-batching: https://github.com/apache/kafka/blob/177b2d0bea76f270ec087ebe73431307c1aef5a1/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java This is a good example and I successfully got it adapted for my use case. BUT the main problem is that even though my use case deals with writing of hourly windows of data, and hence the data is already in a rocksdb file, I need to create a duplicate of the same file just to be able to periodically do range scans on it and write to the external database. I did try to see if I could get StateStore to read the same rocksdb file used by the aggregateByKey which happens before this step, but it complained about not being able to lock the file. It would be great to be able to share the same underlying file between aggregateByKey (or any other such KTable-producing operation) and such periodic triggers. Ara. On Sep 26, 2016, at 10:40 AM, Ara Ebrahimi <ara.ebrah...@argyledata.com> wrote: Hi, So, here's the situation: - for classic batching of writes to external systems, right now I simply hack it. This specific case is writing of records to an Accumulo database, and I simply use the batch writer to batch writes, and it flushes every second or so. I've added a shutdown hook to the jvm to flush upon graceful exit too. This is good enough for me, but obviously it's not perfect. I wish Kafka Streams had some sort of a trigger (based on x number of records processed, or y window of time passed). Which brings me to the next use case. - I have some logic for calculating hourly statistics. So I'm dealing with Windowed data already.
These stats then need to be written to an external database for use by user-facing systems. Obviously I need to write the final result for each hourly window after we're past that window of time (or I can write as often as it gets updated, but the problem is that the external database is not as fast as Kafka). I do understand that I need to take into account the fact that events may arrive out of order, and there may be some records arriving a little bit after I've considered the previous window over and have moved to the next one. I'd like to have some sort of an hourly trigger (not just a pure x-milliseconds trigger, but also support for cron-style timing) and then also have the option to update the stats I've already written for a window a set amount of time after the trigger fired, so that I can deal with events which arrive after the write for that window. And then there's a cut-off point after which updating the stats for a very old window is just not worth it. Something like this DSL: kstream.trigger(/* when to trigger */ Cron.of("0 * * * *"), /* update every hour afterwards */ Hours.toMillis(1), /* discard changes older than this */ Hours.toMillis(24), /* lambda */ (windowStartTime, windowedKey, record) -> { /* write */ } ); The tricky part is reconciling event source time and event processing time. Clearly this trigger is in the event processing time whereas the data is in the event source time most probably.
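[Editor's note] Guozhang's suggestion above (interactive queries, available from 0.10.1) avoids the LOCK conflict by querying the store through the running KafkaStreams instance rather than reopening the RocksDB files directly. A rough sketch, under the assumption that the aggregate's store is named "stats-store" (a hypothetical name) and holds String/Long pairs:

```java
// Sketch only: requires kafka-streams 0.10.1+ on the classpath.
KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();

// Read-only view of the state store backing the aggregateByKey() result.
ReadOnlyKeyValueStore<String, Long> store =
    streams.store("stats-store", QueryableStoreTypes.keyValueStore());

// Periodic range scan feeding the external database, without
// duplicating the underlying RocksDB files.
KeyValueIterator<String, Long> it = store.all();
while (it.hasNext()) {
    KeyValue<String, Long> entry = it.next();
    // write entry.key / entry.value to the external database here
}
it.close();
```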
Re: micro-batching in kafka streams
I need this ReadOnlyKeyValueStore. In my use case, I do an aggregateByKey(), so a KTable is formed, backed by a state store. This is then used by the next steps of the pipeline. Now, following the word count sample, I try to read the state store, and hence I end up sharing it with the actual pipeline. Kafka Streams doesn't like that, because state stores are assumed to be writable (indeed there's a put() in there), and deep in the rocksdb state store code it tries to create a lock file (and indeed there is a file named LOCK in there). Ara. On Sep 28, 2016, at 2:03 PM, Guozhang Wang <wangg...@gmail.com> wrote: Ara, Are you using the interactive queries feature but encountered an issue due to locking file conflicts? https://cwiki.apache.org/confluence/display/KAFKA/KIP-67%3A+Queryable+state+for+Kafka+Streams This is not expected to happen; if you are indeed using this feature I'd like to learn more about your error scenario. Guozhang On Tue, Sep 27, 2016 at 9:41 AM, Ara Ebrahimi <ara.ebrah...@argyledata.com> wrote: One more thing: Guozhang pointed me towards this sample for micro-batching: https://github.com/apache/kafka/blob/177b2d0bea76f270ec087ebe73431307c1aef5a1/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java This is a good example and I successfully got it adapted for my use case. BUT the main problem is that even though my use case deals with writing of hourly windows of data, and hence the data is already in a rocksdb file, I need to create a duplicate of the same file just to be able to periodically do range scans on it and write to the external database. I did try to see if I could get StateStore to read the same rocksdb file used by the aggregateByKey which happens before this step, but it complained about not being able to lock the file.
It would be great to be able to share the same underlying file between aggregateByKey (or any other such KTable-producing operation) and such periodic triggers. Ara. On Sep 26, 2016, at 10:40 AM, Ara Ebrahimi <ara.ebrah...@argyledata.com> wrote: Hi, So, here's the situation: - for classic batching of writes to external systems, right now I simply hack it. This specific case is writing of records to an Accumulo database, and I simply use the batch writer to batch writes, and it flushes every second or so. I've added a shutdown hook to the jvm to flush upon graceful exit too. This is good enough for me, but obviously it's not perfect. I wish Kafka Streams had some sort of a trigger (based on x number of records processed, or y window of time passed). Which brings me to the next use case. - I have some logic for calculating hourly statistics. So I'm dealing with Windowed data already. These stats then need to be written to an external database for use by user-facing systems. Obviously I need to write the final result for each hourly window after we're past that window of time (or I can write as often as it gets updated, but the problem is that the external database is not as fast as Kafka). I do understand that I need to take into account the fact that events may arrive out of order, and there may be some records arriving a little bit after I've considered the previous window over and have moved to the next one. I'd like to have some sort of an hourly trigger (not just a pure x-milliseconds trigger, but also support for cron-style timing) and then also have the option to update the stats I've already written for a window a set amount of time after the trigger fired, so that I can deal with events which arrive after the write for that window. And then there's a cut-off point after which updating the stats for a very old window is just not worth it.
Something like this DSL: kstream.trigger(/* when to trigger */ Cron.of("0 * * * *"), /* update every hour afterwards */ Hours.toMillis(1), /* discard changes older than this */ Hours.toMillis(24), /* lambda */ (windowStartTime, windowedKey, record) -> { /* write */ } ); The tricky part is reconciling event source time and event processing time. Clearly this trigger is in the event processing time whereas the data is in the event source time most probably. Something like that :) Ara. On Sep 26, 2016, at 1:59 AM, Michael Noll <mich...@confluent.io> wrote: Ara, may I ask why you need to use micro-batching in the first place? Reason why I am asking: Typically, when people talk about micro-batching, they refer to the way some originally batch-based stream processing tools "bolt on" real-time processing by making their batch sizes really small. Here, micro-batching belongs to the realm of the inner workings of the stream processing tool. Orthogonally to that, you have features/operations such as windowing, triggers, etc. that -- unlike micro-batching -- allow you as the user of the stream processing tool to define which exact computation logic you need.
Re: micro-batching in kafka streams
One more thing: Guozhang pointed me towards this sample for micro-batching: https://github.com/apache/kafka/blob/177b2d0bea76f270ec087ebe73431307c1aef5a1/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java This is a good example and I successfully got it adapted for my use case. BUT the main problem is that even though my use case deals with writing of hourly windows of data, and hence the data is already in a rocksdb file, I need to create a duplicate of the same file just to be able to periodically do range scans on it and write to the external database. I did try to see if I could get StateStore to read the same rocksdb file used by the aggregateByKey which happens before this step, but it complained about not being able to lock the file. It would be great to be able to share the same underlying file between aggregateByKey (or any other such KTable-producing operation) and such periodic triggers. Ara. On Sep 26, 2016, at 10:40 AM, Ara Ebrahimi <ara.ebrah...@argyledata.com> wrote: Hi, So, here's the situation: - for classic batching of writes to external systems, right now I simply hack it. This specific case is writing of records to an Accumulo database, and I simply use the batch writer to batch writes, and it flushes every second or so. I've added a shutdown hook to the jvm to flush upon graceful exit too. This is good enough for me, but obviously it's not perfect. I wish Kafka Streams had some sort of a trigger (based on x number of records processed, or y window of time passed). Which brings me to the next use case. - I have some logic for calculating hourly statistics. So I'm dealing with Windowed data already. These stats then need to be written to an external database for use by user-facing systems.
Obviously I need to write the final result for each hourly window after we're past that window of time (or I can write as often as it gets updated, but the problem is that the external database is not as fast as Kafka). I do understand that I need to take into account the fact that events may arrive out of order, and there may be some records arriving a little bit after I've considered the previous window over and have moved to the next one. I'd like to have some sort of an hourly trigger (not just a pure x-milliseconds trigger, but also support for cron-style timing) and then also have the option to update the stats I've already written for a window a set amount of time after the trigger fired, so that I can deal with events which arrive after the write for that window. And then there's a cut-off point after which updating the stats for a very old window is just not worth it. Something like this DSL: kstream.trigger(/* when to trigger */ Cron.of("0 * * * *"), /* update every hour afterwards */ Hours.toMillis(1), /* discard changes older than this */ Hours.toMillis(24), /* lambda */ (windowStartTime, windowedKey, record) -> { /* write */ } ); The tricky part is reconciling event source time and event processing time. Clearly this trigger is in the event processing time whereas the data is in the event source time most probably. Something like that :) Ara. On Sep 26, 2016, at 1:59 AM, Michael Noll <mich...@confluent.io> wrote: Ara, may I ask why you need to use micro-batching in the first place? Reason why I am asking: Typically, when people talk about micro-batching, they refer to the way some originally batch-based stream processing tools "bolt on" real-time processing by making their batch sizes really small. Here, micro-batching belongs to the realm of the inner workings of the stream processing tool. Orthogonally to that, you have features/operations such as windowing, triggers, etc.
that -- unlike micro-batching -- allow you as the user of the stream processing tool to define which exact computation logic you need. Whether or not, say, windowing is or is not computed via micro-batching behind the scenes should (at least in an ideal world) be of no concern to the user. -Michael On Mon, Sep 5, 2016 at 9:10 PM, Ara Ebrahimi <ara.ebrah...@argyledata.com> wrote: Hi, What's the best way to do micro-batching in Kafka Streams? Any plans for a built-in mechanism? Perhaps StateStore could act as the buffer? What exactly are ProcessorContext.schedule()/punctuate() for? They don't seem to be used anywhere? http://hortonworks.com/blog/apache-storm-design-pattern-micro-batching/ Ara.
Re: micro-batching in kafka streams
Hi, So, here's the situation: - for classic batching of writes to external systems, right now I simply hack it. This specific case is writing of records to an Accumulo database, and I simply use the batch writer to batch writes, and it flushes every second or so. I've added a shutdown hook to the jvm to flush upon graceful exit too. This is good enough for me, but obviously it's not perfect. I wish Kafka Streams had some sort of a trigger (based on x number of records processed, or y window of time passed). Which brings me to the next use case. - I have some logic for calculating hourly statistics. So I'm dealing with Windowed data already. These stats then need to be written to an external database for use by user-facing systems. Obviously I need to write the final result for each hourly window after we're past that window of time (or I can write as often as it gets updated, but the problem is that the external database is not as fast as Kafka). I do understand that I need to take into account the fact that events may arrive out of order, and there may be some records arriving a little bit after I've considered the previous window over and have moved to the next one. I'd like to have some sort of an hourly trigger (not just a pure x-milliseconds trigger, but also support for cron-style timing) and then also have the option to update the stats I've already written for a window a set amount of time after the trigger fired, so that I can deal with events which arrive after the write for that window. And then there's a cut-off point after which updating the stats for a very old window is just not worth it. Something like this DSL: kstream.trigger(/* when to trigger */ Cron.of("0 * * * *"), /* update every hour afterwards */ Hours.toMillis(1), /* discard changes older than this */ Hours.toMillis(24), /* lambda */ (windowStartTime, windowedKey, record) -> { /* write */ } ); The tricky part is reconciling event source time and event processing time.
Clearly this trigger is in the event processing time whereas the data is in the event source time most probably. Something like that :) Ara. > On Sep 26, 2016, at 1:59 AM, Michael Noll <mich...@confluent.io> wrote: > > Ara, > > may I ask why you need to use micro-batching in the first place? > > Reason why I am asking: Typically, when people talk about micro-batching, > they refer to the way some originally batch-based stream processing > tools "bolt on" real-time processing by making their batch sizes really > small. Here, micro-batching belongs to the realm of the inner workings of > the stream processing tool. > > Orthogonally to that, you have features/operations such as windowing, > triggers, etc. that -- unlike micro-batching -- allow you as the user of > the stream processing tool to define which exact computation logic you > need. Whether or not, say, windowing is or is not computed via > micro-batching behind the scenes should (at least in an ideal world) be of > no concern to the user. > > -Michael > > On Mon, Sep 5, 2016 at 9:10 PM, Ara Ebrahimi <ara.ebrah...@argyledata.com> > wrote: > >> Hi, >> >> What's the best way to do micro-batching in Kafka Streams? Any plans for a >> built-in mechanism? Perhaps StateStore could act as the buffer? What >> exactly are ProcessorContext.schedule()/punctuate() for? They don't seem >> to be used anywhere? >> >> http://hortonworks.com/blog/apache-storm-design-pattern-micro-batching/ >> >> Ara.
Re: micro-batching in kafka streams
Thanks. +1 on the KIP-63 story. I need all of that :) Ara. > On Sep 11, 2016, at 8:19 PM, Guozhang Wang <wangg...@gmail.com> wrote: > > Hello Ara, > > On the processor API, users have the flexibility to do micro-batching with > their own implementation patterns. For example, like you mentioned already: > > 1. Use a state store to bookkeep recently received records, and in the > process() function simply put the record into the store. > 2. Use the punctuate() function to periodically process the bookkept batch > in the state store by iterating over it, and send results to the > downstream. > > You can find a simple example in the WordCount demo: > > https://github.com/apache/kafka/blob/177b2d0bea76f270ec087ebe73431307c1aef5a1/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java > > Note that it does not bookkeep the original records as micro-batches, but > computes the running aggregate results. The general coding pattern is > the same, though. > > On the higher-level Streams DSL, there is a proposed KIP for using caching > for aggregate operators as an implicit "trigger" mechanism. This > is not exactly the same as micro-batching, but it also reduces IO > costs as well as data traffic: > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-63%3A+Unify+store+and+downstream+caching+in+streams > > Let me know if these references are helpful to you. > > Guozhang > > On Mon, Sep 5, 2016 at 12:10 PM, Ara Ebrahimi <ara.ebrah...@argyledata.com> > wrote: > >> Hi, >> >> What's the best way to do micro-batching in Kafka Streams? Any plans for a >> built-in mechanism? Perhaps StateStore could act as the buffer? What >> exactly are ProcessorContext.schedule()/punctuate() for? They don't seem >> to be used anywhere? >> >> http://hortonworks.com/blog/apache-storm-design-pattern-micro-batching/ >> >> Ara.
> -- Guozhang
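[Editor's note] The two-step pattern Guozhang describes above (bookkeep in process(), drain periodically from punctuate()) can be sketched in plain Java. This is a hypothetical helper, not a Kafka API: add() would be called from Processor.process() and flush() from punctuate(); the flusher callback stands in for a batched write to an external database.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Minimal micro-batch buffer: collects records, flushes when the batch
// is full (size trigger) or when flush() is invoked (time trigger).
public class MicroBatchBuffer<V> {
    private final List<V> buffer = new ArrayList<>();
    private final int maxBatchSize;
    private final Consumer<List<V>> flusher; // e.g. batched write to an external DB

    public MicroBatchBuffer(int maxBatchSize, Consumer<List<V>> flusher) {
        this.maxBatchSize = maxBatchSize;
        this.flusher = flusher;
    }

    // Call from process(): just bookkeep the record.
    public void add(V record) {
        buffer.add(record);
        if (buffer.size() >= maxBatchSize) {
            flush(); // size-based trigger
        }
    }

    // Call from punctuate(): time-based trigger.
    public void flush() {
        if (!buffer.isEmpty()) {
            flusher.accept(new ArrayList<>(buffer));
            buffer.clear();
        }
    }

    public int pending() {
        return buffer.size();
    }
}
```

Note this keeps the batch in memory; the pattern in the WordCount demo keeps it in a state store instead, which survives restarts via the changelog topic.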
Re: Performance issue with KafkaStreams
Hi Eno, Could you elaborate more on tuning Kafka Streams applications? What are the relationships between partitions, num.stream.threads, num.consumer.fetchers and other such parameters? On a single-node setup with x partitions, what's the best way to make sure these partitions are consumed and processed by Kafka Streams optimally? Ara. > On Sep 10, 2016, at 3:18 AM, Eno Thereska wrote: > > Hi Caleb, > > We have a benchmark that we run nightly to keep track of performance. The > numbers we have do indicate that consuming through Streams is indeed slower > than just a pure consumer, however the performance difference is not as large > as you are observing. Would it be possible for you to run this benchmark in > your environment and report the numbers? The benchmark is included with Kafka > Streams and you run it like this: > > export INCLUDE_TEST_JARS=true; ./bin/kafka-run-class.sh > org.apache.kafka.streams.perf.SimpleBenchmark > > You'll need a Kafka broker to be running. The benchmark will report various > numbers, but among them are the performance of the consumer and the > performance of Streams reading. > > Thanks > Eno > >> On 9 Sep 2016, at 18:19, Caleb Welton wrote: >> >> Same in both cases: >> client.id=Test-Prototype >> application.id=test-prototype >> >> group.id=test-consumer-group >> bootstrap.servers=broker1:9092,broker2:9092 >> zookeeper.connect=zk1:2181 >> replication.factor=2 >> >> auto.offset.reset=earliest >> >> >> On Friday, September 9, 2016 8:48 AM, Eno Thereska >> wrote: >> >> >> >> Hi Caleb, >> >> Could you share your Kafka Streams configuration (i.e., StreamsConfig >> properties you might have set before the test)? >> >> Thanks >> Eno >> >> >> On Thu, Sep 8, 2016 at 12:46 AM, Caleb Welton wrote: >> >>> I have a question with respect to the KafkaStreams API.
>>> >>> I noticed during my prototyping work that my KafkaStreams application was >>> not able to keep up with the input on the stream, so I dug into it a bit and >>> found that it was spending an inordinate amount of time in >>> org.apache.kafka.common.network.Selector.select(). Not exactly a smoking >>> gun itself, so I dropped the implementation down to a single processor >>> reading off a source. >>> >>> public class TestProcessor extends AbstractProcessor<String, String> { >>> static long start = -1; >>> static long count = 0; >>> >>> @Override >>> public void process(String key, String value) { >>> if (start < 0) { >>> start = System.currentTimeMillis(); >>> } >>> count += 1; >>> if (count > 1000000) { >>> long end = System.currentTimeMillis(); >>> double time = (end-start)/1000.0; >>> System.out.printf("Processed %d records in %f seconds (%f >>> records/s)\n", count, time, count/time); >>> start = -1; >>> count = 0; >>> } >>> >>> } >>> >>> } >>> >>> ... >>> >>> >>> TopologyBuilder topologyBuilder = new TopologyBuilder(); >>> topologyBuilder >>> .addSource("SOURCE", stringDeserializer, stringDeserializer, >>> "input") >>> .addProcessor("PROCESS", TestProcessor::new, "SOURCE"); >>> >>> >>> >>> Which I then ran through the KafkaStreams API, and then repeated with >>> the KafkaConsumer API.
>>> >>> Using the KafkaConsumer API: >>> Processed 1000001 records in 1.790000 seconds (558659.776536 records/s) >>> Processed 1000001 records in 1.229000 seconds (813670.463792 records/s) >>> Processed 1000001 records in 1.106000 seconds (904160.036166 records/s) >>> Processed 1000001 records in 1.190000 seconds (840336.974790 records/s) >>> >>> Using the KafkaStreams API: >>> Processed 1000001 records in 6.407000 seconds (156079.444358 records/s) >>> Processed 1000001 records in 5.256000 seconds (190258.942161 records/s) >>> Processed 1000001 records in 5.141000 seconds (194514.880373 records/s) >>> Processed 1000001 records in 5.111000 seconds (195656.622970 records/s) >>> >>> >>> The profile on the KafkaStreams run consisted of: >>> >>> 89.2% org.apache.kafka.common.network.Selector.select() >>> 7.6% org.apache.kafka.clients.producer.internals.ProduceRequestResult.await() >>> 0.8% org.apache.kafka.common.network.PlaintextTransportLayer.read() >>> >>> >>> Is a 5X performance difference between the KafkaConsumer and the >>> KafkaStreams API expected? >>> >>> Are there specific things I can do to diagnose/tune the system? >>> >>> Thanks, >>> Caleb
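[Editor's note] On Ara's question about the relationship between partitions and num.stream.threads: each input partition maps to at most one stream task, and tasks are distributed over stream threads, so threads beyond the partition count sit idle. A hedged sketch of that rule of thumb; the sizing heuristic and values are illustrative, "num.stream.threads" is the real Streams config key:

```java
import java.util.Properties;

public class StreamsTuning {
    // Size threads per instance so that total threads across all instances
    // roughly matches the number of input partitions (more threads than
    // partitions gains nothing, since a partition is processed by one task).
    public static Properties tuned(int partitions, int instances) {
        Properties props = new Properties();
        int threadsPerInstance = Math.max(1, partitions / instances);
        props.put("num.stream.threads", Integer.toString(threadsPerInstance));
        return props;
    }
}
```

For example, 8 partitions spread over 2 application instances suggests 4 stream threads per instance.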
Re: enhancing KStream DSL
Ah, works! Thanks! I was under the impression that these are sequentially chained using the DSL. Didn't realize I can still use allRecords in parallel to the branches. Ara. > On Sep 9, 2016, at 5:27 AM, Michael Noll <mich...@confluent.io> wrote: > > Oh, my bad. > > Updating the third predicate in `branch()` may not even be needed. > > You could simply do: > > KStream<String, CallRecord>[] branches = allRecords > .branch( > (imsi, callRecord) -> "VOICE".equalsIgnoreCase(callRecord.getCallCommType()), > (imsi, callRecord) -> "DATA".equalsIgnoreCase(callRecord.getCallCommType()) > // Any callRecords that aren't matching any of the two predicates above will be dropped. > ); > > This would give you two branched streams instead of three: > > KStream<String, CallRecord> voiceRecords = branches[0]; > KStream<String, CallRecord> dataRecords = branches[1]; > // No third branched stream like before. > > Then, to count "everything" (VOICE + DATA + everything else), simply reuse > the original `allRecords` stream. > > > > On Fri, Sep 9, 2016 at 2:23 PM, Michael Noll <mich...@confluent.io> wrote: > >> Ara, >> >> you have shared this code snippet: >> >>> allRecords.branch( >>> (imsi, callRecord) -> "VOICE".equalsIgnoreCase(callRecord.getCallCommType()), >>> (imsi, callRecord) -> "DATA".equalsIgnoreCase(callRecord.getCallCommType()), >>> (imsi, callRecord) -> true >>> ); >> >> The branch() operation partitions the allRecords KStream into three >> disjoint streams. >> >> I'd suggest the following. >> >> First, update the third predicate in your `branch()` step to be "everything >> but VOICE and DATA", i.e. the remainder of allRecords once VOICE and DATA >> records are removed: >> >> >> KStream<String, CallRecord>[] branches = allRecords >> .branch( >> (imsi, callRecord) -> "VOICE".equalsIgnoreCase(callRecord.getCallCommType()), >> (imsi, callRecord) -> "DATA".equalsIgnoreCase(callRecord.getCallCommType()), >> (imsi, callRecord) -> !(callRecord.getCallCommType().equalsIgnoreCase("VOICE") || callRecord.getCallCommType().equalsIgnoreCase("DATA")) >> ); >> >> This would give you: >> >> KStream<String, CallRecord> voiceRecords = branches[0]; >> KStream<String, CallRecord> dataRecords = branches[1]; >> KStream<String, CallRecord> recordsThatAreNeitherVoiceNorData = branches[2]; >> >> Then, to count "everything" (VOICE + DATA + everything else), simply >> reuse the original `allRecords` stream. >> >> -Michael >> >> >> >> >> >> On Thu, Sep 8, 2016 at 10:20 PM, Ara Ebrahimi <ara.ebrah...@argyledata.com >>> wrote: >> >>> Let's say I have this: >>> >>> >>> KStream<String, CallRecord>[] branches = allRecords >>> .branch( >>> (imsi, callRecord) -> "VOICE".equalsIgnoreCase(callRecord.getCallCommType()), >>> (imsi, callRecord) -> "DATA".equalsIgnoreCase(callRecord.getCallCommType()), >>> (imsi, callRecord) -> true >>> ); >>> KStream<String, CallRecord> callRecords = branches[0]; >>> KStream<String, CallRecord> dataRecords = branches[1]; >>> KStream<String, CallRecord> callRecordCounter = branches[2]; >>> >>> callRecordCounter >>> .map((imsi, callRecord) -> new KeyValue<>("", "")) >>> .countByKey( >>> UnlimitedWindows.of("counter-window"), >>> stringSerde >>> ) >>> .print(); >>> >>> Here I have 3 branches. Branch 0 is triggered if data is VOICE, branch 1 >>> if data is DATA. Branch 2 is supposed to get triggered regardless of type >>> all the time, so that I can then count stuff for a time window. BUT the >>> problem is branch is implemented like this: >>> >>> private class KStreamBranchProcessor extends AbstractProcessor<K, V> { >>> @Override >>> public void process(K key, V value) { >>> for (int i = 0; i < predicates.length; i++) {
enhancing KStream DSL
Let’s say I have this:

    KStream<String, CallRecord>[] branches = allRecords
        .branch(
            (imsi, callRecord) -> "VOICE".equalsIgnoreCase(callRecord.getCallCommType()),
            (imsi, callRecord) -> "DATA".equalsIgnoreCase(callRecord.getCallCommType()),
            (imsi, callRecord) -> true
        );
    KStream<String, CallRecord> callRecords = branches[0];
    KStream<String, CallRecord> dataRecords = branches[1];
    KStream<String, CallRecord> callRecordCounter = branches[2];

    callRecordCounter
        .map((imsi, callRecord) -> new KeyValue<>("", ""))
        .countByKey(
            UnlimitedWindows.of("counter-window"),
            stringSerde
        )
        .print();

Here I have 3 branches. Branch 0 is triggered if data is VOICE, branch 1 if data is DATA. Branch 2 is supposed to get triggered regardless of type, so that I can then count stuff for a time window. BUT the problem is branch is implemented like this:

    private class KStreamBranchProcessor extends AbstractProcessor<K, V> {
        @Override
        public void process(K key, V value) {
            for (int i = 0; i < predicates.length; i++) {
                if (predicates[i].test(key, value)) {
                    // use forward with childIndex here and then break the loop
                    // so that no record is going to be piped to multiple streams
                    context().forward(key, value, i);
                    break;
                }
            }
        }
    }

Note the break. So the counter branch is never reached. I’d like to change the behavior of branch so that all predicates are checked and no break happens, in say a branchAll() method. What’s the easiest way to add this functionality to the DSL? I tried process() but it doesn’t return a KStream.

Ara.
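The first-match-wins behavior described above can be contrasted with the requested branchAll() semantics in plain Java. This is a hypothetical sketch (the class and method names are made up, and it deliberately avoids the Kafka Streams API): the loop has no break, so a record is forwarded to every matching branch, which is exactly what the always-true counter predicate needs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiPredicate;

// Hypothetical sketch of a "branchAll" dispatcher: unlike the DSL's branch(),
// which forwards each record only to the FIRST matching predicate (the break
// in KStreamBranchProcessor), this forwards to EVERY matching branch.
class BranchAllDispatcher<K, V> {
    private final List<BiPredicate<K, V>> predicates;
    private final List<List<V>> branches; // stand-in for downstream streams

    BranchAllDispatcher(List<BiPredicate<K, V>> predicates) {
        this.predicates = predicates;
        this.branches = new ArrayList<>();
        for (int i = 0; i < predicates.size(); i++) {
            branches.add(new ArrayList<>());
        }
    }

    // process() analogue: no break, so one record can land in several branches.
    void process(K key, V value) {
        for (int i = 0; i < predicates.size(); i++) {
            if (predicates.get(i).test(key, value)) {
                branches.get(i).add(value); // forward(key, value, i) analogue
            }
        }
    }

    List<V> branch(int i) {
        return branches.get(i);
    }
}
```

With predicates VOICE, DATA, and always-true, a VOICE record lands in both branch 0 and branch 2, so the counter branch sees every record. (As the replies in the thread above note, in the actual DSL the simpler route is to reuse the original allRecords stream for the count.)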
micro-batching in kafka streams
Hi,

What’s the best way to do micro-batching in Kafka Streams? Are there any plans for a built-in mechanism? Perhaps a StateStore could act as the buffer? And what exactly are ProcessorContext.schedule()/punctuate() for? They don’t seem to be used anywhere.

http://hortonworks.com/blog/apache-storm-design-pattern-micro-batching/

Ara.
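For reference, the buffering pattern usually meant by micro-batching can be sketched in plain Java. This is a hypothetical sketch (class and method names are invented, and it deliberately avoids the Kafka Streams API): process() buffers each incoming record, and a periodic timer tick, analogous to what ProcessorContext.schedule() plus punctuate() would drive inside a Processor, flushes whatever has accumulated.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical micro-batching sketch: buffer records and flush either when
// the batch is full or when a periodic "punctuate" tick fires.
class MicroBatcher<T> {
    private final int maxBatchSize;
    private final List<T> buffer = new ArrayList<>();
    private final Consumer<List<T>> flushHandler;

    MicroBatcher(int maxBatchSize, Consumer<List<T>> flushHandler) {
        this.maxBatchSize = maxBatchSize;
        this.flushHandler = flushHandler;
    }

    // Called for every incoming record (the process() analogue).
    void add(T record) {
        buffer.add(record);
        if (buffer.size() >= maxBatchSize) {
            flush();
        }
    }

    // Called on a timer tick (the punctuate() analogue).
    void flush() {
        if (!buffer.isEmpty()) {
            flushHandler.accept(new ArrayList<>(buffer));
            buffer.clear();
        }
    }
}
```

In an actual Streams application the buffer could live in a StateStore rather than an in-memory list, so a restarted instance does not lose a partially accumulated batch.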
kafka streams: join 2 Windowed tables
Hi,

Is joining 2 streams based on Windowed keys supposed to work? I have 2 KTables:

- KTable events: I process events and aggregate events that have a common criteria using aggregateByKey() and UnlimitedWindows as the window (for now)
- KTable hourlyStats: I calculate some stats using aggregateByKey() for hourly windows, TimeWindows.of("window name", hourly)

Since both use aggregateByKey() they are both KTables and both have Windowed keys. I need to leftJoin the first one (events) with the second one (hourlyStats), BUT I need to join event x which occurred at time t0 with the hourlyStats of the t0 window. In other words I need to join using JoinWindows.of("JoinWindow").within(60 * 60 * 1000). Since both are KTables this is not possible. But if I turn them both into KStreams using toStream() then I can use the leftJoin() variants that support JoinWindows; they would both be KStreams with Windowed keys. The problem is the join doesn’t actually happen: no hourlyStats row is found for any row of events. The TimestampExtractor for both is correct.

So, is joining 2 streams based on Windowed keys supposed to work? If not, how can I accomplish the above task?

Thanks,
Ara.
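One thing worth checking here (an assumption on my part, not a confirmed diagnosis of the non-matching join): an UnlimitedWindows window starts at a fixed point, while hourly TimeWindows start at hour boundaries, so two Windowed keys built from the same underlying key can still carry different windows and never compare equal. Aligning the event's timestamp to the start of the hourly window that contains it is one way to make the keys line up before joining. A minimal sketch of the alignment arithmetic (WindowAlign is a hypothetical helper, not a Kafka Streams class):

```java
// Hypothetical helper: align a record timestamp to the start of the
// fixed-size tumbling window containing it, i.e.
// windowStart = timestamp - (timestamp % windowSize).
class WindowAlign {
    static final long HOUR_MS = 60 * 60 * 1000L;

    // Start (epoch millis) of the hourly window containing the timestamp.
    static long hourlyWindowStart(long timestampMs) {
        return timestampMs - (timestampMs % HOUR_MS);
    }

    // Two timestamps fall in the same hourly window iff they share a start.
    static boolean sameHourlyWindow(long t0, long t1) {
        return hourlyWindowStart(t0) == hourlyWindowStart(t1);
    }
}
```

The idea would be to map each event's key to (underlyingKey, hourlyWindowStart(t0)) before the join, so that it matches the key of the hourlyStats entry for the same hour.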