Re: [ANN] sqlstream: Simple MySQL binlog to Kafka stream

2015-03-17 Thread Xiao
LinkedIn's Gobblin compaction tool uses Hive to perform the compaction. Does 
that mean Lumos has been replaced?

Confused… 

On Mar 17, 2015, at 10:00 PM, Xiao  wrote:

> Hi, all, 
> 
> Do you know whether Linkedin plans to open source Lumos in the near future?
> 
> I found the answer from Qiao Lin’s post about replication from Oracle/mySQL 
> to Hadoop. 
> 
>   - https://engineering.linkedin.com/data-ingestion/gobblin-big-data-ease
> 
> At the source side, it can be DataBus-based or file based. 
> 
> At the target side, it is Lumos to rebuild the snapshots due to inability to 
> do an update/delete in Hadoop. 
> 
> The slides about Lumos:
>   http://www.slideshare.net/Hadoop_Summit/th-220p230-cramachandranv1
> The talk about Lumos: 
>   https://www.youtube.com/watch?v=AGlRjlrNDYk
> 
> Event publishing is different from database replication. Kafka is used for 
> change publishing or maybe also used for sending changes (recorded in files). 
> 
> Thanks, 
> 
> Xiao Li
> 
> On Mar 17, 2015, at 7:26 PM, Arya Ketan  wrote:
> 
>> AFAIK , linkedin uses databus to do the same. Aesop is built on top of
>> databus , extending its beautiful capabilities to mysql n hbase
>> On Mar 18, 2015 7:37 AM, "Xiao"  wrote:
>> 
>>> Hi, all,
>>> 
>>> Do you know how Linkedin team publishes changed rows in Oracle to Kafka? I
>>> believe they already knew the whole problem very well.
>>> 
>>> Using triggers? or directly parsing the log? or using any Oracle
>>> GoldenGate interfaces?
>>> 
>>> Any lesson or any standard message format? Could the Linkedin people share
>>> it with us? I believe it can help us a lot.
>>> 
>>> Thanks,
>>> 
>>> Xiao Li
>>> 
>>> 
>>> On Mar 17, 2015, at 12:26 PM, James Cheng  wrote:
>>> 
 This is a great set of projects!
 
 We should put this list of projects on a site somewhere so people can
>>> more easily see and refer to it. These aren't Kafka-specific, but most seem
>>> to be "MySQL CDC." Does anyone have a place where they can host a page?
>>> Preferably a wiki, so we can keep it up to date easily.
 
 -James
 
 On Mar 17, 2015, at 8:21 AM, Hisham Mardam-Bey <
>>> hisham.mardam...@gmail.com> wrote:
 
> Pretty much a hijack / plug as well (=
> 
> https://github.com/mardambey/mypipe
> 
> "MySQL binary log consumer with the ability to act on changed rows and
> publish changes to different systems with emphasis on Apache Kafka."
> 
> Mypipe currently encodes events using Avro before pushing them into
>>> Kafka
> and is Avro schema repository aware. The project is young; and patches
>>> for
> improvements are appreciated (=
> 
> On Mon, Mar 16, 2015 at 10:35 PM, Arya Ketan 
>>> wrote:
> 
>> Great work.
>> Sorry for kinda hijacking this thread, but I though that we had built
>> some-thing on mysql bin log event propagator and wanted to share it .
>> You guys can also look into Aesop ( https://github.com/Flipkart/aesop
>>> ).
>> Its
>> a change propagation frame-work. It has relays which listens to bin
>>> logs of
>> Mysql, keeps track of SCNs  and has consumers which can then
>>> (transform/map
>> or interpret as is) the bin log-event to a destination. Consumers also
>>> keep
>> track of SCNs and a slow consumer can go back to a previous SCN if it
>>> wants
>> to re-listen to events  ( similar to kafka's consumer view ).
>> 
>> All the producers/consumers are extensible and you can write your own
>> custom consumer and feed off the data to it.
>> 
>> Common use-cases:
>> a) Archive mysql based data into say hbase
>> b) Move mysql based data to say a search store for serving reads.
>> 
>> It has a decent ( not an awesome :) ) console too which gives a nice
>>> human
>> readable view of where the producers and consumers are.
>> 
>> Current supported producers are mysql bin logs, hbase wall-edits.
>> 
>> 
>> Further insights/reviews/feature reqs/pull reqs/advices are all
>>> welcome.
>> 
>> --
>> Arya
>> 
>> Arya
>> 
>> On Tue, Mar 17, 2015 at 1:48 AM, Gwen Shapira 
>> wrote:
>> 
>>> Really really nice!
>>> 
>>> Thank you.
>>> 
>>> On Mon, Mar 16, 2015 at 7:18 AM, Pierre-Yves Ritschard <
>>> p...@spootnik.org
>>> 
>>> wrote:
 Hi kafka,
 
 I just wanted to mention I published a very simple project which can
 connect as MySQL replication client and stream replication events to
 kafka: https://github.com/pyr/sqlstream
 
 When you don't have control over an application, it can provide a
>> simple
 way of consolidating SQL data in kafka.
 
 This is an early release and there are a few caveats (mentionned in
>>> the
 README), mostly the poor partitioning which I'm going to evolve
>>> quickly
 and the reconnection strategy which doesn't try to keep track of
>>> binlog

Re: [ANN] sqlstream: Simple MySQL binlog to Kafka stream

2015-03-17 Thread Xiao
Hi, all, 

Do you know whether Linkedin plans to open source Lumos in the near future?

I found the answer in Qiao Lin’s post about replication from Oracle/MySQL to 
Hadoop. 

- https://engineering.linkedin.com/data-ingestion/gobblin-big-data-ease

On the source side, ingestion can be DataBus-based or file-based. 

On the target side, Lumos rebuilds the snapshots, since Hadoop cannot do 
in-place updates/deletes. 

The slides about Lumos:
http://www.slideshare.net/Hadoop_Summit/th-220p230-cramachandranv1
The talk about Lumos: 
https://www.youtube.com/watch?v=AGlRjlrNDYk

Event publishing is different from database replication. Kafka is used for 
change publishing, and maybe also for sending the changes themselves (recorded 
in files). 

Thanks, 

Xiao Li

On Mar 17, 2015, at 7:26 PM, Arya Ketan  wrote:

> AFAIK , linkedin uses databus to do the same. Aesop is built on top of
> databus , extending its beautiful capabilities to mysql n hbase
> On Mar 18, 2015 7:37 AM, "Xiao"  wrote:
> 
>> Hi, all,
>> 
>> Do you know how Linkedin team publishes changed rows in Oracle to Kafka? I
>> believe they already knew the whole problem very well.
>> 
>> Using triggers? or directly parsing the log? or using any Oracle
>> GoldenGate interfaces?
>> 
>> Any lesson or any standard message format? Could the Linkedin people share
>> it with us? I believe it can help us a lot.
>> 
>> Thanks,
>> 
>> Xiao Li
>> 
>> 
>> On Mar 17, 2015, at 12:26 PM, James Cheng  wrote:
>> 
>>> This is a great set of projects!
>>> 
>>> We should put this list of projects on a site somewhere so people can
>> more easily see and refer to it. These aren't Kafka-specific, but most seem
>> to be "MySQL CDC." Does anyone have a place where they can host a page?
>> Preferably a wiki, so we can keep it up to date easily.
>>> 
>>> -James
>>> 
>>> On Mar 17, 2015, at 8:21 AM, Hisham Mardam-Bey <
>> hisham.mardam...@gmail.com> wrote:
>>> 
 Pretty much a hijack / plug as well (=
 
 https://github.com/mardambey/mypipe
 
 "MySQL binary log consumer with the ability to act on changed rows and
 publish changes to different systems with emphasis on Apache Kafka."
 
 Mypipe currently encodes events using Avro before pushing them into
>> Kafka
 and is Avro schema repository aware. The project is young; and patches
>> for
 improvements are appreciated (=
 
 On Mon, Mar 16, 2015 at 10:35 PM, Arya Ketan 
>> wrote:
 
> Great work.
> Sorry for kinda hijacking this thread, but I though that we had built
> some-thing on mysql bin log event propagator and wanted to share it .
> You guys can also look into Aesop ( https://github.com/Flipkart/aesop
>> ).
> Its
> a change propagation frame-work. It has relays which listens to bin
>> logs of
> Mysql, keeps track of SCNs  and has consumers which can then
>> (transform/map
> or interpret as is) the bin log-event to a destination. Consumers also
>> keep
> track of SCNs and a slow consumer can go back to a previous SCN if it
>> wants
> to re-listen to events  ( similar to kafka's consumer view ).
> 
> All the producers/consumers are extensible and you can write your own
> custom consumer and feed off the data to it.
> 
> Common use-cases:
> a) Archive mysql based data into say hbase
> b) Move mysql based data to say a search store for serving reads.
> 
> It has a decent ( not an awesome :) ) console too which gives a nice
>> human
> readable view of where the producers and consumers are.
> 
> Current supported producers are mysql bin logs, hbase wall-edits.
> 
> 
> Further insights/reviews/feature reqs/pull reqs/advices are all
>> welcome.
> 
> --
> Arya
> 
> Arya
> 
> On Tue, Mar 17, 2015 at 1:48 AM, Gwen Shapira 
> wrote:
> 
>> Really really nice!
>> 
>> Thank you.
>> 
>> On Mon, Mar 16, 2015 at 7:18 AM, Pierre-Yves Ritschard <
>> p...@spootnik.org
>> 
>> wrote:
>>> Hi kafka,
>>> 
>>> I just wanted to mention I published a very simple project which can
>>> connect as MySQL replication client and stream replication events to
>>> kafka: https://github.com/pyr/sqlstream
>>> 
>>> When you don't have control over an application, it can provide a
> simple
>>> way of consolidating SQL data in kafka.
>>> 
>>> This is an early release and there are a few caveats (mentionned in
>> the
>>> README), mostly the poor partitioning which I'm going to evolve
>> quickly
>>> and the reconnection strategy which doesn't try to keep track of
>> binlog
>>> position, other than that, it should work as advertised.
>>> 
>>> Cheers,
>>> - pyr
>> 
> 
 
 
 
 --
 Hisham Mardam-Bey
 http://hisham.cc/
>>> 
>> 
>> 



Re: No topic owner when using different assignment strategies

2015-03-17 Thread Jiangjie Qin
Yes, storing the info in ZooKeeper would work. In the new consumer, since the
coordinator will reside on the server side, this would be easily detected. I’m
not sure it is still worth making this change in the old consumer, though,
especially since it is a backward-incompatible change in the sense that all the
consumers need to be upgraded for it to work.
That said, I think this is a good point and should be taken into consideration
for the new consumer coordinator.

Jiangjie (Becket) Qin

On 3/17/15, 7:05 PM, "tao xiao"  wrote:

>The intention of this test is to check how kafka would behaves if two
>different assignment strategies are set in the same consumer group. In
>reality this would happen as we never know what configurations downstream
>consumers would use.
>
>What about we store the assignment strategy in zk and send out a warning
>to
>consumer when doing rebalancing? but at least I'd suggest we document
>somewhere to warn people not to use different assignment strategies for
>the
>same consumer group
>
>On Wed, Mar 18, 2015 at 8:28 AM, Xiao  wrote:
>
>> I think this is a usability issue. It might need an extra admin tool to
>> verify if all configuration settings are correct, even if the broker can
>> return an error message to the consumers.
>>
>> Thanks,
>>
>> Xiao Li
>>
>> On Mar 17, 2015, at 5:18 PM, Jiangjie Qin 
>> wrote:
>>
>> > The problem is the consumers are independent to each other. We purely
>> > depend on the same algorithm running on different consumers to achieve
>> > agreement on partition assignment. Breaking this assumption violates
>>the
>> > design in the first place.
>> >
>> > On 3/17/15, 4:13 PM, "Mayuresh Gharat" 
>> wrote:
>> >
>> >> Probably we should return an error response if you already have a
>> >> partition
>> >> assignment strategy inplace for a group and you try to use other
>> strategy.
>> >>
>> >> Thanks,
>> >>
>> >> Mayuresh
>> >>
>> >> On Tue, Mar 17, 2015 at 2:10 PM, Jiangjie Qin
>>> >
>> >> wrote:
>> >>
>> >>> Yeah, using different partition assignment algorithms in the same
>> >>> consumer
>> >>> group won't work. Is there a particular reason you want to do this?
>> >>>
>> >>> On 3/17/15, 8:32 AM, "tao xiao"  wrote:
>> >>>
>>  This is the corrected zk result
>> 
>>  Here is the result from zk
>>  [zk: localhost:2181(CONNECTED) 0] get
>>  /consumers/test/owners/mm-benchmark-test/0
>> 
>>  Node does not exist: /consumers/test/owners/mm-benchmark-test/0
>> 
>>  [zk: localhost:2181(CONNECTED) 1] get
>>  /consumers/test/owners/mm-benchmark-test1/0
>> 
>>  test-localhost-1426605370072-904d6fba-0
>> 
>>  On Tue, Mar 17, 2015 at 11:30 PM, tao xiao 
>> >>> wrote:
>> 
>> > Hi team,
>> >
>> > I have two consumer instances with the same group id connecting to
>> >>> two
>> > different topics with 1 partition created for each. One consumer
>>uses
>> > partition.assignment.strategy=roundrobin and the other one uses
>> >>> default
>> > assignment strategy. Both consumers have 1 thread spawned
>>internally
>> >>> and
>> > connect kafka using createMessageStreamsByFilter.
>> > The consumer with roundrobin assignment strategy connected kafka
>> >>> first
>> > and
>> > had 2 topics assigned to itself and then I brought up another
>> >>> consumer
>> > that
>> > has default assignment strategy configured. I saw rebalancing
>> >>> happened
>> > in
>> > both consumers but at the end only one of the topics was assigned
>>to
>> >>> the
>> > consumer that is configured roundrobin assignment strategy and no
>> >>> topics
>> > were assigned to the other consumer. This led to one topic missing
>> >>> its
>> > owner.
>> >
>> > Here is the result from zk
>> > [zk: localhost:2181(CONNECTED) 0] get
>> > /consumers/test/owners/mm-benchmark-test/0
>> >
>> > Node does not exist:
>> > /consumers/test12345667f/owners/mm-benchmark-test/0
>> >
>> > [zk: localhost:2181(CONNECTED) 1] get
>> > /consumers/test/owners/mm-benchmark-test1/0
>> >
>> > test-localhost-1426605370072-904d6fba-0
>> >
>> > The kafka version I use is 0.8.2.1
>> >
>> > --
>> > Regards,
>> > Tao
>> >
>> 
>> 
>> 
>>  --
>>  Regards,
>>  Tao
>> >>>
>> >>>
>> >>
>> >>
>> >> --
>> >> -Regards,
>> >> Mayuresh R. Gharat
>> >> (862) 250-7125
>> >
>>
>>
>
>
>-- 
>Regards,
>Tao



Re: [ANN] sqlstream: Simple MySQL binlog to Kafka stream

2015-03-17 Thread Arya Ketan
AFAIK, LinkedIn uses Databus to do the same. Aesop is built on top of Databus,
extending its beautiful capabilities to MySQL and HBase.
On Mar 18, 2015 7:37 AM, "Xiao"  wrote:

> Hi, all,
>
> Do you know how Linkedin team publishes changed rows in Oracle to Kafka? I
> believe they already knew the whole problem very well.
>
> Using triggers? or directly parsing the log? or using any Oracle
> GoldenGate interfaces?
>
> Any lesson or any standard message format? Could the Linkedin people share
> it with us? I believe it can help us a lot.
>
> Thanks,
>
> Xiao Li
>
>
> On Mar 17, 2015, at 12:26 PM, James Cheng  wrote:
>
> > This is a great set of projects!
> >
> > We should put this list of projects on a site somewhere so people can
> more easily see and refer to it. These aren't Kafka-specific, but most seem
> to be "MySQL CDC." Does anyone have a place where they can host a page?
> Preferably a wiki, so we can keep it up to date easily.
> >
> > -James
> >
> > On Mar 17, 2015, at 8:21 AM, Hisham Mardam-Bey <
> hisham.mardam...@gmail.com> wrote:
> >
> >> Pretty much a hijack / plug as well (=
> >>
> >> https://github.com/mardambey/mypipe
> >>
> >> "MySQL binary log consumer with the ability to act on changed rows and
> >> publish changes to different systems with emphasis on Apache Kafka."
> >>
> >> Mypipe currently encodes events using Avro before pushing them into
> Kafka
> >> and is Avro schema repository aware. The project is young; and patches
> for
> >> improvements are appreciated (=
> >>
> >> On Mon, Mar 16, 2015 at 10:35 PM, Arya Ketan 
> wrote:
> >>
> >>> Great work.
> >>> Sorry for kinda hijacking this thread, but I though that we had built
> >>> some-thing on mysql bin log event propagator and wanted to share it .
> >>> You guys can also look into Aesop ( https://github.com/Flipkart/aesop
> ).
> >>> Its
> >>> a change propagation frame-work. It has relays which listens to bin
> logs of
> >>> Mysql, keeps track of SCNs  and has consumers which can then
> (transform/map
> >>> or interpret as is) the bin log-event to a destination. Consumers also
> keep
> >>> track of SCNs and a slow consumer can go back to a previous SCN if it
> wants
> >>> to re-listen to events  ( similar to kafka's consumer view ).
> >>>
> >>> All the producers/consumers are extensible and you can write your own
> >>> custom consumer and feed off the data to it.
> >>>
> >>> Common use-cases:
> >>> a) Archive mysql based data into say hbase
> >>> b) Move mysql based data to say a search store for serving reads.
> >>>
> >>> It has a decent ( not an awesome :) ) console too which gives a nice
> human
> >>> readable view of where the producers and consumers are.
> >>>
> >>> Current supported producers are mysql bin logs, hbase wall-edits.
> >>>
> >>>
> >>> Further insights/reviews/feature reqs/pull reqs/advices are all
> welcome.
> >>>
> >>> --
> >>> Arya
> >>>
> >>> Arya
> >>>
> >>> On Tue, Mar 17, 2015 at 1:48 AM, Gwen Shapira 
> >>> wrote:
> >>>
>  Really really nice!
> 
>  Thank you.
> 
>  On Mon, Mar 16, 2015 at 7:18 AM, Pierre-Yves Ritschard <
> p...@spootnik.org
> 
>  wrote:
> > Hi kafka,
> >
> > I just wanted to mention I published a very simple project which can
> > connect as MySQL replication client and stream replication events to
> > kafka: https://github.com/pyr/sqlstream
> >
> > When you don't have control over an application, it can provide a
> >>> simple
> > way of consolidating SQL data in kafka.
> >
> > This is an early release and there are a few caveats (mentionned in
> the
> > README), mostly the poor partitioning which I'm going to evolve
> quickly
> > and the reconnection strategy which doesn't try to keep track of
> binlog
> > position, other than that, it should work as advertised.
> >
> > Cheers,
> > - pyr
> 
> >>>
> >>
> >>
> >>
> >> --
> >> Hisham Mardam-Bey
> >> http://hisham.cc/
> >
>
>


Re: [ANN] sqlstream: Simple MySQL binlog to Kafka stream

2015-03-17 Thread Xiao
Hi, all, 

Do you know how the LinkedIn team publishes changed rows from Oracle to Kafka? I 
believe they already know the whole problem space very well. 

Using triggers? Directly parsing the log? Or using any of the Oracle GoldenGate 
interfaces?

Any lessons, or any standard message format? Could the LinkedIn people share them 
with us? I believe it would help us a lot. 

Thanks, 

Xiao Li


On Mar 17, 2015, at 12:26 PM, James Cheng  wrote:

> This is a great set of projects!
> 
> We should put this list of projects on a site somewhere so people can more 
> easily see and refer to it. These aren't Kafka-specific, but most seem to be 
> "MySQL CDC." Does anyone have a place where they can host a page? Preferably 
> a wiki, so we can keep it up to date easily.
> 
> -James
> 
> On Mar 17, 2015, at 8:21 AM, Hisham Mardam-Bey  
> wrote:
> 
>> Pretty much a hijack / plug as well (=
>> 
>> https://github.com/mardambey/mypipe
>> 
>> "MySQL binary log consumer with the ability to act on changed rows and
>> publish changes to different systems with emphasis on Apache Kafka."
>> 
>> Mypipe currently encodes events using Avro before pushing them into Kafka
>> and is Avro schema repository aware. The project is young; and patches for
>> improvements are appreciated (=
>> 
>> On Mon, Mar 16, 2015 at 10:35 PM, Arya Ketan  wrote:
>> 
>>> Great work.
>>> Sorry for kinda hijacking this thread, but I though that we had built
>>> some-thing on mysql bin log event propagator and wanted to share it .
>>> You guys can also look into Aesop ( https://github.com/Flipkart/aesop).
>>> Its
>>> a change propagation frame-work. It has relays which listens to bin logs of
>>> Mysql, keeps track of SCNs  and has consumers which can then (transform/map
>>> or interpret as is) the bin log-event to a destination. Consumers also keep
>>> track of SCNs and a slow consumer can go back to a previous SCN if it wants
>>> to re-listen to events  ( similar to kafka's consumer view ).
>>> 
>>> All the producers/consumers are extensible and you can write your own
>>> custom consumer and feed off the data to it.
>>> 
>>> Common use-cases:
>>> a) Archive mysql based data into say hbase
>>> b) Move mysql based data to say a search store for serving reads.
>>> 
>>> It has a decent ( not an awesome :) ) console too which gives a nice human
>>> readable view of where the producers and consumers are.
>>> 
>>> Current supported producers are mysql bin logs, hbase wall-edits.
>>> 
>>> 
>>> Further insights/reviews/feature reqs/pull reqs/advices are all welcome.
>>> 
>>> --
>>> Arya
>>> 
>>> Arya
>>> 
>>> On Tue, Mar 17, 2015 at 1:48 AM, Gwen Shapira 
>>> wrote:
>>> 
 Really really nice!
 
 Thank you.
 
 On Mon, Mar 16, 2015 at 7:18 AM, Pierre-Yves Ritschard >>> 
 wrote:
> Hi kafka,
> 
> I just wanted to mention I published a very simple project which can
> connect as MySQL replication client and stream replication events to
> kafka: https://github.com/pyr/sqlstream
> 
> When you don't have control over an application, it can provide a
>>> simple
> way of consolidating SQL data in kafka.
> 
> This is an early release and there are a few caveats (mentionned in the
> README), mostly the poor partitioning which I'm going to evolve quickly
> and the reconnection strategy which doesn't try to keep track of binlog
> position, other than that, it should work as advertised.
> 
> Cheers,
> - pyr
 
>>> 
>> 
>> 
>> 
>> -- 
>> Hisham Mardam-Bey
>> http://hisham.cc/
> 



Re: No topic owner when using different assignment strategies

2015-03-17 Thread tao xiao
The intention of this test is to check how kafka would behave if two
different assignment strategies are set in the same consumer group. In
reality this could happen, since we never know what configurations downstream
consumers will use.

What if we store the assignment strategy in ZooKeeper and send out a warning to
consumers when rebalancing? At the very least I'd suggest we document somewhere
that people should not use different assignment strategies for the same
consumer group.
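
For concreteness, here is a minimal sketch of a setup that reproduces the
mismatch, using the 0.8.2 high-level consumer API. The ZooKeeper address, group
id, and topic patterns are placeholders:

import java.util.List;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.consumer.Whitelist;
import kafka.javaapi.consumer.ConsumerConnector;

public class AssignmentStrategyMismatchSketch {
    public static void main(String[] args) {
        Properties p1 = new Properties();
        p1.put("zookeeper.connect", "localhost:2181");
        p1.put("group.id", "test");
        p1.put("partition.assignment.strategy", "roundrobin"); // first consumer
        ConsumerConnector c1 = Consumer.createJavaConsumerConnector(new ConsumerConfig(p1));

        Properties p2 = new Properties();
        p2.put("zookeeper.connect", "localhost:2181");
        p2.put("group.id", "test");
        // p2 leaves partition.assignment.strategy at its default ("range"), so the
        // two members of the same group now run different assignment algorithms.
        ConsumerConnector c2 = Consumer.createJavaConsumerConnector(new ConsumerConfig(p2));

        // Both consume via a topic filter, as in the original test.
        List<KafkaStream<byte[], byte[]>> streams =
            c1.createMessageStreamsByFilter(new Whitelist("mm-benchmark-test.*"), 1);
        c2.createMessageStreamsByFilter(new Whitelist("mm-benchmark-test1.*"), 1);
    }
}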

On Wed, Mar 18, 2015 at 8:28 AM, Xiao  wrote:

> I think this is a usability issue. It might need an extra admin tool to
> verify if all configuration settings are correct, even if the broker can
> return an error message to the consumers.
>
> Thanks,
>
> Xiao Li
>
> On Mar 17, 2015, at 5:18 PM, Jiangjie Qin 
> wrote:
>
> > The problem is the consumers are independent to each other. We purely
> > depend on the same algorithm running on different consumers to achieve
> > agreement on partition assignment. Breaking this assumption violates the
> > design in the first place.
> >
> > On 3/17/15, 4:13 PM, "Mayuresh Gharat" 
> wrote:
> >
> >> Probably we should return an error response if you already have a
> >> partition
> >> assignment strategy inplace for a group and you try to use other
> strategy.
> >>
> >> Thanks,
> >>
> >> Mayuresh
> >>
> >> On Tue, Mar 17, 2015 at 2:10 PM, Jiangjie Qin  >
> >> wrote:
> >>
> >>> Yeah, using different partition assignment algorithms in the same
> >>> consumer
> >>> group won't work. Is there a particular reason you want to do this?
> >>>
> >>> On 3/17/15, 8:32 AM, "tao xiao"  wrote:
> >>>
>  This is the corrected zk result
> 
>  Here is the result from zk
>  [zk: localhost:2181(CONNECTED) 0] get
>  /consumers/test/owners/mm-benchmark-test/0
> 
>  Node does not exist: /consumers/test/owners/mm-benchmark-test/0
> 
>  [zk: localhost:2181(CONNECTED) 1] get
>  /consumers/test/owners/mm-benchmark-test1/0
> 
>  test-localhost-1426605370072-904d6fba-0
> 
>  On Tue, Mar 17, 2015 at 11:30 PM, tao xiao 
> >>> wrote:
> 
> > Hi team,
> >
> > I have two consumer instances with the same group id connecting to
> >>> two
> > different topics with 1 partition created for each. One consumer uses
> > partition.assignment.strategy=roundrobin and the other one uses
> >>> default
> > assignment strategy. Both consumers have 1 thread spawned internally
> >>> and
> > connect kafka using createMessageStreamsByFilter.
> > The consumer with roundrobin assignment strategy connected kafka
> >>> first
> > and
> > had 2 topics assigned to itself and then I brought up another
> >>> consumer
> > that
> > has default assignment strategy configured. I saw rebalancing
> >>> happened
> > in
> > both consumers but at the end only one of the topics was assigned to
> >>> the
> > consumer that is configured roundrobin assignment strategy and no
> >>> topics
> > were assigned to the other consumer. This led to one topic missing
> >>> its
> > owner.
> >
> > Here is the result from zk
> > [zk: localhost:2181(CONNECTED) 0] get
> > /consumers/test/owners/mm-benchmark-test/0
> >
> > Node does not exist:
> > /consumers/test12345667f/owners/mm-benchmark-test/0
> >
> > [zk: localhost:2181(CONNECTED) 1] get
> > /consumers/test/owners/mm-benchmark-test1/0
> >
> > test-localhost-1426605370072-904d6fba-0
> >
> > The kafka version I use is 0.8.2.1
> >
> > --
> > Regards,
> > Tao
> >
> 
> 
> 
>  --
>  Regards,
>  Tao
> >>>
> >>>
> >>
> >>
> >> --
> >> -Regards,
> >> Mayuresh R. Gharat
> >> (862) 250-7125
> >
>
>


-- 
Regards,
Tao


Re: No topic owner when using different assignment strategies

2015-03-17 Thread Xiao
I think this is a usability issue. It might need an extra admin tool to verify 
if all configuration settings are correct, even if the broker can return an 
error message to the consumers. 

Thanks, 

Xiao Li

On Mar 17, 2015, at 5:18 PM, Jiangjie Qin  wrote:

> The problem is the consumers are independent to each other. We purely
> depend on the same algorithm running on different consumers to achieve
> agreement on partition assignment. Breaking this assumption violates the
> design in the first place.
> 
> On 3/17/15, 4:13 PM, "Mayuresh Gharat"  wrote:
> 
>> Probably we should return an error response if you already have a
>> partition
>> assignment strategy inplace for a group and you try to use other strategy.
>> 
>> Thanks,
>> 
>> Mayuresh
>> 
>> On Tue, Mar 17, 2015 at 2:10 PM, Jiangjie Qin 
>> wrote:
>> 
>>> Yeah, using different partition assignment algorithms in the same
>>> consumer
>>> group won't work. Is there a particular reason you want to do this?
>>> 
>>> On 3/17/15, 8:32 AM, "tao xiao"  wrote:
>>> 
 This is the corrected zk result
 
 Here is the result from zk
 [zk: localhost:2181(CONNECTED) 0] get
 /consumers/test/owners/mm-benchmark-test/0
 
 Node does not exist: /consumers/test/owners/mm-benchmark-test/0
 
 [zk: localhost:2181(CONNECTED) 1] get
 /consumers/test/owners/mm-benchmark-test1/0
 
 test-localhost-1426605370072-904d6fba-0
 
 On Tue, Mar 17, 2015 at 11:30 PM, tao xiao 
>>> wrote:
 
> Hi team,
> 
> I have two consumer instances with the same group id connecting to
>>> two
> different topics with 1 partition created for each. One consumer uses
> partition.assignment.strategy=roundrobin and the other one uses
>>> default
> assignment strategy. Both consumers have 1 thread spawned internally
>>> and
> connect kafka using createMessageStreamsByFilter.
> The consumer with roundrobin assignment strategy connected kafka
>>> first
> and
> had 2 topics assigned to itself and then I brought up another
>>> consumer
> that
> has default assignment strategy configured. I saw rebalancing
>>> happened
> in
> both consumers but at the end only one of the topics was assigned to
>>> the
> consumer that is configured roundrobin assignment strategy and no
>>> topics
> were assigned to the other consumer. This led to one topic missing
>>> its
> owner.
> 
> Here is the result from zk
> [zk: localhost:2181(CONNECTED) 0] get
> /consumers/test/owners/mm-benchmark-test/0
> 
> Node does not exist:
> /consumers/test12345667f/owners/mm-benchmark-test/0
> 
> [zk: localhost:2181(CONNECTED) 1] get
> /consumers/test/owners/mm-benchmark-test1/0
> 
> test-localhost-1426605370072-904d6fba-0
> 
> The kafka version I use is 0.8.2.1
> 
> --
> Regards,
> Tao
> 
 
 
 
 --
 Regards,
 Tao
>>> 
>>> 
>> 
>> 
>> -- 
>> -Regards,
>> Mayuresh R. Gharat
>> (862) 250-7125
> 



Re: No topic owner when using different assignment strategies

2015-03-17 Thread Jiangjie Qin
The problem is that the consumers are independent of each other. We rely purely
on the same algorithm running on different consumers to reach agreement on
partition assignment. Breaking this assumption violates the design in the first
place.

On 3/17/15, 4:13 PM, "Mayuresh Gharat"  wrote:

>Probably we should return an error response if you already have a
>partition
>assignment strategy inplace for a group and you try to use other strategy.
>
>Thanks,
>
>Mayuresh
>
>On Tue, Mar 17, 2015 at 2:10 PM, Jiangjie Qin 
>wrote:
>
>> Yeah, using different partition assignment algorithms in the same
>>consumer
>> group won't work. Is there a particular reason you want to do this?
>>
>> On 3/17/15, 8:32 AM, "tao xiao"  wrote:
>>
>> >This is the corrected zk result
>> >
>> >Here is the result from zk
>> >[zk: localhost:2181(CONNECTED) 0] get
>> >/consumers/test/owners/mm-benchmark-test/0
>> >
>> >Node does not exist: /consumers/test/owners/mm-benchmark-test/0
>> >
>> >[zk: localhost:2181(CONNECTED) 1] get
>> >/consumers/test/owners/mm-benchmark-test1/0
>> >
>> >test-localhost-1426605370072-904d6fba-0
>> >
>> >On Tue, Mar 17, 2015 at 11:30 PM, tao xiao 
>>wrote:
>> >
>> >> Hi team,
>> >>
>> >> I have two consumer instances with the same group id connecting to
>>two
>> >> different topics with 1 partition created for each. One consumer uses
>> >> partition.assignment.strategy=roundrobin and the other one uses
>>default
>> >> assignment strategy. Both consumers have 1 thread spawned internally
>>and
>> >> connect kafka using createMessageStreamsByFilter.
>> >> The consumer with roundrobin assignment strategy connected kafka
>>first
>> >>and
>> >> had 2 topics assigned to itself and then I brought up another
>>consumer
>> >>that
>> >> has default assignment strategy configured. I saw rebalancing
>>happened
>> >>in
>> >> both consumers but at the end only one of the topics was assigned to
>>the
>> >> consumer that is configured roundrobin assignment strategy and no
>>topics
>> >> were assigned to the other consumer. This led to one topic missing
>>its
>> >> owner.
>> >>
>> >> Here is the result from zk
>> >> [zk: localhost:2181(CONNECTED) 0] get
>> >> /consumers/test/owners/mm-benchmark-test/0
>> >>
>> >> Node does not exist:
>> >> /consumers/test12345667f/owners/mm-benchmark-test/0
>> >>
>> >> [zk: localhost:2181(CONNECTED) 1] get
>> >> /consumers/test/owners/mm-benchmark-test1/0
>> >>
>> >> test-localhost-1426605370072-904d6fba-0
>> >>
>> >> The kafka version I use is 0.8.2.1
>> >>
>> >> --
>> >> Regards,
>> >> Tao
>> >>
>> >
>> >
>> >
>> >--
>> >Regards,
>> >Tao
>>
>>
>
>
>-- 
>-Regards,
>Mayuresh R. Gharat
>(862) 250-7125



Re: No topic owner when using different assignment strategies

2015-03-17 Thread Mayuresh Gharat
Probably we should return an error response if a partition assignment strategy
is already in place for a group and you try to use a different strategy.

Thanks,

Mayuresh

On Tue, Mar 17, 2015 at 2:10 PM, Jiangjie Qin 
wrote:

> Yeah, using different partition assignment algorithms in the same consumer
> group won't work. Is there a particular reason you want to do this?
>
> On 3/17/15, 8:32 AM, "tao xiao"  wrote:
>
> >This is the corrected zk result
> >
> >Here is the result from zk
> >[zk: localhost:2181(CONNECTED) 0] get
> >/consumers/test/owners/mm-benchmark-test/0
> >
> >Node does not exist: /consumers/test/owners/mm-benchmark-test/0
> >
> >[zk: localhost:2181(CONNECTED) 1] get
> >/consumers/test/owners/mm-benchmark-test1/0
> >
> >test-localhost-1426605370072-904d6fba-0
> >
> >On Tue, Mar 17, 2015 at 11:30 PM, tao xiao  wrote:
> >
> >> Hi team,
> >>
> >> I have two consumer instances with the same group id connecting to two
> >> different topics with 1 partition created for each. One consumer uses
> >> partition.assignment.strategy=roundrobin and the other one uses default
> >> assignment strategy. Both consumers have 1 thread spawned internally and
> >> connect kafka using createMessageStreamsByFilter.
> >> The consumer with roundrobin assignment strategy connected kafka first
> >>and
> >> had 2 topics assigned to itself and then I brought up another consumer
> >>that
> >> has default assignment strategy configured. I saw rebalancing happened
> >>in
> >> both consumers but at the end only one of the topics was assigned to the
> >> consumer that is configured roundrobin assignment strategy and no topics
> >> were assigned to the other consumer. This led to one topic missing its
> >> owner.
> >>
> >> Here is the result from zk
> >> [zk: localhost:2181(CONNECTED) 0] get
> >> /consumers/test/owners/mm-benchmark-test/0
> >>
> >> Node does not exist:
> >> /consumers/test12345667f/owners/mm-benchmark-test/0
> >>
> >> [zk: localhost:2181(CONNECTED) 1] get
> >> /consumers/test/owners/mm-benchmark-test1/0
> >>
> >> test-localhost-1426605370072-904d6fba-0
> >>
> >> The kafka version I use is 0.8.2.1
> >>
> >> --
> >> Regards,
> >> Tao
> >>
> >
> >
> >
> >--
> >Regards,
> >Tao
>
>


-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: Broker Exceptions

2015-03-17 Thread Mayuresh Gharat
We are trying to see what might have caused it.

We had some questions:
1) Is this reproducible? That way we can dig deep.


This looks like an interesting problem to solve, and you might have caught a bug,
but we need to verify the root cause before filing a ticket.

Thanks,

Mayuresh
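
For readers skimming the quoted history below: the two offset-reset cases
described there boil down to roughly the following. This is a paraphrase for
illustration only, not the actual ReplicaFetcherThread source:

// Rough paraphrase of how a follower handles an OffsetOutOfRange error in 0.8.2:
long resetFetchOffset(long replicaLogEndOffset,
                      long leaderLogEndOffset,
                      long leaderLogStartOffset) {
    if (replicaLogEndOffset > leaderLogEndOffset) {
        // Case 1: the follower is ahead of the leader; truncate back to the
        // leader's log end offset.
        return leaderLogEndOffset;
    } else {
        // Case 2: the follower's offset has fallen off the start of the leader's
        // log (e.g. segments deleted by retention); restart from the leader's
        // log start offset.
        return leaderLogStartOffset;
    }
}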

On Tue, Mar 17, 2015 at 2:10 PM, Zakee  wrote:

> > What version are you running ?
>
> Version 0.8.2.0
>
> > Your case is 2). But the only thing weird is your replica (broker 3) is
> > requesting for offset which is greater than the leaders log end offset.
>
>
> So what could be the cause?
>
> Thanks
> Zakee
>
>
>
> > On Mar 17, 2015, at 11:45 AM, Mayuresh Gharat <
> gharatmayures...@gmail.com> wrote:
> >
> > What version are you running ?
> >
> > The code for latest version says that :
> >
> > 1) if the log end offset of the replica is greater than the leaders log
> end
> > offset, the replicas offset will be reset to logEndOffset of the leader.
> >
> > 2) Else if the log end offset of the replica is smaller than the leaders
> > log end offset and its out of range, the replicas offset will be reset to
> > logStartOffset of the leader.
> >
> > Your case is 2). But the only thing weird is your replica (broker 3) is
> > requesting for offset which is greater than the leaders log end offset.
> >
> > Thanks,
> >
> > Mayuresh
> >
> >
> > On Tue, Mar 17, 2015 at 10:26 AM, Mayuresh Gharat <
> > gharatmayures...@gmail.com > wrote:
> >
> >> cool.
> >>
> >> On Tue, Mar 17, 2015 at 10:15 AM, Zakee  wrote:
> >>
> >>> Hi Mayuresh,
> >>>
> >>> The logs are already attached and are in reverse order starting
> backwards
> >>> from [2015-03-14 07:46:52,517] to the time when brokers were started.
> >>>
> >>> Thanks
> >>> Zakee
> >>>
> >>>
> >>>
>  On Mar 17, 2015, at 12:07 AM, Mayuresh Gharat <
> >>> gharatmayures...@gmail.com> wrote:
> 
>  Hi Zakee,
> 
>  Thanks for the logs. Can you paste earlier logs from broker-3 up to :
> 
>  [2015-03-14 07:46:52,517] ERROR [ReplicaFetcherThread-2-4], Current
>  offset 1754769769 for partition [Topic22kv,5] out of range; reset
>  offset to 1400864851 (kafka.server.ReplicaFetcherThread)
> 
>  That would help us figure out what was happening on this broker before
> >>> it
>  issued a replicaFetch request to broker-4.
> 
>  Thanks,
> 
>  Mayuresh
> 
>  On Mon, Mar 16, 2015 at 11:32 PM, Zakee  wrote:
> 
> > Hi Mayuresh,
> >
> > Here are the logs.
> >
> > 
> >
> >
> > Thanks,
> > Kazim Zakee
> >
> >
> >
> >> On Mar 16, 2015, at 10:48 AM, Mayuresh Gharat <
> > gharatmayures...@gmail.com> wrote:
> >>
> >> Can you provide more logs (complete) on Broker 3 till time :
> >>
> >> *[2015-03-14 07:46:52,517*] WARN [ReplicaFetcherThread-2-4],
> Replica 3
> > for
> >> partition [Topic22kv,5] reset its fetch offset from 1400864851 to
> >>> current
> >> leader 4's start offset 1400864851
> (kafka.server.ReplicaFetcherThread)
> >>
> >> I would like to see logs from time much before it sent the fetch
> >>> request
> > to
> >> Broker 4 to the time above. I want to check if in any case Broker 3
> >>> was a
> >> leader before broker 4 took over.
> >>
> >> Additional logs will help.
> >>
> >>
> >> Thanks,
> >>
> >> Mayuresh
> >>
> >>
> >>
> >> On Sat, Mar 14, 2015 at 8:35 PM, Zakee  wrote:
> >>
> >>> log.cleanup.policy is delete not compact.
> >>> log.cleaner.enable=true
> >>> log.cleaner.threads=5
> >>> log.cleanup.policy=delete
> >>> log.flush.scheduler.interval.ms=3000
> >>> log.retention.minutes=1440
> >>> log.segment.bytes=1073741824  (1gb)
> >>>
> >>> Messages are keyed but not compressed, producer async and uses
> kafka
> >>> default partitioner.
> >>> String message = msg.getString();
> >>> String uniqKey = ""+rnd.nextInt();// random key
> >>> String partKey = getPartitionKey();// partition key
> >>> KeyedMessage<String, String> data = new KeyedMessage<String, String>(this.topicName, uniqKey, partKey, message);
> >>> producer.send(data);
> >>>
> >>> Thanks
> >>> Zakee
> >>>
> >>>
> >>>
>  On Mar 14, 2015, at 4:23 PM, gharatmayures...@gmail.com wrote:
> 
>  Is your topic log compacted? Also if it is are the messages keyed?
> >>> Or
> >>> are the messages compressed?
> 
>  Thanks,
> 
>  Mayuresh
> 
>  Sent from my iPhone
> 
> > On Mar 14, 2015, at 2:02 PM, Zakee  >>> kzak...@netzero.net>> wrote:
> >
> > Thanks, Jiangjie for helping resolve 

Re: Monitoring of consumer group lag

2015-03-17 Thread Otis Gospodnetic
Mathias,

SPM for Kafka will give you Consumer Offsets by Host, Consumer Id, Topic,
and Partition, and you can alert (thresholds and/or anomalies) on any
combination of these, and of course on any of the other 100+ Kafka metrics
there.
See http://blog.sematext.com/2015/02/10/kafka-0-8-2-monitoring/

Otis
--
Monitoring * Alerting * Anomaly Detection * Centralized Log Management
Solr & Elasticsearch Support * http://sematext.com/


On Tue, Mar 17, 2015 at 5:36 AM, Mathias Söderberg <
mathias.soederb...@gmail.com> wrote:

> Hi Lance,
>
> I tried Kafka Offset Monitor a while back, but it didn't play especially
> nice with a lot of topics / partitions (we currently have around 1400
> topics and 4000 partitions in total). Might be possible to make it work a
> bit better, but not sure it would be the best way to do alerting.
>
> Thanks for the tip though :).
>
> Best regards,
> Mathias
>
>
> On Mon, 16 Mar 2015 at 21:02 Lance Laursen 
> wrote:
>
> > Hey Mathias,
> >
> > Kafka Offset Monitor will give you a general idea of where your consumer
> > group(s) are at:
> >
> > http://quantifind.com/KafkaOffsetMonitor/
> >
> > However, I'm not sure how useful it will be with "a large number of
> topics"
> > / turning its output into a script that alerts upon a threshold. Could
> take
> > a look and see what they're doing though.
> >
> > On Mon, Mar 16, 2015 at 8:31 AM, Mathias Söderberg <
> > mathias.soederb...@gmail.com> wrote:
> >
> > > Good day,
> > >
> > > I'm looking into using SimpleConsumer#getOffsetsBefore and offsets
> > > committed in ZooKeeper for monitoring the lag of a consumer group.
> > >
> > > Our current use case is that we have a service that is continuously
> > > consuming messages of a large number of topics and persisting the
> > messages
> > > to S3 at somewhat regular intervals (depends on time and the total size
> > of
> > > consumed messages for each partition). Offsets are committed to
> ZooKeeper
> > > after the messages have been persisted to S3.
> > > The partitions are of varying load, so a simple threshold based on the
> > > number of messages we're lagging behind would be cumbersome to maintain
> > due
> > > to the number of topics, and most likely prone to unnecessary alerts.
> > >
> > > Currently our broker configuration specifies log.roll.hours=1 and
> > > log.segment.bytes=1GB, and my proposed solution is to have a separate
> > > service that would iterate through all topics/partitions and use
> > > #getOffsetsBefore with a timestamp that is one (1) or two (2) hours ago
> > and
> > > compare the first offset (which from my testing looks to be the offset
> > that
> > > is closest in time, i.e. from the log segment that is closest to the
> > > timestamp given) with the one that is saved to ZooKeeper.
> > > It feels like a pretty solid solution, given that we just want a rough
> > > estimate of how much we're lagging behind in time, so that we know
> > (again,
> > > roughly) how much time we have to fix whatever is broken before the log
> > > segments are deleted by Kafka.
> > >
> > > Is there anyone doing monitoring similar to this? Are there any obvious
> > > downsides of this approach that I'm not thinking about? Thoughts on
> > > alternatives?
> > >
> > > Best regards,
> > > Mathias
> > >
> >
>
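
A rough sketch of the lag check Mathias describes above, using the 0.8.x
SimpleConsumer API. The broker host, topic, and client id are placeholders, and
the ZooKeeper lookup of the committed offset is left out:

import java.util.Collections;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.consumer.SimpleConsumer;

public class TimeBasedLagCheckSketch {
    public static void main(String[] args) {
        SimpleConsumer consumer =
            new SimpleConsumer("broker-host", 9092, 100000, 64 * 1024, "lag-checker");
        TopicAndPartition tp = new TopicAndPartition("some-topic", 0);
        long twoHoursAgo = System.currentTimeMillis() - 2L * 60 * 60 * 1000;

        // Ask the broker for the offset of the log segment closest to the timestamp.
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
            Collections.singletonMap(tp, new PartitionOffsetRequestInfo(twoHoursAgo, 1)),
            kafka.api.OffsetRequest.CurrentVersion(), "lag-checker");
        OffsetResponse response = consumer.getOffsetsBefore(request);
        long offsetTwoHoursAgo = response.offsets("some-topic", 0)[0];
        consumer.close();

        // Compare offsetTwoHoursAgo with the offset the group committed to ZooKeeper
        // at /consumers/<group>/offsets/some-topic/0: if the committed offset is
        // smaller, the group is roughly more than two hours behind.
        System.out.println("Offset ~2h ago for some-topic-0: " + offsetTwoHoursAgo);
    }
}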


Fw: How to measure performance of Mirror Maker

2015-03-17 Thread Saladi Naidu
Any suggestions on how to measure the throughput of the Mirror Maker?

Naidu Saladi

- Forwarded Message -
  From: Saladi Naidu 
 To: "users@kafka.apache.org"  
 Sent: Monday, March 16, 2015 10:31 PM
 Subject: How to measure performance of Mirror Maker
   
We have three Kafka clusters deployed in 3 DCs, each with its own topics. We are 
using Mirror Maker to keep all three clusters up to date with continuous 
replication. We used the perf producer and perf consumer to conduct basic 
testing, and all seems to work great.
Two of our DCs are in the US, 50 miles apart and well connected by MAN fiber; our 
3rd DC is in Europe, connected to the 2 US DCs by dedicated links with a 
round-trip latency of around 300 ms.
When we ran the performance test, the client nodes running the perf test scripts 
were in the US DCs, and I observed very low throughput (almost 1/100th). I 
understand part of that is due to network latency. The same will apply to Mirror 
Maker as well, right?
Are there any tools to measure Mirror Maker performance in general, and 
specifically against a remote DC?

Naidu Saladi


  

Kafka deployment across DC

2015-03-17 Thread Shrikant Patel
When I sent this email last time, the message formatting got messed up. 
Hopefully that does not happen this time.

We have a very unique problem.
We have an application deployed on a WebLogic cluster that is spread across 2 
datacenters (active-active), DC1 and DC2 (different LANs but the same WAN). This 
producer app generates various user events which other apps (consumer apps) are 
interested in. Right now we use JMS for event storage/messaging. JMS would not 
scale to carry all of the events (right now we only pump a small percentage of 
events through JMS), so we want to move to Kafka. This fits our needs nicely.
Our current deployment and the guidelines from our network team complicate this 
very simple use case.
1. Our network team mandates that any deployment be in both DC1 and DC2, so that 
if DC1 is unavailable we switch to DC2. From all the reading I have done, having 
a single Kafka cluster span 2 DCs is not recommended, and a ZooKeeper cluster 
over the WAN is going to be slow.

So the thought is to have 2 independent Kafka clusters, one in each DC, and then 
use Mirror Maker (MM) to copy messages from the topics in DC1 to DC2 and in the 
other direction as well (this is based on https://kafka.apache.org/081/ops.html).

2. Also, many consumer apps will prefer to get messages in an orderly fashion to 
make sense of the events. (The ordering provided by Kafka is sufficient.)

But the producer app is active-active, so a given user's events may be generated 
in either DC1 or DC2. How do we maintain a reasonable order between events for a 
user across DCs?
To address this, the consumer group in DC1 will only consume ODD user id events 
and DC2 will only consume EVEN user id events.

So an EVEN user id event generated in DC1 has to go through the MM process to 
DC2. This introduces MM processing time and network latency, so ordering is not 
perfect, but I think we can live with it.
I am new to Kafka and would appreciate any advice from more seasoned Kafka users. 
Will this work or fail? Is there a better way? Any other suggestions?
Thanks,
Shri
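
One way to realize the even/odd split described above, purely as an
illustration: it assumes each message is keyed by the numeric user id, and that
each DC's consumer simply skips the other DC's users.

// Illustrative only: producing with the user id as the message key keeps one
// user's events ordered within a partition of whichever cluster they were
// produced to; each DC's consumer group then processes only "its" half of the ids.
public final class DcParityFilter {
    private DcParityFilter() {}

    // DC1 passes thisDcHandlesEvenIds = false, DC2 passes true.
    public static boolean ownedByThisDc(long userId, boolean thisDcHandlesEvenIds) {
        boolean even = (userId % 2 == 0);
        return even == thisDcHandlesEvenIds;
    }
}

Each DC's consumer loop would call ownedByThisDc on every message, including the
ones mirrored over by MM, and skip the other DC's users; the MM hop is what adds
the extra latency mentioned above.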




Re: Broker Exceptions

2015-03-17 Thread Zakee
> What version are you running ?

Version 0.8.2.0

> Your case is 2). But the only thing weird is your replica (broker 3) is
> requesting for offset which is greater than the leaders log end offset.


So what could be the cause?

Thanks
Zakee



> On Mar 17, 2015, at 11:45 AM, Mayuresh Gharat  
> wrote:
> 
> What version are you running ?
> 
> The code for latest version says that :
> 
> 1) if the log end offset of the replica is greater than the leaders log end
> offset, the replicas offset will be reset to logEndOffset of the leader.
> 
> 2) Else if the log end offset of the replica is smaller than the leaders
> log end offset and its out of range, the replicas offset will be reset to
> logStartOffset of the leader.
> 
> Your case is 2). But the only thing weird is your replica (broker 3) is
> requesting for offset which is greater than the leaders log end offset.
> 
> Thanks,
> 
> Mayuresh
> 
> 
> On Tue, Mar 17, 2015 at 10:26 AM, Mayuresh Gharat <
> gharatmayures...@gmail.com > wrote:
> 
>> cool.
>> 
>> On Tue, Mar 17, 2015 at 10:15 AM, Zakee  wrote:
>> 
>>> Hi Mayuresh,
>>> 
>>> The logs are already attached and are in reverse order starting backwards
>>> from [2015-03-14 07:46:52,517] to the time when brokers were started.
>>> 
>>> Thanks
>>> Zakee
>>> 
>>> 
>>> 
 On Mar 17, 2015, at 12:07 AM, Mayuresh Gharat <
>>> gharatmayures...@gmail.com> wrote:
 
 Hi Zakee,
 
 Thanks for the logs. Can you paste earlier logs from broker-3 up to :
 
 [2015-03-14 07:46:52,517] ERROR [ReplicaFetcherThread-2-4], Current
 offset 1754769769 for partition [Topic22kv,5] out of range; reset
 offset to 1400864851 (kafka.server.ReplicaFetcherThread)
 
 That would help us figure out what was happening on this broker before
>>> it
 issued a replicaFetch request to broker-4.
 
 Thanks,
 
 Mayuresh
 
 On Mon, Mar 16, 2015 at 11:32 PM, Zakee  wrote:
 
> Hi Mayuresh,
> 
> Here are the logs.
> 
> 
> 
> 
> Thanks,
> Kazim Zakee
> 
> 
> 
>> On Mar 16, 2015, at 10:48 AM, Mayuresh Gharat <
> gharatmayures...@gmail.com> wrote:
>> 
>> Can you provide more logs (complete) on Broker 3 till time :
>> 
>> *[2015-03-14 07:46:52,517*] WARN [ReplicaFetcherThread-2-4], Replica 3
> for
>> partition [Topic22kv,5] reset its fetch offset from 1400864851 to
>>> current
>> leader 4's start offset 1400864851 (kafka.server.ReplicaFetcherThread)
>> 
>> I would like to see logs from time much before it sent the fetch
>>> request
> to
>> Broker 4 to the time above. I want to check if in any case Broker 3
>>> was a
>> leader before broker 4 took over.
>> 
>> Additional logs will help.
>> 
>> 
>> Thanks,
>> 
>> Mayuresh
>> 
>> 
>> 
>> On Sat, Mar 14, 2015 at 8:35 PM, Zakee  wrote:
>> 
>>> log.cleanup.policy is delete not compact.
>>> log.cleaner.enable=true
>>> log.cleaner.threads=5
>>> log.cleanup.policy=delete
>>> log.flush.scheduler.interval.ms=3000
>>> log.retention.minutes=1440
>>> log.segment.bytes=1073741824  (1gb)
>>> 
>>> Messages are keyed but not compressed, producer async and uses kafka
>>> default partitioner.
>>> String message = msg.getString();
>>> String uniqKey = ""+rnd.nextInt();// random key
>>> String partKey = getPartitionKey();// partition key
> >>> KeyedMessage<String, String> data = new KeyedMessage<String, String>(this.topicName, uniqKey, partKey, message);
>>> producer.send(data);
>>> 
>>> Thanks
>>> Zakee
>>> 
>>> 
>>> 
 On Mar 14, 2015, at 4:23 PM, gharatmayures...@gmail.com wrote:
 
 Is your topic log compacted? Also if it is are the messages keyed?
>>> Or
>>> are the messages compressed?
 
 Thanks,
 
 Mayuresh
 
 Sent from my iPhone
 
> On Mar 14, 2015, at 2:02 PM, Zakee >> kzak...@netzero.net>> wrote:
> 
> Thanks, Jiangjie for helping resolve the kafka controller migration
>>> driven partition leader rebalance issue. The logs are much cleaner
>>> now.
> 
> There are a few incidences of Out of range offset even though
>>> there
> is
>>> no consumers running, only producers and replica fetchers. I was
>>> trying
> to
>>> relate to a cause, looks like compaction (log segment deletion)
>>> causing
>>> this. Not sure whether this is expected behavior.
> 
> Broker-4:
> [2015-03-14 07:46:52,338] ERROR [Replica Manager on Broker 4]:
>>> Error
>>> when processing fetch request for partition [

Re: No topic owner when using different assignment strategies

2015-03-17 Thread Jiangjie Qin
Yeah, using different partition assignment algorithms in the same consumer
group won't work. Is there a particular reason you want to do this?

On 3/17/15, 8:32 AM, "tao xiao"  wrote:

>This is the corrected zk result
>
>Here is the result from zk
>[zk: localhost:2181(CONNECTED) 0] get
>/consumers/test/owners/mm-benchmark-test/0
>
>Node does not exist: /consumers/test/owners/mm-benchmark-test/0
>
>[zk: localhost:2181(CONNECTED) 1] get
>/consumers/test/owners/mm-benchmark-test1/0
>
>test-localhost-1426605370072-904d6fba-0
>
>On Tue, Mar 17, 2015 at 11:30 PM, tao xiao  wrote:
>
>> Hi team,
>>
>> I have two consumer instances with the same group id connecting to two
>> different topics with 1 partition created for each. One consumer uses
>> partition.assignment.strategy=roundrobin and the other one uses default
>> assignment strategy. Both consumers have 1 thread spawned internally and
>> connect kafka using createMessageStreamsByFilter.
>> The consumer with roundrobin assignment strategy connected kafka first
>>and
>> had 2 topics assigned to itself and then I brought up another consumer
>>that
>> has default assignment strategy configured. I saw rebalancing happened
>>in
>> both consumers but at the end only one of the topics was assigned to the
>> consumer that is configured roundrobin assignment strategy and no topics
>> were assigned to the other consumer. This led to one topic missing its
>> owner.
>>
>> Here is the result from zk
>> [zk: localhost:2181(CONNECTED) 0] get
>> /consumers/test/owners/mm-benchmark-test/0
>>
>> Node does not exist:
>> /consumers/test12345667f/owners/mm-benchmark-test/0
>>
>> [zk: localhost:2181(CONNECTED) 1] get
>> /consumers/test/owners/mm-benchmark-test1/0
>>
>> test-localhost-1426605370072-904d6fba-0
>>
>> The kafka version I use is 0.8.2.1
>>
>> --
>> Regards,
>> Tao
>>
>
>
>
>-- 
>Regards,
>Tao



Re: Monitoring of consumer group lag

2015-03-17 Thread Robin Yamaguchi
Hi Mathias,

We call bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker via NRPE,
and alert through Nagios.

-Robin

On Tue, Mar 17, 2015 at 2:46 AM, Kasper Mackenhauer Jacobsen <
kas...@falconsocial.com> wrote:

> Hi Mathias,
>
> We're currently using a custom solution that queries kafka and zookeeper (2
> different processes) for topic size and consumer offset and submits the
> information to a collectd/statsd instance that ships it on to graphite, so
> we can track it in grafana.
>
> There's no alerting built in, but it gives us a good overview of what's
> going on without having to consolidate another service like the offset
> monitor
>
>
> On Tue, Mar 17, 2015 at 10:36 AM, Mathias Söderberg <
> mathias.soederb...@gmail.com> wrote:
>
> > Hi Lance,
> >
> > I tried Kafka Offset Monitor a while back, but it didn't play especially
> > nice with a lot of topics / partitions (we currently have around 1400
> > topics and 4000 partitions in total). Might be possible to make it work a
> > bit better, but not sure it would be the best way to do alerting.
> >
> > Thanks for the tip though :).
> >
> > Best regards,
> > Mathias
> >
> >
> > On Mon, 16 Mar 2015 at 21:02 Lance Laursen 
> > wrote:
> >
> > > Hey Mathias,
> > >
> > > Kafka Offset Monitor will give you a general idea of where your
> consumer
> > > group(s) are at:
> > >
> > > http://quantifind.com/KafkaOffsetMonitor/
> > >
> > > However, I'm not sure how useful it will be with "a large number of
> > topics"
> > > / turning its output into a script that alerts upon a threshold. Could
> > take
> > > a look and see what they're doing though.
> > >
> > > On Mon, Mar 16, 2015 at 8:31 AM, Mathias Söderberg <
> > > mathias.soederb...@gmail.com> wrote:
> > >
> > > > Good day,
> > > >
> > > > I'm looking into using SimpleConsumer#getOffsetsBefore and offsets
> > > > committed in ZooKeeper for monitoring the lag of a consumer group.
> > > >
> > > > Our current use case is that we have a service that is continuously
> > > > consuming messages of a large number of topics and persisting the
> > > messages
> > > > to S3 at somewhat regular intervals (depends on time and the total
> size
> > > of
> > > > consumed messages for each partition). Offsets are committed to
> > ZooKeeper
> > > > after the messages have been persisted to S3.
> > > > The partitions are of varying load, so a simple threshold based on
> the
> > > > number of messages we're lagging behind would be cumbersome to
> maintain
> > > due
> > > > to the number of topics, and most likely prone to unnecessary alerts.
> > > >
> > > > Currently our broker configuration specifies log.roll.hours=1 and
> > > > log.segment.bytes=1GB, and my proposed solution is to have a separate
> > > > service that would iterate through all topics/partitions and use
> > > > #getOffsetsBefore with a timestamp that is one (1) or two (2) hours
> ago
> > > and
> > > > compare the first offset (which from my testing looks to be the
> offset
> > > that
> > > > is closest in time, i.e. from the log segment that is closest to the
> > > > timestamp given) with the one that is saved to ZooKeeper.
> > > > It feels like a pretty solid solution, given that we just want a
> rough
> > > > estimate of how much we're lagging behind in time, so that we know
> > > (again,
> > > > roughly) how much time we have to fix whatever is broken before the
> log
> > > > segments are deleted by Kafka.
> > > >
> > > > Is there anyone doing monitoring similar to this? Are there any
> obvious
> > > > downsides of this approach that I'm not thinking about? Thoughts on
> > > > alternatives?
> > > >
> > > > Best regards,
> > > > Mathias
> > > >
> > >
> >
>
>
>
> --
> *Kasper Mackenhauer Jacobsen*
>


Re: [ANN] sqlstream: Simple MySQL binlog to Kafka stream

2015-03-17 Thread James Cheng
This is a great set of projects!

We should put this list of projects on a site somewhere so people can more 
easily see and refer to it. These aren't Kafka-specific, but most seem to be 
"MySQL CDC." Does anyone have a place where they can host a page? Preferably a 
wiki, so we can keep it up to date easily.

-James

On Mar 17, 2015, at 8:21 AM, Hisham Mardam-Bey  
wrote:

> Pretty much a hijack / plug as well (=
> 
> https://github.com/mardambey/mypipe
> 
> "MySQL binary log consumer with the ability to act on changed rows and
> publish changes to different systems with emphasis on Apache Kafka."
> 
> Mypipe currently encodes events using Avro before pushing them into Kafka
> and is Avro schema repository aware. The project is young; and patches for
> improvements are appreciated (=
> 
> On Mon, Mar 16, 2015 at 10:35 PM, Arya Ketan  wrote:
> 
>> Great work.
>> Sorry for kinda hijacking this thread, but I though that we had built
>> some-thing on mysql bin log event propagator and wanted to share it .
>> You guys can also look into Aesop ( https://github.com/Flipkart/aesop).
>> Its
>> a change propagation frame-work. It has relays which listens to bin logs of
>> Mysql, keeps track of SCNs  and has consumers which can then (transform/map
>> or interpret as is) the bin log-event to a destination. Consumers also keep
>> track of SCNs and a slow consumer can go back to a previous SCN if it wants
>> to re-listen to events  ( similar to kafka's consumer view ).
>> 
>> All the producers/consumers are extensible and you can write your own
>> custom consumer and feed off the data to it.
>> 
>> Common use-cases:
>> a) Archive mysql based data into say hbase
>> b) Move mysql based data to say a search store for serving reads.
>> 
>> It has a decent ( not an awesome :) ) console too which gives a nice human
>> readable view of where the producers and consumers are.
>> 
>> Current supported producers are mysql bin logs, hbase wall-edits.
>> 
>> 
>> Further insights/reviews/feature reqs/pull reqs/advices are all welcome.
>> 
>> --
>> Arya
>> 
>> Arya
>> 
>> On Tue, Mar 17, 2015 at 1:48 AM, Gwen Shapira 
>> wrote:
>> 
>>> Really really nice!
>>> 
>>> Thank you.
>>> 
>>> On Mon, Mar 16, 2015 at 7:18 AM, Pierre-Yves Ritschard >> 
>>> wrote:
 Hi kafka,
 
 I just wanted to mention I published a very simple project which can
 connect as MySQL replication client and stream replication events to
 kafka: https://github.com/pyr/sqlstream
 
 When you don't have control over an application, it can provide a
>> simple
 way of consolidating SQL data in kafka.
 
 This is an early release and there are a few caveats (mentionned in the
 README), mostly the poor partitioning which I'm going to evolve quickly
 and the reconnection strategy which doesn't try to keep track of binlog
 position, other than that, it should work as advertised.
 
 Cheers,
  - pyr
>>> 
>> 
> 
> 
> 
> -- 
> Hisham Mardam-Bey
> http://hisham.cc/



Re: schema.registry.url = null

2015-03-17 Thread Ewen Cheslack-Postava
Clint,

Your code looks fine and the output doesn't actually have any errors, but
you're also not waiting for the messages to be published. Try changing

producer.send(data);

to

producer.send(data).get();

to block until the message has been acked. If it runs and exits
cleanly, then you should be able to see it using a consumer, e.g.
kafka-avro-console-consumer.
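
For reference, a minimal sketch of that change (it assumes the producer
and data variables from your example; the RecordMetadata printout and the
catch-all error handling are only illustrative):

// needs: import org.apache.kafka.clients.producer.RecordMetadata;
try {
    // send() returns a Future<RecordMetadata>; get() blocks until the ack
    RecordMetadata md = producer.send(data).get();
    System.out.println("Wrote to " + md.topic() + "-" + md.partition()
            + " at offset " + md.offset());
} catch (Exception e) {
    e.printStackTrace();
} finally {
    producer.close();
}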

The warning that you're seeing is due to the KafkaProducer's configuration
class not using the schema.registry.url setting; the same settings are
passed on to the serializers which do use it. It incorrectly reports the
value as null due to a bug; I filed
https://issues.apache.org/jira/browse/KAFKA-2026 to address that.

By the way, for Confluent stuff that's not part of the Apache Kafka
repository, you might want to ask questions on this list instead:
https://groups.google.com/forum/#!forum/confluent-platform


On Tue, Mar 17, 2015 at 8:04 AM, Clint Mcneil  wrote:

> Hi
>
> I can't get the Kafka/Avro serializer producer example to work.
>
> import org.apache.avro.Schema;
> import org.apache.avro.generic.GenericData;
> import org.apache.avro.generic.GenericRecord;
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.ProducerConfig;
> import org.apache.kafka.clients.producer.ProducerRecord;
>
> import java.util.Properties;
>
> /**
>  * Created by clint on 3/17/15.
>  */
> public class Confluent {
>
> public static void  main (String[] args){
>
> KafkaProducer producer;
> Properties propsKafka = new Properties();
>
> propsKafka.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
> "localhost:9092");
> propsKafka.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
> io.confluent.kafka.serializers.KafkaAvroSerializer.class);
> propsKafka.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
> io.confluent.kafka.serializers.KafkaAvroSerializer.class);
> propsKafka.put("schema.registry.url", "http://localhost:8081";);
> producer = new KafkaProducer(propsKafka);
>
> String key = "key1";
> String userSchema = "{\"type\":\"record\"," +
> "\"name\":\"myrecord\"," +
> "\"fields\":[{\"name\":\"f1\",\"type\":\"string\"}]}";
>
> Schema.Parser parser = new Schema.Parser();
> Schema schema = parser.parse(userSchema);
>
> GenericRecord avroRecord = new GenericData.Record(schema);
> avroRecord.put("f1", "value4");
>
> ProducerRecord data = new ProducerRecord("test", key, avroRecord);
> producer.send(data);
>
> }
> }
>
>
> The output is:
>
> Mar 17, 2015 5:00:31 PM org.apache.kafka.common.config.AbstractConfig
> logAll
> INFO: ProducerConfig values:
> compression.type = none
> metric.reporters = []
> metadata.max.age.ms = 30
> metadata.fetch.timeout.ms = 6
> acks = 1
> batch.size = 16384
> reconnect.backoff.ms = 10
> bootstrap.servers = [localhost:9092]
> receive.buffer.bytes = 32768
> retry.backoff.ms = 100
> buffer.memory = 33554432
> timeout.ms = 3
> key.serializer = class
> io.confluent.kafka.serializers.KafkaAvroSerializer
> retries = 0
> max.request.size = 1048576
> block.on.buffer.full = true
> value.serializer = class
> io.confluent.kafka.serializers.KafkaAvroSerializer
> metrics.sample.window.ms = 3
> send.buffer.bytes = 131072
> max.in.flight.requests.per.connection = 5
> metrics.num.samples = 2
> linger.ms = 0
> client.id =
>
> Mar 17, 2015 5:00:32 PM org.apache.kafka.common.config.AbstractConfig
> logUnused
> WARNING: The configuration schema.registry.url = null was supplied but
> isn't a known config.
>
> Please help
>
> Thanks
>



-- 
Thanks,
Ewen


Re: Broker Exceptions

2015-03-17 Thread Mayuresh Gharat
What version are you running?

The code for the latest version says that:

1) If the log end offset of the replica is greater than the leader's log
end offset, the replica's offset will be reset to the logEndOffset of the
leader.

2) Else, if the log end offset of the replica is smaller than the leader's
log end offset and is out of range, the replica's offset will be reset to
the logStartOffset of the leader.

Your case is 2). The only odd thing is that your replica (broker 3) is
requesting an offset that is greater than the leader's log end offset.
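
To make the two cases concrete, here is a rough sketch of that decision.
The names replicaLEO/leaderLEO/leaderLSO are made up for illustration; this
is not Kafka's actual code (which lives in the replica fetcher):

// Illustrative only -- not Kafka's actual implementation.
// replicaLEO: this replica's log end offset
// leaderLEO:  the leader's log end offset
// leaderLSO:  the leader's log start offset (earliest retained offset)
static long resetFetchOffset(long replicaLEO, long leaderLEO, long leaderLSO) {
    if (replicaLEO > leaderLEO) {
        return leaderLEO;   // case 1: replica is ahead of the leader
    } else {
        return leaderLSO;   // case 2: replica is behind and out of range
    }
}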

Thanks,

Mayuresh


On Tue, Mar 17, 2015 at 10:26 AM, Mayuresh Gharat <
gharatmayures...@gmail.com> wrote:

> cool.
>
> On Tue, Mar 17, 2015 at 10:15 AM, Zakee  wrote:
>
>> Hi Mayuresh,
>>
>> The logs are already attached and are in reverse order starting backwards
>> from [2015-03-14 07:46:52,517] to the time when brokers were started.
>>
>> Thanks
>> Zakee
>>
>>
>>
>> > On Mar 17, 2015, at 12:07 AM, Mayuresh Gharat <
>> gharatmayures...@gmail.com> wrote:
>> >
>> > Hi Zakee,
>> >
>> > Thanks for the logs. Can you paste earlier logs from broker-3 up to :
>> >
>> > [2015-03-14 07:46:52,517] ERROR [ReplicaFetcherThread-2-4], Current
>> > offset 1754769769 for partition [Topic22kv,5] out of range; reset
>> > offset to 1400864851 (kafka.server.ReplicaFetcherThread)
>> >
>> > That would help us figure out what was happening on this broker before
>> it
>> > issued a replicaFetch request to broker-4.
>> >
>> > Thanks,
>> >
>> > Mayuresh
>> >
>> > On Mon, Mar 16, 2015 at 11:32 PM, Zakee  wrote:
>> >
>> >> Hi Mayuresh,
>> >>
>> >> Here are the logs.
>> >>
>> >> 
>> >>
>> >>
>> >> Thanks,
>> >> Kazim Zakee
>> >>
>> >>
>> >>
>> >>> On Mar 16, 2015, at 10:48 AM, Mayuresh Gharat <
>> >> gharatmayures...@gmail.com> wrote:
>> >>>
>> >>> Can you provide more logs (complete) on Broker 3 till time :
>> >>>
>> >>> *[2015-03-14 07:46:52,517*] WARN [ReplicaFetcherThread-2-4], Replica 3
>> >> for
>> >>> partition [Topic22kv,5] reset its fetch offset from 1400864851 to
>> current
>> >>> leader 4's start offset 1400864851 (kafka.server.ReplicaFetcherThread)
>> >>>
>> >>> I would like to see logs from time much before it sent the fetch
>> request
>> >> to
>> >>> Broker 4 to the time above. I want to check if in any case Broker 3
>> was a
>> >>> leader before broker 4 took over.
>> >>>
>> >>> Additional logs will help.
>> >>>
>> >>>
>> >>> Thanks,
>> >>>
>> >>> Mayuresh
>> >>>
>> >>>
>> >>>
>> >>> On Sat, Mar 14, 2015 at 8:35 PM, Zakee  wrote:
>> >>>
>>  log.cleanup.policy is delete not compact.
>>  log.cleaner.enable=true
>>  log.cleaner.threads=5
>>  log.cleanup.policy=delete
>>  log.flush.scheduler.interval.ms=3000
>>  log.retention.minutes=1440
>>  log.segment.bytes=1073741824  (1gb)
>> 
>>  Messages are keyed but not compressed, producer async and uses kafka
>>  default partitioner.
>>  String message = msg.getString();
>>  String uniqKey = ""+rnd.nextInt();// random key
>>  String partKey = getPartitionKey();// partition key
>>  KeyedMessage<String, String> data = new KeyedMessage<String, String>(this.topicName, uniqKey, partKey, message);
>>  producer.send(data);
>> 
>>  Thanks
>>  Zakee
>> 
>> 
>> 
>> > On Mar 14, 2015, at 4:23 PM, gharatmayures...@gmail.com wrote:
>> >
>> > Is your topic log compacted? Also if it is are the messages keyed?
>> Or
>>  are the messages compressed?
>> >
>> > Thanks,
>> >
>> > Mayuresh
>> >
>> > Sent from my iPhone
>> >
>> >> On Mar 14, 2015, at 2:02 PM, Zakee >  kzak...@netzero.net>> wrote:
>> >>
>> >> Thanks, Jiangjie for helping resolve the kafka controller migration
>>  driven partition leader rebalance issue. The logs are much cleaner
>> now.
>> >>
>> >> There are a few incidences of Out of range offset even though
>> there
>> >> is
>>  no consumers running, only producers and replica fetchers. I was
>> trying
>> >> to
>>  relate to a cause, looks like compaction (log segment deletion)
>> causing
>>  this. Not sure whether this is expected behavior.
>> >>
>> >> Broker-4:
>> >> [2015-03-14 07:46:52,338] ERROR [Replica Manager on Broker 4]:
>> Error
>>  when processing fetch request for partition [Topic22kv,5] offset
>> >> 1754769769
>>  from follower with correlation id 1645671. Possible cause: Request
>> for
>>  offset 1754769769 but we only have log segments in the range
>> 1400864851
>> >> to
>>  1754769732. (kafka.server.ReplicaManager)
>> >>
>> >> Broker-3:
>> >> [2015-03-14 07:46:52,356] INFO The cleaning for partition
>> >> [Topic22kv,5]
>>  is aborted and paused (kafka.log.LogCleaner)
>> >> [2015-03-14 07:46:5

Re: Support for Java 1.8?

2015-03-17 Thread Roger Hoover
Thanks, Jon.  That helps.

On Tue, Mar 17, 2015 at 11:34 AM, Jon Bringhurst <
jbringhu...@linkedin.com.invalid> wrote:

> At LinkedIn, we're running on 1.8.0u5. YRMV depending on hardware and
> load, but this is what we typically run with:
>
> -server
> -Xms4g
> -Xmx4g
> -XX:PermSize=96m
> -XX:MaxPermSize=96m
> -XX:+UseG1GC
> -XX:MaxGCPauseMillis=20
> -XX:InitiatingHeapOccupancyPercent=35
> -verbose:gc
> -XX:+PrintGCDetails
> -XX:+PrintGCTimeStamps
> -XX:+PrintGCDateStamps
> -XX:+PrintTenuringDistribution
> -XX:+PrintGCDetails
> -XX:+PrintGCDateStamps
> -XX:+PrintTenuringDistribution
> -Xloggc:logs/gc.log
> -XX:ErrorFile=logs/hs_err.log
>
> -Jon
>
> On Mar 17, 2015, at 10:26 AM, Roger Hoover  wrote:
>
> > Resurrecting an old thread.  Are people running Kafka on Java 8 now?
> >
> > On Sun, Aug 10, 2014 at 11:44 PM, Otis Gospodnetic <
> > otis.gospodne...@gmail.com> wrote:
> >
> >> Just curious if you saw any issues with Java 1.8 or if everything went
> >> smoothly?
> >>
> >> Otis
> >> --
> >> Performance Monitoring * Log Analytics * Search Analytics
> >> Solr & Elasticsearch Support * http://sematext.com/
> >>
> >>
> >>
> >> On Tue, Jun 17, 2014 at 3:28 PM, Todd Palino
>  >>>
> >> wrote:
> >>
> >>> We're about to start testing in the next month. As soon as I have 1.8
> >>> u5 on the systems to work with. I don't anticipate too much of an issue,
> >>> since 7 -> 8 is supposed to be less trouble than 6 -> 7, and we didn't
> >>> have any problems moving to 7.
> >>>
> >>> -Todd
> >>>
> >>>
> >>> On 6/17/14, 6:22 AM, "Chris Neal"  wrote:
> >>>
>  Hi,
> 
>  I was wondering if Kafka has been tested/certified under JRE 1.8?
> 
>  Thanks!
>  Chris
> >>>
> >>>
> >>
>
>


Re: Support for Java 1.8?

2015-03-17 Thread Jon Bringhurst
At LinkedIn, we're running on 1.8.0u5. YRMV depending on hardware and load, but 
this is what we typically run with:

-server
-Xms4g
-Xmx4g
-XX:PermSize=96m
-XX:MaxPermSize=96m
-XX:+UseG1GC
-XX:MaxGCPauseMillis=20
-XX:InitiatingHeapOccupancyPercent=35
-verbose:gc
-XX:+PrintGCDetails
-XX:+PrintGCTimeStamps
-XX:+PrintGCDateStamps
-XX:+PrintTenuringDistribution
-XX:+PrintGCDetails
-XX:+PrintGCDateStamps
-XX:+PrintTenuringDistribution
-Xloggc:logs/gc.log
-XX:ErrorFile=logs/hs_err.log

-Jon

On Mar 17, 2015, at 10:26 AM, Roger Hoover  wrote:

> Resurrecting an old thread.  Are people running Kafka on Java 8 now?
> 
> On Sun, Aug 10, 2014 at 11:44 PM, Otis Gospodnetic <
> otis.gospodne...@gmail.com> wrote:
> 
>> Just curious if you saw any issues with Java 1.8 or if everything went
>> smoothly?
>> 
>> Otis
>> --
>> Performance Monitoring * Log Analytics * Search Analytics
>> Solr & Elasticsearch Support * http://sematext.com/
>> 
>> 
>> 
>> On Tue, Jun 17, 2014 at 3:28 PM, Todd Palino >> 
>> wrote:
>> 
> >>> We're about to start testing in the next month. As soon as I have 1.8 u5
> >>> on the systems to work with. I don't anticipate too much of an issue,
> >>> since 7 -> 8 is supposed to be less trouble than 6 -> 7, and we didn't
> >>> have any problems moving to 7.
>>> 
>>> -Todd
>>> 
>>> 
>>> On 6/17/14, 6:22 AM, "Chris Neal"  wrote:
>>> 
 Hi,
 
 I was wondering if Kafka has been tested/certified under JRE 1.8?
 
 Thanks!
 Chris
>>> 
>>> 
>> 





Re: Broker Exceptions

2015-03-17 Thread Mayuresh Gharat
cool.

On Tue, Mar 17, 2015 at 10:15 AM, Zakee  wrote:

> Hi Mayuresh,
>
> The logs are already attached and are in reverse order starting backwards
> from [2015-03-14 07:46:52,517] to the time when brokers were started.
>
> Thanks
> Zakee
>
>
>
> > On Mar 17, 2015, at 12:07 AM, Mayuresh Gharat <
> gharatmayures...@gmail.com> wrote:
> >
> > Hi Zakee,
> >
> > Thanks for the logs. Can you paste earlier logs from broker-3 up to :
> >
> > [2015-03-14 07:46:52,517] ERROR [ReplicaFetcherThread-2-4], Current
> > offset 1754769769 for partition [Topic22kv,5] out of range; reset
> > offset to 1400864851 (kafka.server.ReplicaFetcherThread)
> >
> > That would help us figure out what was happening on this broker before it
> > issued a replicaFetch request to broker-4.
> >
> > Thanks,
> >
> > Mayuresh
> >
> > On Mon, Mar 16, 2015 at 11:32 PM, Zakee  wrote:
> >
> >> Hi Mayuresh,
> >>
> >> Here are the logs.
> >>
> >> 
> >>
> >>
> >> Thanks,
> >> Kazim Zakee
> >>
> >>
> >>
> >>> On Mar 16, 2015, at 10:48 AM, Mayuresh Gharat <
> >> gharatmayures...@gmail.com> wrote:
> >>>
> >>> Can you provide more logs (complete) on Broker 3 till time :
> >>>
> >>> *[2015-03-14 07:46:52,517*] WARN [ReplicaFetcherThread-2-4], Replica 3
> >> for
> >>> partition [Topic22kv,5] reset its fetch offset from 1400864851 to
> current
> >>> leader 4's start offset 1400864851 (kafka.server.ReplicaFetcherThread)
> >>>
> >>> I would like to see logs from time much before it sent the fetch
> request
> >> to
> >>> Broker 4 to the time above. I want to check if in any case Broker 3
> was a
> >>> leader before broker 4 took over.
> >>>
> >>> Additional logs will help.
> >>>
> >>>
> >>> Thanks,
> >>>
> >>> Mayuresh
> >>>
> >>>
> >>>
> >>> On Sat, Mar 14, 2015 at 8:35 PM, Zakee  wrote:
> >>>
>  log.cleanup.policy is delete not compact.
>  log.cleaner.enable=true
>  log.cleaner.threads=5
>  log.cleanup.policy=delete
>  log.flush.scheduler.interval.ms=3000
>  log.retention.minutes=1440
>  log.segment.bytes=1073741824  (1gb)
> 
>  Messages are keyed but not compressed, producer async and uses kafka
>  default partitioner.
>  String message = msg.getString();
>  String uniqKey = ""+rnd.nextInt();// random key
>  String partKey = getPartitionKey();// partition key
>  KeyedMessage<String, String> data = new KeyedMessage<String, String>(this.topicName, uniqKey, partKey, message);
>  producer.send(data);
> 
>  Thanks
>  Zakee
> 
> 
> 
> > On Mar 14, 2015, at 4:23 PM, gharatmayures...@gmail.com wrote:
> >
> > Is your topic log compacted? Also if it is are the messages keyed? Or
>  are the messages compressed?
> >
> > Thanks,
> >
> > Mayuresh
> >
> > Sent from my iPhone
> >
> >> On Mar 14, 2015, at 2:02 PM, Zakee   kzak...@netzero.net>> wrote:
> >>
> >> Thanks, Jiangjie for helping resolve the kafka controller migration
>  driven partition leader rebalance issue. The logs are much cleaner
> now.
> >>
> >> There are a few incidences of Out of range offset even though  there
> >> is
>  no consumers running, only producers and replica fetchers. I was
> trying
> >> to
>  relate to a cause, looks like compaction (log segment deletion)
> causing
>  this. Not sure whether this is expected behavior.
> >>
> >> Broker-4:
> >> [2015-03-14 07:46:52,338] ERROR [Replica Manager on Broker 4]: Error
>  when processing fetch request for partition [Topic22kv,5] offset
> >> 1754769769
>  from follower with correlation id 1645671. Possible cause: Request for
>  offset 1754769769 but we only have log segments in the range
> 1400864851
> >> to
>  1754769732. (kafka.server.ReplicaManager)
> >>
> >> Broker-3:
> >> [2015-03-14 07:46:52,356] INFO The cleaning for partition
> >> [Topic22kv,5]
>  is aborted and paused (kafka.log.LogCleaner)
> >> [2015-03-14 07:46:52,408] INFO Scheduling log segment 1400864851 for
>  log Topic22kv-5 for deletion. (kafka.log.Log)
> >> …
> >> [2015-03-14 07:46:52,421] INFO Compaction for partition
> [Topic22kv,5]
>  is resumed (kafka.log.LogCleaner)
> >> [2015-03-14 07:46:52,517] ERROR [ReplicaFetcherThread-2-4], Current
>  offset 1754769769 for partition [Topic22kv,5] out of range; reset
> >> offset to
>  1400864851 (kafka.server.ReplicaFetcherThread)
> >> [2015-03-14 07:46:52,517] WARN [ReplicaFetcherThread-2-4], Replica 3
>  for partition [Topic22kv,5] reset its fetch offset from 1400864851 to
>  current leader 4's start offset 1400864851
>  (kafka.server.ReplicaFetcherThread)
> >>
> >> 

Re: Support for Java 1.8?

2015-03-17 Thread Roger Hoover
Resurrecting an old thread.  Are people running Kafka on Java 8 now?

On Sun, Aug 10, 2014 at 11:44 PM, Otis Gospodnetic <
otis.gospodne...@gmail.com> wrote:

> Just curious if you saw any issues with Java 1.8 or if everything went
> smoothly?
>
> Otis
> --
> Performance Monitoring * Log Analytics * Search Analytics
> Solr & Elasticsearch Support * http://sematext.com/
>
>
>
> On Tue, Jun 17, 2014 at 3:28 PM, Todd Palino  >
> wrote:
>
> > We're about to start testing in the next month. As soon as I have 1.8 u5
> > on the systems to work with. I don't anticipate too much of an issue,
> > since 7 -> 8 is supposed to be less trouble than 6 -> 7, and we didn't
> > have any problems moving to 7.
> >
> > -Todd
> >
> >
> > On 6/17/14, 6:22 AM, "Chris Neal"  wrote:
> >
> > >Hi,
> > >
> > >I was wondering if Kafka has been tested/certified under JRE 1.8?
> > >
> > >Thanks!
> > >Chris
> >
> >
>


Re: Broker Exceptions

2015-03-17 Thread Zakee
Hi Mayuresh,

The logs are already attached; they are in reverse order, starting from
[2015-03-14 07:46:52,517] and going back to the time when the brokers were started.

Thanks
Zakee



> On Mar 17, 2015, at 12:07 AM, Mayuresh Gharat  
> wrote:
> 
> Hi Zakee,
> 
> Thanks for the logs. Can you paste earlier logs from broker-3 up to :
> 
> [2015-03-14 07:46:52,517] ERROR [ReplicaFetcherThread-2-4], Current
> offset 1754769769 for partition [Topic22kv,5] out of range; reset
> offset to 1400864851 (kafka.server.ReplicaFetcherThread)
> 
> That would help us figure out what was happening on this broker before it
> issued a replicaFetch request to broker-4.
> 
> Thanks,
> 
> Mayuresh
> 
> On Mon, Mar 16, 2015 at 11:32 PM, Zakee  wrote:
> 
>> Hi Mayuresh,
>> 
>> Here are the logs.
>> 
>> 
>> 
>> 
>> Thanks,
>> Kazim Zakee
>> 
>> 
>> 
>>> On Mar 16, 2015, at 10:48 AM, Mayuresh Gharat <
>> gharatmayures...@gmail.com> wrote:
>>> 
>>> Can you provide more logs (complete) on Broker 3 till time :
>>> 
>>> *[2015-03-14 07:46:52,517*] WARN [ReplicaFetcherThread-2-4], Replica 3
>> for
>>> partition [Topic22kv,5] reset its fetch offset from 1400864851 to current
>>> leader 4's start offset 1400864851 (kafka.server.ReplicaFetcherThread)
>>> 
>>> I would like to see logs from time much before it sent the fetch request
>> to
>>> Broker 4 to the time above. I want to check if in any case Broker 3 was a
>>> leader before broker 4 took over.
>>> 
>>> Additional logs will help.
>>> 
>>> 
>>> Thanks,
>>> 
>>> Mayuresh
>>> 
>>> 
>>> 
>>> On Sat, Mar 14, 2015 at 8:35 PM, Zakee  wrote:
>>> 
 log.cleanup.policy is delete not compact.
 log.cleaner.enable=true
 log.cleaner.threads=5
 log.cleanup.policy=delete
 log.flush.scheduler.interval.ms=3000
 log.retention.minutes=1440
 log.segment.bytes=1073741824  (1gb)
 
 Messages are keyed but not compressed, producer async and uses kafka
 default partitioner.
 String message = msg.getString();
 String uniqKey = ""+rnd.nextInt();// random key
 String partKey = getPartitionKey();// partition key
 KeyedMessage<String, String> data = new KeyedMessage<String, String>(this.topicName, uniqKey, partKey, message);
 producer.send(data);
 
 Thanks
 Zakee
 
 
 
> On Mar 14, 2015, at 4:23 PM, gharatmayures...@gmail.com wrote:
> 
> Is your topic log compacted? Also if it is are the messages keyed? Or
 are the messages compressed?
> 
> Thanks,
> 
> Mayuresh
> 
> Sent from my iPhone
> 
>> On Mar 14, 2015, at 2:02 PM, Zakee >>> kzak...@netzero.net>> wrote:
>> 
>> Thanks, Jiangjie for helping resolve the kafka controller migration
 driven partition leader rebalance issue. The logs are much cleaner now.
>> 
>> There are a few incidences of Out of range offset even though  there
>> is
 no consumers running, only producers and replica fetchers. I was trying
>> to
 relate to a cause, looks like compaction (log segment deletion) causing
 this. Not sure whether this is expected behavior.
>> 
>> Broker-4:
>> [2015-03-14 07:46:52,338] ERROR [Replica Manager on Broker 4]: Error
 when processing fetch request for partition [Topic22kv,5] offset
>> 1754769769
 from follower with correlation id 1645671. Possible cause: Request for
 offset 1754769769 but we only have log segments in the range 1400864851
>> to
 1754769732. (kafka.server.ReplicaManager)
>> 
>> Broker-3:
>> [2015-03-14 07:46:52,356] INFO The cleaning for partition
>> [Topic22kv,5]
 is aborted and paused (kafka.log.LogCleaner)
>> [2015-03-14 07:46:52,408] INFO Scheduling log segment 1400864851 for
 log Topic22kv-5 for deletion. (kafka.log.Log)
>> …
>> [2015-03-14 07:46:52,421] INFO Compaction for partition [Topic22kv,5]
 is resumed (kafka.log.LogCleaner)
>> [2015-03-14 07:46:52,517] ERROR [ReplicaFetcherThread-2-4], Current
 offset 1754769769 for partition [Topic22kv,5] out of range; reset
>> offset to
 1400864851 (kafka.server.ReplicaFetcherThread)
>> [2015-03-14 07:46:52,517] WARN [ReplicaFetcherThread-2-4], Replica 3
 for partition [Topic22kv,5] reset its fetch offset from 1400864851 to
 current leader 4's start offset 1400864851
 (kafka.server.ReplicaFetcherThread)
>> 
>> 
>> 
>> 
>> 
>> Thanks
>> Zakee
>> 
>>> On Mar 9, 2015, at 12:18

Re: consumer groups in python

2015-03-17 Thread Kasper Mackenhauer Jacobsen
We set the partitions the python consumers need manually for now; I'm
looking into a solution using ZooKeeper (possibly) to balance them out
automatically though.

On Tue, Mar 17, 2015 at 2:51 PM, Todd Palino  wrote:

> Yeah, this is exactly correct. The python client does not implement the
> Zookeeper logic that would be needed to do a balanced consumer. While it's
> certainly possible to do it (for example, Joe implemented it in Go), the
> logic is non-trivial and nobody has bothered to this point. I don't think
> anyone will, as the new consumer will make it much easier to implement
> clients without needing to do it.
>
> In the past, we've used an internal python module that calls a C library
> underneath that does the balancing. Now we're moving to one that calls our
> REST interface to Kafka, which is easier to work with. Another option that
> some consumers use is to pipe messages in from the kafka-console-consumer.
> This works well, but if you're not careful with stopping it you can easily
> lose messages.
>
> -Todd
>
>
> On Tue, Mar 17, 2015 at 6:47 AM, Sloot, Hans-Peter <
> hans-peter.sl...@atos.net> wrote:
>
> > Thanks
> >
> > I just came across this
> https://github.com/mumrah/kafka-python/issues/112
> > It says:
> > That contract of one message per consumer group only works for
> the
> > coordinated consumers which are implemented for the JVM only (i.e., Scala
> > and Java clients).
> >
> >
> > -Original Message-
> > From: Steve Miller [mailto:st...@idrathernotsay.com]
> > Sent: Tuesday, March 17, 2015 2:18 PM
> > To: users@kafka.apache.org
> > Subject: Re: consumer groups in python
> >
> > It's possible that I just haven't used it but I am reasonably sure that
> > the python API doesn't have a way to store offsets in ZK.  You would need
> > to implement something more or less compatible with what the Scala/Java
> API
> > does, presumably.
> >
> > On the plus side the python API -- possibly just because in python,
> > nothing is truly private (: -- exposes offsets and offset management in
> > ways that those other APIs seem not to.   Seeking, say, to approximately
> > 1000 messages before the current offset is no big deal in python, nor is
> > fetching oldest and newest offsets for topics (e.g., if you want to alert
> > if nothing is being produced, without having to fire up a consumer).  I
> > have close to zero experience with anything other than the python API and
> > librdkafka but judging from questions I see here those seem to be
> difficult
> > to do in Scala or Java.  I hope to do more with those APIs soon (and in
> > fact am at ScalaDays right now in part so I can attend some intro Scala
> > training (-: ).
> >
> > -Steve
> >
> >
> >
> > > On Mar 17, 2015, at 3:54 AM, Sloot, Hans-Peter <
> > hans-peter.sl...@atos.net> wrote:
> > >
> > > Hi,
> > >
> > > I wrote a small python script to consume messages from kafka.
> > >
> > > The consumer is defined as follows:
> > > kafka = KafkaConsumer('my-replicated-topic',
> > >   metadata_broker_list=['localhost:9092'],
> > >   group_id='my_consumer_group',
> > >   auto_commit_enable=True,
> > >   auto_commit_interval_ms=30 * 1000,
> > >   auto_offset_reset='smallest')
> > >
> > > But when I start 2 consumers simultaneously both receive all messages
> > from the topic.
> > > I would expect to have 1 consumer about half the number of messages and
> > the other the rest.
> > >
> > > How can I arrange this?
> > >
> > > Regards Hans-Peter
> > >

Re: No topic owner when using different assignment strategies

2015-03-17 Thread tao xiao
This is the corrected zk result

Here is the result from zk
[zk: localhost:2181(CONNECTED) 0] get
/consumers/test/owners/mm-benchmark-test/0

Node does not exist: /consumers/test/owners/mm-benchmark-test/0

[zk: localhost:2181(CONNECTED) 1] get
/consumers/test/owners/mm-benchmark-test1/0

test-localhost-1426605370072-904d6fba-0

On Tue, Mar 17, 2015 at 11:30 PM, tao xiao  wrote:

> Hi team,
>
> I have two consumer instances with the same group id connecting to two
> different topics with 1 partition created for each. One consumer uses
> partition.assignment.strategy=roundrobin and the other one uses default
> assignment strategy. Both consumers have 1 thread spawned internally and
> connect kafka using createMessageStreamsByFilter.
> The consumer with roundrobin assignment strategy connected kafka first and
> had 2 topics assigned to itself and then I brought up another consumer that
> has default assignment strategy configured. I saw rebalancing happened in
> both consumers but at the end only one of the topics was assigned to the
> consumer that is configured roundrobin assignment strategy and no topics
> were assigned to the other consumer. This led to one topic missing its
> owner.
>
> Here is the result from zk
> [zk: localhost:2181(CONNECTED) 0] get
> /consumers/test/owners/mm-benchmark-test/0
>
> Node does not exist:
> /consumers/test12345667f/owners/mm-benchmark-test/0
>
> [zk: localhost:2181(CONNECTED) 1] get
> /consumers/test/owners/mm-benchmark-test1/0
>
> test-localhost-1426605370072-904d6fba-0
>
> The kafka version I use is 0.8.2.1
>
> --
> Regards,
> Tao
>



-- 
Regards,
Tao


No topic owner when using different assignment strategies

2015-03-17 Thread tao xiao
Hi team,

I have two consumer instances with the same group id connecting to two
different topics, each created with 1 partition. One consumer uses
partition.assignment.strategy=roundrobin and the other one uses the default
assignment strategy. Both consumers have 1 thread spawned internally and
connect to Kafka using createMessageStreamsByFilter.
The consumer with the roundrobin assignment strategy connected to Kafka
first and had both topics assigned to itself; I then brought up the other
consumer, which has the default assignment strategy configured. I saw
rebalancing happen in both consumers, but at the end only one of the topics
was assigned to the consumer configured with the roundrobin assignment
strategy, and no topics were assigned to the other consumer. This left one
topic without an owner.

Here is the result from zk
[zk: localhost:2181(CONNECTED) 0] get
/consumers/test/owners/mm-benchmark-test/0

Node does not exist: /consumers/test12345667f/owners/mm-benchmark-test/0

[zk: localhost:2181(CONNECTED) 1] get
/consumers/test/owners/mm-benchmark-test1/0

test-localhost-1426605370072-904d6fba-0

The kafka version I use is 0.8.2.1

-- 
Regards,
Tao


Re: [ANN] sqlstream: Simple MySQL binlog to Kafka stream

2015-03-17 Thread Hisham Mardam-Bey
Pretty much a hijack / plug as well (=

https://github.com/mardambey/mypipe

"MySQL binary log consumer with the ability to act on changed rows and
publish changes to different systems with emphasis on Apache Kafka."

Mypipe currently encodes events using Avro before pushing them into Kafka
and is Avro schema repository aware. The project is young; and patches for
improvements are appreciated (=

On Mon, Mar 16, 2015 at 10:35 PM, Arya Ketan  wrote:

> Great work.
> Sorry for kinda hijacking this thread, but I though that we had built
> some-thing on mysql bin log event propagator and wanted to share it .
> You guys can also look into Aesop ( https://github.com/Flipkart/aesop).
> Its
> a change propagation frame-work. It has relays which listens to bin logs of
> Mysql, keeps track of SCNs  and has consumers which can then (transform/map
> or interpret as is) the bin log-event to a destination. Consumers also keep
> track of SCNs and a slow consumer can go back to a previous SCN if it wants
> to re-listen to events  ( similar to kafka's consumer view ).
>
> All the producers/consumers are extensible and you can write your own
> custom consumer and feed off the data to it.
>
> Common use-cases:
> a) Archive mysql based data into say hbase
> b) Move mysql based data to say a search store for serving reads.
>
> It has a decent ( not an awesome :) ) console too which gives a nice human
> readable view of where the producers and consumers are.
>
> Current supported producers are mysql bin logs, hbase wall-edits.
>
>
> Further insights/reviews/feature reqs/pull reqs/advices are all welcome.
>
> --
> Arya
>
> Arya
>
> On Tue, Mar 17, 2015 at 1:48 AM, Gwen Shapira 
> wrote:
>
> > Really really nice!
> >
> > Thank you.
> >
> > On Mon, Mar 16, 2015 at 7:18 AM, Pierre-Yves Ritschard  >
> > wrote:
> > > Hi kafka,
> > >
> > > I just wanted to mention I published a very simple project which can
> > > connect as MySQL replication client and stream replication events to
> > > kafka: https://github.com/pyr/sqlstream
> > >
> > > When you don't have control over an application, it can provide a
> simple
> > > way of consolidating SQL data in kafka.
> > >
> > > This is an early release and there are a few caveats (mentionned in the
> > > README), mostly the poor partitioning which I'm going to evolve quickly
> > > and the reconnection strategy which doesn't try to keep track of binlog
> > > position, other than that, it should work as advertised.
> > >
> > > Cheers,
> > >   - pyr
> >
>



-- 
Hisham Mardam-Bey
http://hisham.cc/


Re: Kafka High Level Consumer OOME

2015-03-17 Thread Guozhang Wang
Hello Dima,

The current consumer does not have an explicit memory control mechanism,
but you can try to indirectly bound the memory usage via the following
configs: fetch.message.max.bytes and queued.max.message.chunks. Details can
be found at http://kafka.apache.org/documentation.html#consumerconfigs
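
As a rough sketch (the values below are placeholders, not recommendations),
the buffered data is then roughly bounded by queued.max.message.chunks *
fetch.message.max.bytes per queue:

// Illustrative high-level consumer setup; needs kafka.consumer.Consumer,
// kafka.consumer.ConsumerConfig and kafka.javaapi.consumer.ConsumerConnector.
Properties props = new Properties();
props.put("zookeeper.connect", "localhost:2181");
props.put("group.id", "my_consumer_group");
props.put("consumer.timeout.ms", "10");
// each fetched chunk is at most this many bytes...
props.put("fetch.message.max.bytes", "1048576");
// ...and at most this many chunks are buffered per queue
props.put("queued.max.message.chunks", "2");
ConsumerConnector connector =
        Consumer.createJavaConsumerConnector(new ConsumerConfig(props));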

As for your problem, I'm wondering whether your messages are being
compressed. There are known issues with the current decompression logic on
the consumer: it may allocate large chunks of memory unnecessarily and
cause an OOME (KAFKA-527).

Guozhang

On Tue, Mar 17, 2015 at 3:10 AM, Dima Dimas  wrote:

> Hi
>
> I face to OOME while trying to consume from one topic 10 partitions (100
> 000 messages each partition) 5 consumers(consumer groups),
> consumer.timeout=10ms. OOME was gotten after 1-2 minutes after start.
> Java heap - Xms=1024M
> LAN about 10Gbit
> This is standalone application.
>
> Kafka version 0.8.2
>
> Messages have about 5-10kB size each.
> Before OOME consumers received from 5 000 to 30 000 messages per request
> from one topic.
> Each consumer reads from different topics.
>
> Part of the code(manually handle offsets) :
>
>  Map streamCounts = Collections.singletonMap(topic, 1);
> ConsumerConnector connector =
> consumer.createJavaConsumerConnector(consumerConf);
>
> Map<String, List<KafkaStream<byte[], byte[]>>> streams =
> connector.createMessageStreams(streamCounts);
> //read in one stream
> KafkaStream stream = streams.get(topic).get(0);
> //read from stream in infinity loop
> while(true){
>  try {
> for (MessageAndMetadata messageAndMetadata :
> stream) {
> Message msg = new Message(messageAndMetadata);
> messages.add(msg);
> }
> } catch (ConsumerTimeoutException ignore) {
> // throws every time when Kafka consumer timeout was reached
> }
> //some logic
>
> //commit offsets
>
> //thread sleep 1 sec
> }
>
> //close connection
>
>
>
> Stacktrace log
>
> [[03/06/2015 13:23:56
>
> [ConsumerFetcherThread-preprocessorTopicReporting_mo-host-1425648218660-2d7f632b-0-5]
> ERROR kafka.network.BoundedByteBufferReceive - OOME with size 1048612
>2097206
> java.lang.OutOfMemoryError: Java heap space
> at java.nio.HeapByteBuffer.(HeapByteBuffer.java:57)
> at java.nio.ByteBuffer.allocate(ByteBuffer.java:329)
> at
>
> kafka.network.BoundedByteBufferReceive.byteBufferAllocate(BoundedByteBufferReceive.scala:80)
> at
>
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:63)
> at
> kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> at
>
> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> at kafka.network.BlockingChannel.receive(BlockingChannel.scala:108)
> at
> kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:72)
> at
>
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:69)
> at
>
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:113)
> at
>
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:113)
> at
>
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:113)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at
>
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:112)
> at
>
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:112)
> at
>
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:112)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:111)
> at
>
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:97)
> at
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:89)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
>
> *Q1: How to solve this issue?*
> *Q2: What can be the root cause?*
> *Q3: How can I control memory allocation for each consumer?*
>
> *Thanks*
> *Dima.*
>



-- 
-- Guozhang


schema.registry.url = null

2015-03-17 Thread Clint Mcneil
Hi

I can't get the Kafka/Avro serializer producer example to work.

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * Created by clint on 3/17/15.
 */
public class Confluent {

public static void  main (String[] args){

KafkaProducer producer;
Properties propsKafka = new Properties();

propsKafka.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9092");
propsKafka.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
io.confluent.kafka.serializers.KafkaAvroSerializer.class);
propsKafka.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
io.confluent.kafka.serializers.KafkaAvroSerializer.class);
propsKafka.put("schema.registry.url", "http://localhost:8081";);
producer = new KafkaProducer(propsKafka);

String key = "key1";
String userSchema = "{\"type\":\"record\"," +
"\"name\":\"myrecord\"," +
"\"fields\":[{\"name\":\"f1\",\"type\":\"string\"}]}";

Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(userSchema);

GenericRecord avroRecord = new GenericData.Record(schema);
avroRecord.put("f1", "value4");

ProducerRecord data = new ProducerRecord("test", key, avroRecord);
producer.send(data);

}
}


The output is:

Mar 17, 2015 5:00:31 PM org.apache.kafka.common.config.AbstractConfig logAll
INFO: ProducerConfig values:
compression.type = none
metric.reporters = []
metadata.max.age.ms = 30
metadata.fetch.timeout.ms = 6
acks = 1
batch.size = 16384
reconnect.backoff.ms = 10
bootstrap.servers = [localhost:9092]
receive.buffer.bytes = 32768
retry.backoff.ms = 100
buffer.memory = 33554432
timeout.ms = 3
key.serializer = class
io.confluent.kafka.serializers.KafkaAvroSerializer
retries = 0
max.request.size = 1048576
block.on.buffer.full = true
value.serializer = class
io.confluent.kafka.serializers.KafkaAvroSerializer
metrics.sample.window.ms = 3
send.buffer.bytes = 131072
max.in.flight.requests.per.connection = 5
metrics.num.samples = 2
linger.ms = 0
client.id =

Mar 17, 2015 5:00:32 PM org.apache.kafka.common.config.AbstractConfig
logUnused
WARNING: The configuration schema.registry.url = null was supplied but
isn't a known config.

Please help

Thanks


Kafka High Level Consumer OOME

2015-03-17 Thread Dima Dimas
Hi

I'm hitting an OOME while trying to consume from one topic with 10
partitions (100 000 messages per partition) using 5 consumers (consumer
groups), with consumer.timeout=10ms. The OOME occurs 1-2 minutes after
start.
Java heap - Xms=1024M
LAN about 10Gbit
This is a standalone application.

Kafka version 0.8.2

Messages are about 5-10kB each.
Before the OOME, consumers received from 5 000 to 30 000 messages per
request from one topic.
Each consumer reads from different topics.

Part of the code (manually handling offsets):

Map<String, Integer> streamCounts = Collections.singletonMap(topic, 1);
ConsumerConnector connector =
        consumer.createJavaConsumerConnector(consumerConf);

Map<String, List<KafkaStream<byte[], byte[]>>> streams =
        connector.createMessageStreams(streamCounts);
// read from one stream
KafkaStream<byte[], byte[]> stream = streams.get(topic).get(0);
// read from the stream in an infinite loop
while (true) {
    try {
        for (MessageAndMetadata<byte[], byte[]> messageAndMetadata : stream) {
            Message msg = new Message(messageAndMetadata);
            messages.add(msg);
        }
    } catch (ConsumerTimeoutException ignore) {
        // thrown every time the Kafka consumer timeout is reached
    }
    // some logic

    // commit offsets

    // thread sleep 1 sec
}

// close connection



Stacktrace log

[[03/06/2015 13:23:56
[ConsumerFetcherThread-preprocessorTopicReporting_mo-host-1425648218660-2d7f632b-0-5]
ERROR kafka.network.BoundedByteBufferReceive - OOME with size 1048612
   2097206
java.lang.OutOfMemoryError: Java heap space
at java.nio.HeapByteBuffer.(HeapByteBuffer.java:57)
at java.nio.ByteBuffer.allocate(ByteBuffer.java:329)
at
kafka.network.BoundedByteBufferReceive.byteBufferAllocate(BoundedByteBufferReceive.scala:80)
at
kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:63)
at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
at
kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
at kafka.network.BlockingChannel.receive(BlockingChannel.scala:108)
at
kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:72)
at
kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:69)
at
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:113)
at
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:113)
at
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:113)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:112)
at
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:112)
at
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:112)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:111)
at
kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:97)
at
kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:89)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)

*Q1: How to solve this issue?*
*Q2: What can be the root cause?*
*Q3: How can I control memory allocation for each consumer?*

*Thanks*
*Dima.*


Re: consumer groups in python

2015-03-17 Thread Todd Palino
Yeah, this is exactly correct. The python client does not implement the
Zookeeper logic that would be needed to do a balanced consumer. While it's
certainly possible to do it (for example, Joe implemented it in Go), the
logic is non-trivial and nobody has bothered to this point. I don't think
anyone will, as the new consumer will make it much easier to implement
clients without needing to do it.

In the past, we've used an internal python module that calls a C library
underneath that does the balancing. Now we're moving to one that calls our
REST interface to Kafka, which is easier to work with. Another option that
some consumers use is to pipe messages in from the kafka-console-consumer.
This works well, but if you're not careful with stopping it you can easily
lose messages.

-Todd


On Tue, Mar 17, 2015 at 6:47 AM, Sloot, Hans-Peter <
hans-peter.sl...@atos.net> wrote:

> Thanks
>
> I just came across this https://github.com/mumrah/kafka-python/issues/112
> It says:
> That contract of one message per consumer group only works for the
> coordinated consumers which are implemented for the JVM only (i.e., Scala
> and Java clients).
>
>
> -Original Message-
> From: Steve Miller [mailto:st...@idrathernotsay.com]
> Sent: Tuesday, March 17, 2015 2:18 PM
> To: users@kafka.apache.org
> Subject: Re: consumer groups in python
>
> It's possible that I just haven't used it but I am reasonably sure that
> the python API doesn't have a way to store offsets in ZK.  You would need
> to implement something more or less compatible with what the Scala/Java API
> does, presumably.
>
> On the plus side the python API -- possibly just because in python,
> nothing is truly private (: -- exposes offsets and offset management in
> ways that those other APIs seem not to.   Seeking, say, to approximately
> 1000 messages before the current offset is no big deal in python, nor is
> fetching oldest and newest offsets for topics (e.g., if you want to alert
> if nothing is being produced, without having to fire up a consumer).  I
> have close to zero experience with anything other than the python API and
> librdkafka but judging from questions I see here those seem to be difficult
> to do in Scala or Java.  I hope to do more with those APIs soon (and in
> fact am at ScalaDays right now in part so I can attend some intro Scala
> training (-: ).
>
> -Steve
>
>
>
> > On Mar 17, 2015, at 3:54 AM, Sloot, Hans-Peter <
> hans-peter.sl...@atos.net> wrote:
> >
> > Hi,
> >
> > I wrote a small python script to consume messages from kafka.
> >
> > The consumer is defined as follows:
> > kafka = KafkaConsumer('my-replicated-topic',
> >   metadata_broker_list=['localhost:9092'],
> >   group_id='my_consumer_group',
> >   auto_commit_enable=True,
> >   auto_commit_interval_ms=30 * 1000,
> >   auto_offset_reset='smallest')
> >
> > But when I start 2 consumers simultaneously both receive all messages
> from the topic.
> > I would expect to have 1 consumer about half the number of messages and
> the other the rest.
> >
> > How can I arrange this?
> >
> > Regards Hans-Peter
> >
>


RE: consumer groups in python

2015-03-17 Thread Sloot, Hans-Peter
Thanks

I just came across this https://github.com/mumrah/kafka-python/issues/112
It says:
That contract of one message per consumer group only works for the 
coordinated consumers which are implemented for the JVM only (i.e., Scala and 
Java clients).


-Original Message-
From: Steve Miller [mailto:st...@idrathernotsay.com]
Sent: Tuesday, March 17, 2015 2:18 PM
To: users@kafka.apache.org
Subject: Re: consumer groups in python

It's possible that I just haven't used it but I am reasonably sure that the 
python API doesn't have a way to store offsets in ZK.  You would need to 
implement something more or less compatible with what the Scala/Java API does, 
presumably.

On the plus side the python API -- possibly just because in python, nothing is 
truly private (: -- exposes offsets and offset management in ways that those 
other APIs seem not to.   Seeking, say, to approximately 1000 messages before 
the current offset is no big deal in python, nor is fetching oldest and newest 
offsets for topics (e.g., if you want to alert if nothing is being produced, 
without having to fire up a consumer).  I have close to zero experience with 
anything other than the python API and librdkafka but judging from questions I 
see here those seem to be difficult to do in Scala or Java.  I hope to do more 
with those APIs soon (and in fact am at ScalaDays right now in part so I can 
attend some intro Scala training (-: ).

-Steve



> On Mar 17, 2015, at 3:54 AM, Sloot, Hans-Peter  
> wrote:
>
> Hi,
>
> I wrote a small python script to consume messages from kafka.
>
> The consumer is defined as follows:
> kafka = KafkaConsumer('my-replicated-topic',
>   metadata_broker_list=['localhost:9092'],
>   group_id='my_consumer_group',
>   auto_commit_enable=True,
>   auto_commit_interval_ms=30 * 1000,
>   auto_offset_reset='smallest')
>
> But when I start 2 consumers simultaneously both receive all messages from 
> the topic.
> I would expect to have 1 consumer about half the number of messages and the 
> other the rest.
>
> How can I arrange this?
>
> Regards Hans-Peter
>


Re: consumer groups in python

2015-03-17 Thread Steve Miller
It's possible that I just haven't used it but I am reasonably sure that the 
python API doesn't have a way to store offsets in ZK.  You would need to 
implement something more or less compatible with what the Scala/Java API does, 
presumably.

On the plus side the python API -- possibly just because in python, nothing is 
truly private (: -- exposes offsets and offset management in ways that those 
other APIs seem not to.   Seeking, say, to approximately 1000 messages before 
the current offset is no big deal in python, nor is fetching oldest and newest 
offsets for topics (e.g., if you want to alert if nothing is being produced, 
without having to fire up a consumer).  I have close to zero experience with 
anything other than the python API and librdkafka but judging from questions I 
see here those seem to be difficult to do in Scala or Java.  I hope to do more 
with those APIs soon (and in fact am at ScalaDays right now in part so I can 
attend some intro Scala training (-: ).

-Steve



> On Mar 17, 2015, at 3:54 AM, Sloot, Hans-Peter  
> wrote:
> 
> Hi,
> 
> I wrote a small python script to consume messages from kafka.
> 
> The consumer is defined as follows:
> kafka = KafkaConsumer('my-replicated-topic',
>   metadata_broker_list=['localhost:9092'],
>   group_id='my_consumer_group',
>   auto_commit_enable=True,
>   auto_commit_interval_ms=30 * 1000,
>   auto_offset_reset='smallest')
> 
> But when I start 2 consumers simultaneously both receive all messages from 
> the topic.
> I would expect to have 1 consumer about half the number of messages and the 
> other the rest.
> 
> How can I arrange this?
> 
> Regards Hans-Peter
> 


consumer groups in python

2015-03-17 Thread Sloot, Hans-Peter
Hi,

I wrote a small python script to consume messages from kafka.

The consumer is defined as follows:
kafka = KafkaConsumer('my-replicated-topic',
   metadata_broker_list=['localhost:9092'],
   group_id='my_consumer_group',
   auto_commit_enable=True,
   auto_commit_interval_ms=30 * 1000,
   auto_offset_reset='smallest')

But when I start 2 consumers simultaneously both receive all messages from the 
topic.
I would expect to have 1 consumer about half the number of messages and the 
other the rest.

How can I arrange this?

Regards Hans-Peter

This e-mail and the documents attached are confidential and intended solely for 
the addressee; it may also be privileged. If you receive this e-mail in error, 
please notify the sender immediately and destroy it. As its integrity cannot be 
secured on the Internet, Atos’ liability cannot be triggered for the message 
content. Although the sender endeavours to maintain a computer virus-free 
network, the sender does not warrant that this transmission is virus-free and 
will not be liable for any damages resulting from any virus transmitted. On all 
offers and agreements under which Atos Nederland B.V. supplies goods and/or 
services of whatever nature, the Terms of Delivery from Atos Nederland B.V. 
exclusively apply. The Terms of Delivery shall be promptly submitted to you on 
your request.


Re: Monitoring of consumer group lag

2015-03-17 Thread Kasper Mackenhauer Jacobsen
Hi Mathias,

We're currently using a custom solution that queries kafka and zookeeper (2
different processes) for topic size and consumer offset and submits the
information to a collectd/statsd instance that ships it on to graphite, so
we can track it in grafana.

There's no alerting built in, but it gives us a good overview of what's
going on without having to consolidate another service like the offset
monitor.
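
For anyone curious, the Kafka side of such a check is basically one offset
request per partition. A rough sketch with the 0.8 SimpleConsumer (the
broker, topic and partition are placeholders, and the ZooKeeper read and
the statsd submission are left out):

import java.util.Collections;
import java.util.Map;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.consumer.SimpleConsumer;

public class LagCheck {
    public static void main(String[] args) {
        // Placeholders -- point these at one of your brokers/partitions.
        SimpleConsumer consumer =
                new SimpleConsumer("localhost", 9092, 10000, 64 * 1024, "lag-check");
        TopicAndPartition tp = new TopicAndPartition("my-topic", 0);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> request =
                Collections.singletonMap(tp,
                        new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), 1));
        OffsetResponse response = consumer.getOffsetsBefore(new kafka.javaapi.OffsetRequest(
                request, kafka.api.OffsetRequest.CurrentVersion(), "lag-check"));
        long logEndOffset = response.offsets("my-topic", 0)[0];
        // committed offset lives in ZK at /consumers/<group>/offsets/<topic>/<partition>
        long committedOffset = 0L;
        System.out.println("lag = " + (logEndOffset - committedOffset));
        consumer.close();
    }
}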


On Tue, Mar 17, 2015 at 10:36 AM, Mathias Söderberg <
mathias.soederb...@gmail.com> wrote:

> Hi Lance,
>
> I tried Kafka Offset Monitor a while back, but it didn't play especially
> nice with a lot of topics / partitions (we currently have around 1400
> topics and 4000 partitions in total). Might be possible to make it work a
> bit better, but not sure it would be the best way to do alerting.
>
> Thanks for the tip though :).
>
> Best regards,
> Mathias
>
>
> On Mon, 16 Mar 2015 at 21:02 Lance Laursen 
> wrote:
>
> > Hey Mathias,
> >
> > Kafka Offset Monitor will give you a general idea of where your consumer
> > group(s) are at:
> >
> > http://quantifind.com/KafkaOffsetMonitor/
> >
> > However, I'm not sure how useful it will be with "a large number of
> topics"
> > / turning its output into a script that alerts upon a threshold. Could
> take
> > a look and see what they're doing though.
> >
> > On Mon, Mar 16, 2015 at 8:31 AM, Mathias Söderberg <
> > mathias.soederb...@gmail.com> wrote:
> >
> > > Good day,
> > >
> > > I'm looking into using SimpleConsumer#getOffsetsBefore and offsets
> > > committed in ZooKeeper for monitoring the lag of a consumer group.
> > >
> > > Our current use case is that we have a service that is continuously
> > > consuming messages of a large number of topics and persisting the
> > messages
> > > to S3 at somewhat regular intervals (depends on time and the total size
> > of
> > > consumed messages for each partition). Offsets are committed to
> ZooKeeper
> > > after the messages have been persisted to S3.
> > > The partitions are of varying load, so a simple threshold based on the
> > > number of messages we're lagging behind would be cumbersome to maintain
> > due
> > > to the number of topics, and most likely prone to unnecessary alerts.
> > >
> > > Currently our broker configuration specifies log.roll.hours=1 and
> > > log.segment.bytes=1GB, and my proposed solution is to have a separate
> > > service that would iterate through all topics/partitions and use
> > > #getOffsetsBefore with a timestamp that is one (1) or two (2) hours ago
> > and
> > > compare the first offset (which from my testing looks to be the offset
> > that
> > > is closest in time, i.e. from the log segment that is closest to the
> > > timestamp given) with the one that is saved to ZooKeeper.
> > > It feels like a pretty solid solution, given that we just want a rough
> > > estimate of how much we're lagging behind in time, so that we know
> > (again,
> > > roughly) how much time we have to fix whatever is broken before the log
> > > segments are deleted by Kafka.
> > >
> > > Is there anyone doing monitoring similar to this? Are there any obvious
> > > downsides of this approach that I'm not thinking about? Thoughts on
> > > alternatives?
> > >
> > > Best regards,
> > > Mathias
> > >
> >
>



-- 
*Kasper Mackenhauer Jacobsen*
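
A rough sketch of the check described above: ask the partition leader, via
SimpleConsumer#getOffsetsBefore, for the offset closest to a timestamp one or
two hours in the past, and compare it with the offset the consumer group last
committed to ZooKeeper. This is only an illustration assuming the Kafka 0.8.x
kafka.javaapi classes; the leader host, client id, and the ZooKeeper read are
placeholders:

import java.util.HashMap;
import java.util.Map;

import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.consumer.SimpleConsumer;

public class ConsumerLagCheckSketch {

    // Earliest offset of the log segment closest to the given timestamp.
    static long offsetBefore(SimpleConsumer consumer, String topic, int partition,
                             long timestampMs) {
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
                new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        requestInfo.put(new TopicAndPartition(topic, partition),
                new PartitionOffsetRequestInfo(timestampMs, 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
                requestInfo, kafka.api.OffsetRequest.CurrentVersion(), "lag-checker");
        OffsetResponse response = consumer.getOffsetsBefore(request);
        long[] offsets = response.offsets(topic, partition);
        return offsets.length > 0 ? offsets[0] : -1L;
    }

    public static void main(String[] args) {
        // "leader-host" is a placeholder; normally the partition leader is looked
        // up first (e.g. with a topic metadata request), one consumer per broker.
        SimpleConsumer consumer =
                new SimpleConsumer("leader-host", 9092, 100000, 64 * 1024, "lag-checker");
        try {
            long twoHoursAgo = System.currentTimeMillis() - 2L * 60 * 60 * 1000;
            long offsetTwoHoursAgo = offsetBefore(consumer, "my-topic", 0, twoHoursAgo);

            // Placeholder helper: the group's committed offset lives in ZooKeeper
            // under /consumers/<group>/offsets/<topic>/<partition>.
            long committed = readCommittedOffsetFromZooKeeper("my-group", "my-topic", 0);

            if (committed >= 0 && offsetTwoHoursAgo >= 0 && committed < offsetTwoHoursAgo) {
                // The group has not yet reached data that is ~2 hours old: alert,
                // or emit a gauge to collectd/statsd as in the setup described above.
                System.out.println("consumer group is more than ~2 hours behind on my-topic/0");
            }
        } finally {
            consumer.close();
        }
    }

    static long readCommittedOffsetFromZooKeeper(String group, String topic, int partition) {
        return -1L; // placeholder; would read and parse the ZooKeeper offset node
    }
}

The resulting lag estimate (or the committed offset itself) could then be
shipped to collectd/statsd and graphed, as in the setup Kasper describes.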


Re: Monitoring of consumer group lag

2015-03-17 Thread Mathias Söderberg
Hi Lance,

I tried Kafka Offset Monitor a while back, but it didn't play especially
nice with a lot of topics / partitions (we currently have around 1400
topics and 4000 partitions in total). Might be possible to make it work a
bit better, but not sure it would be the best way to do alerting.

Thanks for the tip though :).

Best regards,
Mathias


On Mon, 16 Mar 2015 at 21:02 Lance Laursen 
wrote:

> Hey Mathias,
>
> Kafka Offset Monitor will give you a general idea of where your consumer
> group(s) are at:
>
> http://quantifind.com/KafkaOffsetMonitor/
>
> However, I'm not sure how useful it will be with "a large number of topics"
> / turning its output into a script that alerts upon a threshold. Could take
> a look and see what they're doing though.
>
> On Mon, Mar 16, 2015 at 8:31 AM, Mathias Söderberg <
> mathias.soederb...@gmail.com> wrote:
>
> > Good day,
> >
> > I'm looking into using SimpleConsumer#getOffsetsBefore and offsets
> > committed in ZooKeeper for monitoring the lag of a consumer group.
> >
> > Our current use case is that we have a service that is continuously
> > consuming messages of a large number of topics and persisting the
> messages
> > to S3 at somewhat regular intervals (depends on time and the total size
> of
> > consumed messages for each partition). Offsets are committed to ZooKeeper
> > after the messages have been persisted to S3.
> > The partitions are of varying load, so a simple threshold based on the
> > number of messages we're lagging behind would be cumbersome to maintain
> due
> > to the number of topics, and most likely prone to unnecessary alerts.
> >
> > Currently our broker configuration specifies log.roll.hours=1 and
> > log.segment.bytes=1GB, and my proposed solution is to have a separate
> > service that would iterate through all topics/partitions and use
> > #getOffsetsBefore with a timestamp that is one (1) or two (2) hours ago
> and
> > compare the first offset (which from my testing looks to be the offset
> that
> > is closest in time, i.e. from the log segment that is closest to the
> > timestamp given) with the one that is saved to ZooKeeper.
> > It feels like a pretty solid solution, given that we just want a rough
> > estimate of how much we're lagging behind in time, so that we know
> (again,
> > roughly) how much time we have to fix whatever is broken before the log
> > segments are deleted by Kafka.
> >
> > Is there anyone doing monitoring similar to this? Are there any obvious
> > downsides of this approach that I'm not thinking about? Thoughts on
> > alternatives?
> >
> > Best regards,
> > Mathias
> >
>


Re: Broker Exceptions

2015-03-17 Thread Mayuresh Gharat
Hi Zakee,

Thanks for the logs. Can you paste earlier logs from broker-3 up to :

[2015-03-14 07:46:52,517] ERROR [ReplicaFetcherThread-2-4], Current
offset 1754769769 for partition [Topic22kv,5] out of range; reset
offset to 1400864851 (kafka.server.ReplicaFetcherThread)

That would help us figure out what was happening on this broker before it
issued a replicaFetch request to broker-4.

Thanks,

Mayuresh

On Mon, Mar 16, 2015 at 11:32 PM, Zakee  wrote:

> Hi Mayuresh,
>
> Here are the logs.
>
> 
>
>
> Thanks,
> Kazim Zakee
>
>
>
> > On Mar 16, 2015, at 10:48 AM, Mayuresh Gharat <
> gharatmayures...@gmail.com> wrote:
> >
> > Can you provide more logs (complete) on Broker 3 till time :
> >
> > *[2015-03-14 07:46:52,517*] WARN [ReplicaFetcherThread-2-4], Replica 3
> for
> > partition [Topic22kv,5] reset its fetch offset from 1400864851 to current
> > leader 4's start offset 1400864851 (kafka.server.ReplicaFetcherThread)
> >
> > I would like to see logs from time much before it sent the fetch request
> to
> > Broker 4 to the time above. I want to check if in any case Broker 3 was a
> > leader before broker 4 took over.
> >
> > Additional logs will help.
> >
> >
> > Thanks,
> >
> > Mayuresh
> >
> >
> >
> > On Sat, Mar 14, 2015 at 8:35 PM, Zakee  wrote:
> >
> >> log.cleanup.policy is delete not compact.
> >> log.cleaner.enable=true
> >> log.cleaner.threads=5
> >> log.cleanup.policy=delete
> >> log.flush.scheduler.interval.ms=3000
> >> log.retention.minutes=1440
> >> log.segment.bytes=1073741824  (1 GB)
> >>
> >> Messages are keyed but not compressed, producer async and uses kafka
> >> default partitioner.
> >> String message = msg.getString();
> >> String uniqKey = "" + rnd.nextInt();   // random key
> >> String partKey = getPartitionKey();    // partition key
> >> KeyedMessage<String, String> data =
> >>     new KeyedMessage<String, String>(this.topicName, uniqKey, partKey, message);
> >> producer.send(data);
> >>
> >> Thanks
> >> Zakee
> >>
> >>
> >>
> >>> On Mar 14, 2015, at 4:23 PM, gharatmayures...@gmail.com wrote:
> >>>
> >>> Is your topic log compacted? Also, if it is, are the messages keyed? Or
> >> are the messages compressed?
> >>>
> >>> Thanks,
> >>>
> >>> Mayuresh
> >>>
> >>> Sent from my iPhone
> >>>
>  On Mar 14, 2015, at 2:02 PM, Zakee <kzak...@netzero.net> wrote:
> 
>  Thanks, Jiangjie for helping resolve the kafka controller migration
> >> driven partition leader rebalance issue. The logs are much cleaner now.
> 
>  There are a few incidences of Out of range offset even though  there
> is
> >> no consumers running, only producers and replica fetchers. I was trying
> to
> >> relate to a cause, looks like compaction (log segment deletion) causing
> >> this. Not sure whether this is expected behavior.
> 
>  Broker-4:
>  [2015-03-14 07:46:52,338] ERROR [Replica Manager on Broker 4]: Error
> >> when processing fetch request for partition [Topic22kv,5] offset
> 1754769769
> >> from follower with correlation id 1645671. Possible cause: Request for
> >> offset 1754769769 but we only have log segments in the range 1400864851
> to
> >> 1754769732. (kafka.server.ReplicaManager)
> 
>  Broker-3:
>  [2015-03-14 07:46:52,356] INFO The cleaning for partition
> [Topic22kv,5]
> >> is aborted and paused (kafka.log.LogCleaner)
>  [2015-03-14 07:46:52,408] INFO Scheduling log segment 1400864851 for
> >> log Topic22kv-5 for deletion. (kafka.log.Log)
>  …
>  [2015-03-14 07:46:52,421] INFO Compaction for partition [Topic22kv,5]
> >> is resumed (kafka.log.LogCleaner)
>  [2015-03-14 07:46:52,517] ERROR [ReplicaFetcherThread-2-4], Current
> >> offset 1754769769 for partition [Topic22kv,5] out of range; reset
> offset to
> >> 1400864851 (kafka.server.ReplicaFetcherThread)
>  [2015-03-14 07:46:52,517] WARN [ReplicaFetcherThread-2-4], Replica 3
> >> for partition [Topic22kv,5] reset its fetch offset from 1400864851 to
> >> current leader 4's start offset 1400864851
> >> (kafka.server.ReplicaFetcherThread)
> 
>  
>  
> 
> 
>  Thanks
>  Zakee
> 
> > On Mar 9, 2015, at 12:18 PM, Zakee  wrote:
> >
> > No broker restarts.
> >
> > Created a kafka issue:
> >> https://issues.apache.org/jira/browse/KAFKA-2011
> >
> >>> Logs for rebalance:
> >>> [2015-03-07 16:52:48,969] INFO [Controller 2]: Resuming preferred
> >> replica elect