Re: Kafka @ apachecon Denver?

2014-01-27 Thread David Corley
@Joe, sounds interesting. Will keep an eye out for the session


On Mon, Jan 27, 2014 at 4:57 PM, Steve Morin  wrote:

> Are you using mesos?
>
> > On Jan 27, 2014, at 8:39, Joe Stein  wrote:
> >
> > I was going to submit a talk on Kafka and Mesos.  I still am trying to
> nail down the dates in my schedule though.  Anyone else going? Maybe we
> could do a meetup or bof or something?
> >
> >
> > /***
> > Joe Stein
> > Founder, Principal Consultant
> > Big Data Open Source Security LLC
> > http://www.stealth.ly
> > Twitter: @allthingshadoop
> > /
> >
> >
> >> On Jan 27, 2014, at 9:54 AM, David Corley 
> wrote:
> >>
> >> Kafka Devs,
> >> Just wondering if there'll be anything in the line of Kafka
> presentations
> >> and/or tutorials at ApacheCon in Denver in April?
>


Re: New Producer Public API

2014-01-27 Thread Clark Breyman
Jay -

Config - your explanation makes sense. I'm just so accustomed to having
Jackson automatically map my configuration objects to POJOs that I've
stopped using property files. They are lingua franca. The only thought
might be to separate the config interface from the implementation to allow
for alternatives, but that might undermine your point of "do it this way so
that everyone can find it where they expect it".

Serialization: Of the options, I like 1A the best, though possibly with
either an option to specify a partition key rather than ID or a helper to
translate an arbitrary byte[] or long into a partition number.
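
For illustration, a minimal sketch of the kind of helper meant here - the class and method names are hypothetical, and it only assumes the caller already knows the topic's partition count:

import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical helper: folds an arbitrary byte[] or long key into a partition number.
public final class PartitionKeys {

    // Hash the raw key bytes and map the result into [0, numPartitions).
    public static int partitionFor(byte[] key, int numPartitions) {
        int hash = Arrays.hashCode(key);
        return (hash & 0x7fffffff) % numPartitions;   // mask the sign bit to avoid a negative modulo
    }

    // Convenience overload for long keys, e.g. a user or account id.
    public static int partitionFor(long key, int numPartitions) {
        return partitionFor(ByteBuffer.allocate(8).putLong(key).array(), numPartitions);
    }
}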

Thanks
Clark


On Sun, Jan 26, 2014 at 9:13 PM, Jay Kreps  wrote:

> Thanks for the detailed thoughts. Let me elaborate on the config thing.
>
> I agree that at first glance key-value strings don't seem like a very good
> configuration api for a client. Surely a well-typed config class would be
> better! I actually disagree and let me see if I can convince you.
>
> My reasoning has nothing to do with the api and everything to do with
> operations.
>
> Clients are embedded in applications which are themselves configured. In
> any place that takes operations seriously the configuration for these
> applications will be version controlled and maintained through some kind of
> config management system. If we give a config class with getters and
> setters the application has to expose those properties to its
> configuration. What invariably happens is that the application exposes only
> a choice few properties that they thought they would change. Furthermore
> the application will make up a name for these configs that seems intuitive
> at the time in the 2 seconds the engineer spends thinking about it.
>
> Now consider the result of this in the large. You end up with dozens or
> hundreds of applications that have the client embedded. Each exposes a
> different, inadequate subset of the possible configs, each with different
> names. It is a nightmare.
>
> If you use a string-string map the config system can directly get a bundle
> of config key-value pairs and put them into the client. This means that all
> configuration is automatically available with the name documented on the
> website in every application that does this. If you upgrade to a new kafka
> version with more configs those will be exposed too. If you realize that
> you need to change a default you can just go through your configs and
> change it everywhere as it will have the same name everywhere.
>
> -Jay
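
A minimal sketch of what that string-string approach looks like from the application side - the file path is a placeholder and the producer constructor shape is an assumption, not the final API:

import java.io.FileInputStream;
import java.util.Properties;

public class ProducerWiring {
    // Sketch only: load whatever key-value pairs the config system provides and hand
    // the whole bundle to the client, so every documented Kafka config is reachable
    // without the application having to expose a getter/setter for each one.
    static Properties loadClientConfig(String path) throws java.io.IOException {
        Properties props = new Properties();
        FileInputStream in = new FileInputStream(path);
        try {
            props.load(in);   // broker list, acks, batch size... all pass through untouched
        } finally {
            in.close();
        }
        return props;
    }
    // Hypothetical constructor shape for the new producer under discussion:
    // Producer producer = new Producer(loadClientConfig("/etc/myapp/kafka-producer.properties"));
}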
>
>
>
>
> On Sun, Jan 26, 2014 at 4:47 PM, Clark Breyman  wrote:
>
> > Thanks Jay. I'll see if I can put together a more complete response,
> > perhaps as separate threads so that topics don't get entangled. In the
> mean
> > time, here's a couple responses:
> >
> > Serialization: you've broken out a sub-thread so i'll reply there. My
> bias
> > is that I like generics (except for type-erasure) and in particular they
> > make it easy to compose serializers for compound payloads (e.g. when a
> > common header wraps a payload of parameterized type). I'll respond to
> your
> > 4-options message with an example.
> >
> > Build: I've seen a lot of "maven-compatible" build systems produce
> > "artifacts" that aren't really artifacts - no embedded POM or, worst,
> > malformed POM. I know the sbt-generated artifacts were this way - onus is
> > on me to see what gradle is spitting out and what a maven build might
> look
> > like. Maven may be old and boring, but it gets out of the way and
> > integrates really seamlessly with a lot of IDEs. When some scala
> projects I
> > was working on in the fall of 2011 switched from sbt to maven, build
> became
> > a non-issue.
> >
> > Config: Not a big deal  and no, I don't think a dropwizard dependency is
> > appropriate. I do like using simple entity beans (POJO's not j2EE) for
> > configuration, especially if they can be marshalled without annotation by
> > Jackson. I only mentioned the dropwizard-extras  because it has some
> entity
> > bean versions of the ZK and Kafka configs.
> >
> > Domain-packaging: Also not a big deal - it's what's expected and it's
> > pretty free in most IDE's. The advantages I see is that it is clear
> whether
> > something is from the Apache Kafka project and whether something is from
> > another org and related to Kafka. That said, nothing really enforces it.
> >
> > Futures: I'll see if I can create some examples to demonstrate Future
> > making interop easier.
> >
> > Regards,
> > C
> >
> >
> >
> >
> > On Fri, Jan 24, 2014 at 4:36 PM, Jay Kreps  wrote:
> >
> > > Hey Clark,
> > >
> > > - Serialization: Yes I agree with these though I don't consider the
> loss
> > of
> > > generics a big issue. I'll try to summarize what I would consider the
> > best
> > > alternative api with raw byte[].
> > >
> > > - Maven: We had this debate a few months back and the consensus was
> > gradle.
> > > Is there a specific issue with the poms gradle makes? I am extremely
> > loath
> > 

Re: New Producer Public API

2014-01-27 Thread Jay Kreps
Clark,

Yeah, good point. Okay, I'm sold on Closeable. AutoCloseable would be much
better, but for now we are retaining 1.6 compatibility, and I suspect the
use case of temporarily creating a producer would actually be a rarer
case.

-Jay
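
For illustration, the trade-off being weighed - the producer type below is a stand-in, only the Closeable/AutoCloseable contract matters:

import java.io.Closeable;
import java.io.IOException;

// Stand-in for the producer under discussion; implementing Closeable keeps Java 6
// compatibility, while AutoCloseable would additionally allow try-with-resources on Java 7.
class SketchProducer implements Closeable {
    public void send(byte[] key, byte[] value) { /* ... */ }
    @Override public void close() throws IOException { /* flush and release sockets */ }
}

class Usage {
    void sendOnce(SketchProducer producer) throws IOException {
        try {
            producer.send(null, "hello".getBytes("UTF-8"));
        } finally {
            producer.close();   // Java 6 style; with AutoCloseable this could be try-with-resources
        }
    }
}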


On Mon, Jan 27, 2014 at 9:29 AM, Clark Breyman  wrote:

> re: "Using package to avoid ambiguity"  - Unlike Scala, this is really
> cumbersome in Java as it doesn't support package imports or import aliases,
> so the only way to distinguish is to use the fully qualified path.
>
> re: Closeable - it can throw IOException but is not required to. Same with
> AutoCloseable (are J7-specific classes and methods permitted?). Closeable
> and AutoCloseable are really nice in eliminating finally clauses. What to
> do with an exception on close()? Log it, use it as a signal of environment
> health and move on. If it happens too frequently, you know your process
> environment is degraded and might need to fail over and restart.
>


RE: Calcuate Consumer lag from JMX beans in Kafka 0.7.2

2014-01-27 Thread Xuyen On
Oh, I should note that the built-in consumer offset checker tool works for me,
but I was hoping to use something like jmxtrans so that I could easily export
the data to ganglia, graphite or some other graphing tool. Jmxtrans was
recommended on the Kafka wiki, and there's another project called
kafka-ganglia, but it doesn't support Kafka 0.7.2, which is what we're using
right now. Any suggestions for getting real-time reporting of consumer lag through
a tool that can export to ganglia would be appreciated.

Thanks,

Xuyen

-Original Message-
From: Xuyen On [mailto:x...@ancestry.com] 
Sent: Monday, January 27, 2014 9:33 AM
To: users@kafka.apache.org
Subject: RE: Calcuate Consumer lag from JMX beans in Kafka 0.7.2

Thanks for your response Joel.

I am currently trying out JMXTrans to get the stats from MBean and I can read 
attributes fine but it doesn't support JMX Operations yet. What tool do you use 
for your reporting? Is there another tool that supports JMX operations so that 
I can use the getOffsetLag operation?

Thanks,

Xuyen

-Original Message-
From: Joel Koshy [mailto:jjkosh...@gmail.com] 
Sent: Friday, January 24, 2014 5:48 PM
To: users@kafka.apache.org
Subject: Re: Calcuate Consumer lag from JMX beans in Kafka 0.7.2

> kafka.logs.eventdata-0
>   Attributes
>   Name  // Name of partition
>   Size   //Is this the 
> current number of messages?
Size -> in bytes

>   NumberofSegments // Don't know what this is 
Each partition contains multiple segments (files) on disk.

>   CurrentOffset   //Is this the current offset of 
> a consumer?

No - it is the (byte) offset up to which we have flushed. It is unrelated to 
the current consumption point of consumers.

>   NumAppendedMessages   // Don't know what this is
Simple counter of number of messages sent so far to this partition.

> Can someone please tell me what these attributes mean? I am trying to find 
> out if I can use these values to calculate the consumer lag ie: Size - 
> CurrentOffset and report it to a chart or dashboard somewhere for real-time 
> analysis.

So the above would be insufficient to measure consumer lag. You can use the 
consumer offset checker tool and extract the lag from its output. There is also 
an mbean operation on the consumers
(getOffsetLag) that you can use on a per-partition basis.

Thanks,

Joel
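
For anyone in the same spot - a collector that reads attributes but cannot invoke operations - a minimal Java sketch of calling an operation such as getOffsetLag over a remote JMX connection. The service URL and MBean object name below are placeholders; query the consumer JVM (e.g. with jconsole) for the actual names it registers.

import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class OffsetLagProbe {
    public static void main(String[] args) throws Exception {
        // Placeholder host/port: point this at the consumer JVM started with JMX remoting enabled.
        JMXServiceURL url = new JMXServiceURL(
                "service:jmx:rmi:///jndi/rmi://consumer-host:9999/jmxrmi");
        JMXConnector connector = JMXConnectorFactory.connect(url);
        try {
            MBeanServerConnection conn = connector.getMBeanServerConnection();
            // Placeholder object name: look up the real consumer MBean name with jconsole.
            ObjectName consumerBean = new ObjectName("kafka:type=kafka.ConsumerStats");
            Object lag = conn.invoke(consumerBean, "getOffsetLag",
                    new Object[0], new String[0]);   // no-arg operation assumed here
            System.out.println("offset lag = " + lag);
        } finally {
            connector.close();
        }
    }
}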




RE: Calcuate Consumer lag from JMX beans in Kafka 0.7.2

2014-01-27 Thread Xuyen On
Thanks for your response Joel.

I am currently trying out JMXTrans to get the stats from MBean and I can read 
attributes fine but it doesn't support JMX Operations yet. What tool do you use 
for your reporting? Is there another tool that supports JMX operations so that 
I can use the getOffsetLag operation?

Thanks,

Xuyen

-Original Message-
From: Joel Koshy [mailto:jjkosh...@gmail.com] 
Sent: Friday, January 24, 2014 5:48 PM
To: users@kafka.apache.org
Subject: Re: Calcuate Consumer lag from JMX beans in Kafka 0.7.2

> kafka.logs.eventdata-0
>   Attributes
>   Name  // Name of partition
>   Size   //Is this the 
> current number of messages?
Size -> in bytes

>   NumberofSegments // Don't know what this is 
Each partition contains multiple segments (files) on disk.

>   CurrentOffset   //Is this the current offset of 
> a consumer?

No - it is the (byte) offset up to which we have flushed. It is unrelated to 
the current consumption point of consumers.

>   NumAppendedMessages   // Don't know what this is
Simple counter of number of messages sent so far to this partition.

> Can someone please tell me what these attributes mean? I am trying to find 
> out if I can use these values to calculate the consumer lag ie: Size - 
> CurrentOffset and report it to a chart or dashboard somewhere for real-time 
> analysis.

So the above would be insufficient to measure consumer lag. You can use the 
consumer offset checker tool and extract the lag from its output. There is also 
an mbean operation on the consumers
(getOffsetLag) that you can use on a per-partition basis.

Thanks,

Joel




Re: New Producer Public API

2014-01-27 Thread Clark Breyman
re: "Using package to avoid ambiguity"  - Unlike Scala, this is really
cumbersome in Java as it doesn't support package imports or import aliases,
so the only way to distinguish is to use the fully qualified path.

re: Closeable - it can throw IOException but is not required to. Same with
AutoCloseable (are J7-specific classes and methods permitted?). Closeable
and AutoCloseable are really nice in eliminating finally clauses. What to
do with an exception on close()? Log it, use it as a signal of environment
health and move on. If it happens too frequently, you know your process
environment is degraded and might need to fail over and restart.


Re: Calcuate Consumer lag from JMX beans in Kafka 0.7.2

2014-01-27 Thread Otis Gospodnetic
Hi Xuyen,

SPM for Kafka gets all this stuff already.  I didn't look into whether/how
exactly one can spot consumer lag, but if you spot a way to do it, please
share.

There is a demo of SPM for Kafka if you go to
https://apps.sematext.com/demo so you can see all Kafka performance
graphs and see if any of the metrics there show consumer lag directly or
indirectly.

Otis
--
Performance Monitoring * Log Analytics * Search Analytics
Solr & Elasticsearch Support * http://sematext.com/


On Mon, Jan 27, 2014 at 12:49 PM, Xuyen On  wrote:

> Oh, I should note that the built in consumer offsetchecker tool works for
> me but I was hoping to use something like jmxtrans so that I could easily
> export the data to ganglia, graphite or some other graphing tool. Jmxtrans
> was recommended from the Kafka wiki and there's another project called
> kafka-ganglia but it doesn't support 0.7.2 Kafka which is what we're using
> right now. Any suggestions to get real time reporting of consumer lag
> through a tool that can export to ganglia would be appreciated.
>
> Thanks,
>
> Xuyen
>
> -Original Message-
> From: Xuyen On [mailto:x...@ancestry.com]
> Sent: Monday, January 27, 2014 9:33 AM
> To: users@kafka.apache.org
> Subject: RE: Calcuate Consumer lag from JMX beans in Kafka 0.7.2
>
> Thanks for your response Joel.
>
> I am currently trying out JMXTrans to get the stats from MBean and I can
> read attributes fine but it doesn't support JMX Operations yet. What tool
> do you use for your reporting? Is there another tool that supports JMX
> operations so that I can use the getOffsetLag operation?
>
> Thanks,
>
> Xuyen
>
> -Original Message-
> From: Joel Koshy [mailto:jjkosh...@gmail.com]
> Sent: Friday, January 24, 2014 5:48 PM
> To: users@kafka.apache.org
> Subject: Re: Calcuate Consumer lag from JMX beans in Kafka 0.7.2
>
> > kafka.logs.eventdata-0
> >   Attributes
> >   Name  // Name of partition
> >   Size   //Is this the
> current number of messages?
> Size -> in bytes
>
> >   NumberofSegments // Don't know what this is
> Each partition contains multiple segments (files) on disk.
>
> >   CurrentOffset   //Is this the current
> offset of a consumer?
>
> No - it is the (byte) offset up to which we have flushed. It is unrelated
> to the current consumption point of consumers.
>
> >   NumAppendedMessages   // Don't know what this is
> Simple counter of number of messages sent so far to this partition.
>
> > Can someone please tell me what these attributes mean? I am trying to
> find out if I can use these values to calculate the consumer lag ie: Size -
> CurrentOffset and report it to a chart or dashboard somewhere for real-time
> analysis.
>
> So the above would be insufficient to measure consumer lag. You can use
> the consumer offset checker tool and extract the lag from its output. There
> is also an mbean operation on the consumers
> (getOffsetLag) that you can use on a per-partition basis.
>
> Thanks,
>
> Joel
>
>
>


Re: New Producer Public API

2014-01-27 Thread Xavier Stevens
AutoCloseable would be nice for us as most of our code is using Java 7 at
this point.

I like Dropwizard's configuration mapping to POJOs via Jackson, but if you
wanted to stick with property maps I don't care enough to object.

If the producer only dealt with bytes, is there a way we could still do
partition plugins without specifying the number explicitly? I would prefer
to be able to pass in field(s) that would be used by the partitioner.
Obviously if this wasn't possible you could always deserialize the object
in the partitioner and grab the fields you want, but that seems really
expensive to do on every message.
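
A sketch of the kind of thing meant here, with purely hypothetical record and partitioner shapes: the record carries the partitioning field(s) as a separate, small byte[] alongside the opaque payload, so the partitioner never has to deserialize the value.

// Hypothetical shapes, only to illustrate "pass in the field(s) used by the partitioner".
class SketchRecord {
    final byte[] partitionKey;   // e.g. the serialized account id: small and cheap to hash
    final byte[] value;          // the full, opaque serialized message
    SketchRecord(byte[] partitionKey, byte[] value) {
        this.partitionKey = partitionKey;
        this.value = value;
    }
}

interface SketchPartitioner {
    int partition(byte[] partitionKey, int numPartitions);
}

class HashingPartitioner implements SketchPartitioner {
    public int partition(byte[] partitionKey, int numPartitions) {
        // Only the small key is touched; the payload is never deserialized per message.
        return (java.util.Arrays.hashCode(partitionKey) & 0x7fffffff) % numPartitions;
    }
}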

It would also be nice to have a Java API Encoder constructor taking in
VerifiableProperties. Scala understands how to handle "props:
VerifiableProperties = null", but Java doesn't. So you don't run into this
problem until runtime.
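
For reference, the usual workaround on the Java side is to declare the VerifiableProperties constructor explicitly, since Java never sees the Scala default argument. A sketch against the 0.8-style interfaces, assuming kafka.serializer.Encoder and kafka.utils.VerifiableProperties are on the classpath:

import kafka.serializer.Encoder;
import kafka.utils.VerifiableProperties;

// Java implementations have to declare the VerifiableProperties constructor themselves,
// because encoders are instantiated reflectively with that one-arg signature and Java
// cannot rely on the Scala-side "props: VerifiableProperties = null" default.
public class Utf8StringEncoder implements Encoder<String> {

    public Utf8StringEncoder(VerifiableProperties props) {
        // props may be null; read any encoder-specific settings here if needed
    }

    @Override
    public byte[] toBytes(String message) {
        try {
            return message.getBytes("UTF-8");
        } catch (java.io.UnsupportedEncodingException e) {
            throw new RuntimeException(e);
        }
    }
}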


-Xavier


On Mon, Jan 27, 2014 at 9:37 AM, Clark Breyman  wrote:

> Jay -
>
> Config - your explanation makes sense. I'm just so accustomed to having
> Jackson automatically map my configuration objects to POJOs that I've
> stopped using property files. They are lingua franca. The only thought
> might be to separate the config interface from the implementation to allow
> for alternatives, but that might undermine your point of "do it this way so
> that everyone can find it where they expect it".
>
> Serialization: Of the options, I like 1A the best, though possibly with
> either an option to specify a partition key rather than ID or a helper to
> translate an arbitrary byte[] or long into a partition number.
>
> Thanks
> Clark
>
>
> On Sun, Jan 26, 2014 at 9:13 PM, Jay Kreps  wrote:
>
> > Thanks for the detailed thoughts. Let me elaborate on the config thing.
> >
> > I agree that at first glance key-value strings don't seem like a very
> good
> > configuration api for a client. Surely a well-typed config class would be
> > better! I actually disagree and let me see if I can convince you.
> >
> > My reasoning has nothing to do with the api and everything to do with
> > operations.
> >
> > Clients are embedded in applications which are themselves configured. In
> > any place that takes operations seriously the configuration for these
> > applications will be version controlled and maintained through some kind
> of
> > config management system. If we give a config class with getters and
> > setters the application has to expose those properties to its
> > configuration. What invariably happens is that the application exposes
> only
> > a choice few properties that they thought they would change. Furthermore
> > the application will make up a name for these configs that seems
> intuitive
> > at the time in the 2 seconds the engineer spends thinking about it.
> >
> > Now consider the result of this in the large. You end up with dozens or
> > hundreds of applications that have the client embedded. Each exposes a
> > different, inadequate subset of the possible configs, each with different
> > names. It is a nightmare.
> >
> > If you use a string-string map the config system can directly get a
> bundle
> > of config key-value pairs and put them into the client. This means that
> all
> > configuration is automatically available with the name documented on the
> > website in every application that does this. If you upgrade to a new
> kafka
> > version with more configs those will be exposed too. If you realize that
> > you need to change a default you can just go through your configs and
> > change it everywhere as it will have the same name everywhere.
> >
> > -Jay
> >
> >
> >
> >
> > On Sun, Jan 26, 2014 at 4:47 PM, Clark Breyman 
> wrote:
> >
> > > Thanks Jay. I'll see if I can put together a more complete response,
> > > perhaps as separate threads so that topics don't get entangled. In the
> > mean
> > > time, here's a couple responses:
> > >
> > > Serialization: you've broken out a sub-thread so i'll reply there. My
> > bias
> > > is that I like generics (except for type-erasure) and in particular
> they
> > > make it easy to compose serializers for compound payloads (e.g. when a
> > > common header wraps a payload of parameterized type). I'll respond to
> > your
> > > 4-options message with an example.
> > >
> > > Build: I've seen a lot of "maven-compatible" build systems produce
> > > "artifacts" that aren't really artifacts - no embedded POM or, worst,
> > > malformed POM. I know the sbt-generated artifacts were this way - onus
> is
> > > on me to see what gradle is spitting out and what a maven build might
> > look
> > > like. Maven may be old and boring, but it gets out of the way and
> > > integrates really seamlessly with a lot of IDEs. When some scala
> > projects I
> > > was working on in the fall of 2011 switched from sbt to maven, build
> > became
> > > a non-issue.
> > >
> > > Config: Not a big deal  and no, I don't think a dropwizard dependency
> is
> > > appropriate. I do like using simple entity beans (POJO's not j2EE) for
> > > configuration, especia

[Using offset to fetch message][Need Help]

2014-01-27 Thread Abhishek Bhattacharjee
I am using storm and kafka for replaying messages.
Now I want to save offset of each message and then use it later for
resending the message.
So my question is: how can I fetch a single message using its offset?
That is, I know the offset of a message and I want to use the offset to
fetch that message (only that message).


Thanks,
*Abhishek Bhattacharjee*
*Pune Institute of Computer Technology*


Re: Is there a way to delete partition at runtime?

2014-01-27 Thread Marc Labbe
I have the same need, and I've just created a Jira:
https://issues.apache.org/jira/browse/KAFKA-1231

The reasoning behind it is that our topics are created on a per-product
basis and each of them usually starts big during the initial weeks and
gradually shrinks over time (1-2 years).

thanks
marc


On Thu, Dec 5, 2013 at 7:45 PM, Guozhang Wang  wrote:

> Hi Siyuan,
>
> We do not have a tool to shrink the number of partitions (if that is what
> you want) for a topic at runtime yet. Could you file a JIRA for this?
>
> Guozhang
>
>
> On Thu, Dec 5, 2013 at 2:16 PM, hsy...@gmail.com  wrote:
>
> > Hi guys,
> >
> > I found there is a tool to add partition on the fly. My question is, is
> > there a way to delete a partition at runtime? Thanks!
> >
> > Best,
> > Siyuan
> >
>
>
>
> --
> -- Guozhang
>


Re: [Using offset to fetch message][Need Help]

2014-01-27 Thread Robert Turner
Rather than fetching the message again you could cache it in the spout and
emit it again if the *fail* method is called and delete it when the
*ack*method is called. This is possible as Storm guarantees to call
the
*fail* and *ack* methods with the *messageId* on the exact same spout that
the message originated from. This means if you have cached the message
there then it will still be available.




On 27 January 2014 19:29, Abhishek Bhattacharjee <
abhishek.bhattacharje...@gmail.com> wrote:

> I am using storm and kafka for replaying messages.
> Now I want to save offset of each message and then use it later for
> resending the message.
> So my question is how can I fetch a single message using its offset ?
> That is I know the offset of a message and I want to use the offset to
> fetch that message(only that message).
>
>
> Thanks,
> *Abhishek Bhattacharjee*
> *Pune Institute of Computer Technology*
>



-- 
Cheers
   Rob.


Re: [Using offset to fetch message][Need Help]

2014-01-27 Thread Guozhang Wang
In Kafka you cannot fetch just one message based on the offset; what you
get instead is to start fetching from the given offset. Of course you can
just take the first one that you want and then discard the rest and stop
fetching immediately, but I think a better idea would be to cache the
not-acked-yet messages as Robert suggested.

Guozhang
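
A sketch of the "start at the offset and keep only the first message" approach described above, written against the 0.8 SimpleConsumer API; the broker host/port, timeouts and fetch sizes are placeholder values, and in practice the leader broker should be looked up via a metadata request first.

import java.nio.ByteBuffer;
import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

public class FetchOneMessage {
    public static byte[] fetchAt(String topic, int partition, long offset) {
        // Placeholder broker coordinates; use the partition leader in a real deployment.
        SimpleConsumer consumer =
                new SimpleConsumer("broker-host", 9092, 100000, 64 * 1024, "replay-client");
        try {
            FetchRequest request = new FetchRequestBuilder()
                    .clientId("replay-client")
                    .addFetch(topic, partition, offset, 100000)   // fetching starts AT the offset
                    .build();
            FetchResponse response = consumer.fetch(request);
            for (MessageAndOffset mo : response.messageSet(topic, partition)) {
                if (mo.offset() < offset) continue;      // compressed sets can start earlier
                ByteBuffer payload = mo.message().payload();
                byte[] bytes = new byte[payload.limit()];
                payload.get(bytes);
                return bytes;                            // keep the first message, discard the rest
            }
            return null;
        } finally {
            consumer.close();
        }
    }
}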


On Mon, Jan 27, 2014 at 1:39 PM, Robert Turner  wrote:

> Rather than fetching the message again you could cache it in the spout and
> emit it again if the *fail* method is called and delete it when the
> *ack*method is called. This is possible as Storm guarantees to call
> the
> *fail* and *ack* methods with the *messageId* on the exact same spout that
> the message originated from. This means if you have cached the message
> there then it will still be available.
>
>
>
>
> On 27 January 2014 19:29, Abhishek Bhattacharjee <
> abhishek.bhattacharje...@gmail.com> wrote:
>
> > I am using storm and kafka for replaying messages.
> > Now I want to save offset of each message and then use it later for
> > resending the message.
> > So my question is how can I fetch a single message using its offset ?
> > That is I know the offset of a message and I want to use the offset to
> > fetch that message(only that message).
> >
> >
> > Thanks,
> > *Abhishek Bhattacharjee*
> > *Pune Institute of Computer Technology*
> >
>
>
>
> --
> Cheers
>Rob.
>



-- 
-- Guozhang


high-level consumer design

2014-01-27 Thread David Birdsong
Hey All, I've been cobbling together a high-level consumer for golang
building on top of Shopify's Sarama package and wanted to run the basic
design by the list and get some feedback or pointers on things I've missed
or will eventually encounter on my own.

I'm using zookeeper to coordinate topic-partition owners for consumer
members in each consumer group. I followed the znode layout that's apparent
from watching the console consumer.

/consumers/<group>/{offsets,owners,ids}

The consumer uses an outer loop to discover the partition list for a given
topic, attempts to grab a zookeeper lock on each (topic,partition) tuple,
and then for each (topic, partition) it successfully locks, launches a
thread (goroutine) for each partition to read the partition stream.

The outer loop continues to watch for children events on either of:
/consumers/<group>/owners/<topic> or /brokers/topics/<topic>/partitions

...any watch event that fires causes all offset data and consumer handles
to be flushed and closed, goroutines watching topic-partitions exit. The
loop is restarted.

Another thread reads topic-partition-offset data and flushes the offset
to /consumers/<group>/offsets/<topic>/<partition>.

Have I oversimplified or missed any critical steps?
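
For comparison, the per-partition "lock" above boils down to creating an ephemeral znode. A minimal Java sketch of that single step (paths and error handling are illustrative; a Go/Sarama version follows the same shape):

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class PartitionClaim {
    // Try to own one (topic, partition) for a consumer group by creating an ephemeral znode.
    // Returns true if this consumer won the claim; the node disappears if the session dies.
    static boolean tryClaim(ZooKeeper zk, String group, String topic, int partition, String consumerId)
            throws KeeperException, InterruptedException {
        String path = "/consumers/" + group + "/owners/" + topic + "/" + partition;
        try {
            zk.create(path, consumerId.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            return true;
        } catch (KeeperException.NodeExistsException alreadyOwned) {
            return false;   // someone else holds this partition; watch and retry on the next rebalance
        }
    }
}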


Re: high-level consumer design

2014-01-27 Thread Guozhang Wang
Hello David,

One thing about using ZK locks to "own" a partition is load balancing. If
you are unlucky some consumer may get all the locks and some may get none,
hence have no partitions to consume.

Also, you may need some synchronization between the consumer threads and the
offset thread. For example, when an event is fired and the consumers need
to re-try grabbing the locks, they need to first stop the current fetchers,
commit offsets, and then start owning new partitions.

Guozhang


On Mon, Jan 27, 2014 at 3:03 PM, David Birdsong wrote:

> Hey All, I've been cobbling together a high-level consumer for golang
> building on top of Shopify's Sarama package and wanted to run the basic
> design by the list and get some feedback or pointers on things I've missed
> or will eventually encounter on my own.
>
> I'm using zookeeper to coordinate topic-partition owners for consumer
> members in each consumer group. I followed the znode layout that's apparent
> from watching the console consumer.
>
> //{offsets,owners,ids}.
>
> The consumer uses an outer loop to discover the partition list for a given
> topic, attempts to grab a zookeeper lock on each (topic,partition) tuple,
> and then for each (topic, partition) it successfully locks, launches a
> thread (goroutine) for each partition to read the partition stream.
>
> The outer loop continues to watch for children events either of:
>
> //owners//brokers/topics//partitions
>
> ...any watch event that fires causes all offset data and consumer handles
> to be flushed and closed, goroutines watching topic-partitions exit. The
> loop is restarted.
>
> Another thread reads topic-partition-offset data and flushes the offset
> to://offsets/
>
> Have I oversimplified or missed any critical steps?
>



-- 
-- Guozhang


Re: Is there a way to delete partition at runtime?

2014-01-27 Thread Guozhang Wang
Siyuan, Marc:

We are currently working on topic deletion support (KAFKA-330);
would first-delete-then-recreate-with-fewer-partitions work for your cases?
The reason we are trying to avoid shrinking partitions is that it would
make the logic very complicated. For example, we would need to think about
the within-partition ordering guarantee with partition merging and
in-progress producing happening simultaneously.

Guozhang


On Mon, Jan 27, 2014 at 12:35 PM, Marc Labbe  wrote:

> I have the same need, and I've just created a Jira:
> https://issues.apache.org/jira/browse/KAFKA-1231
>
> The reasoning behind it is because our topics are created on a per product
> basis and each of them usually starts big during the initial weeks and
> gradually reduces in time (1-2 years).
>
> thanks
> marc
>
>
> On Thu, Dec 5, 2013 at 7:45 PM, Guozhang Wang  wrote:
>
> > Hi Siyuan,
> >
> > We do not have a tool to shrink the number of partitions (if that is what
> > you want) for a topic at runtime yet. Could you file a JIRA for this?
> >
> > Guozhang
> >
> >
> > On Thu, Dec 5, 2013 at 2:16 PM, hsy...@gmail.com 
> wrote:
> >
> > > Hi guys,
> > >
> > > I found there is a tool to add partition on the fly. My question is, is
> > > there a way to delete a partition at runtime? Thanks!
> > >
> > > Best,
> > > Siyuan
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang


Re: high-level consumer design

2014-01-27 Thread David Birdsong
On Mon, Jan 27, 2014 at 4:19 PM, Guozhang Wang  wrote:

> Hello David,
>
> One thing about using ZK locks to "own" a partition is load balancing. If
> you are unlucky some consumer may get all the locks and some may get none,
> hence have no partitions to consume.
>

I've considered this and even encountered it in testing. For our current
load levels, it won't hurt us, but if there's a good solution, I'd rather
codify smooth consumer balance.

Got any suggestions?

My thinking thus far is to establish some sort of identity on the consumer
and derive an evenness or oddness or some modulo value that induces a small
delay when encountering particular partition numbers. It's a hacky idea,
but is pretty simple and might be good enough for smoothing consumers.


> Also you may need some synchronization between the consumer thread with the
> offset thread. For example, when an event is fired and the consumers need
> to re-try grabbing the locks, it needs to first stop current fetchers,
> commit offsets, and then start owning new partitions.
>

This is the current design and what I have implemented so far. The last
thread to exit is the offset thread; it has a direct communication channel
to the consumer threads, so it waits for those channels to be closed before
its last flush and exit.


> Guozhang
>
>
Thanks for the input!


>
> On Mon, Jan 27, 2014 at 3:03 PM, David Birdsong  >wrote:
>
> > Hey All, I've been cobbling together a high-level consumer for golang
> > building on top of Shopify's Sarama package and wanted to run the basic
> > design by the list and get some feedback or pointers on things I've
> missed
> > or will eventually encounter on my own.
> >
> > I'm using zookeeper to coordinate topic-partition owners for consumer
> > members in each consumer group. I followed the znode layout that's
> apparent
> > from watching the console consumer.
> >
> > //{offsets,owners,ids}.
> >
> > The consumer uses an outer loop to discover the partition list for a
> given
> > topic, attempts to grab a zookeeper lock on each (topic,partition) tuple,
> > and then for each (topic, partition) it successfully locks, launches a
> > thread (goroutine) for each partition to read the partition stream.
> >
> > The outer loop continues to watch for children events either of:
> >
> >
> //owners//brokers/topics//partitions
> >
> > ...any watch event that fires causes all offset data and consumer handles
> > to be flushed and closed, goroutines watching topic-partitions exit. The
> > loop is restarted.
> >
> > Another thread reads topic-partition-offset data and flushes the offset
> >
> to://offsets/
> >
> > Have I oversimplified or missed any critical steps?
> >
>
>
>
> --
> -- Guozhang
>


Re: [Using offset to fetch message][Need Help]

2014-01-27 Thread Abhishek Bhattacharjee
Thanks for the suggestions.
The problem with caching is that I have to cache a lot of messages, as I
don't know which one is going to fail.
If a message is processed in one go, caching that message is unnecessary;
that's why I want to replay it from Kafka itself.
And I want to use the offset as the message ID, so it can serve two purposes:
message ID and offset for fetching the message from Kafka. Caching
also adds overhead, don't you think? Your responses will be appreciated
:)
Thanks.


On Tue, Jan 28, 2014 at 3:45 AM, Guozhang Wang  wrote:

> In Kafka you cannot fetch just one message based on the offset, what you
> get instead is to start fetching from the given offset. Of course you can
> just get the first one that you want and then discard the rest and stop
> fetching immediately, but I think a better idea would be cache the
> not-acked-yet messages as Robert suggested.
>
> Guozhang
>
>
> On Mon, Jan 27, 2014 at 1:39 PM, Robert Turner  wrote:
>
> > Rather than fetching the message again you could cache it in the spout
> and
> > emit it again if the *fail* method is called and delete it when the
> > *ack*method is called. This is possible as Storm guarantees to call
> > the
> > *fail* and *ack* methods with the *messageId* on the exact same spout
> that
> > the message originated from. This means if you have cached the message
> > there then it will still be available.
> >
> >
> >
> >
> > On 27 January 2014 19:29, Abhishek Bhattacharjee <
> > abhishek.bhattacharje...@gmail.com> wrote:
> >
> > > I am using storm and kafka for replaying messages.
> > > Now I want to save offset of each message and then use it later for
> > > resending the message.
> > > So my question is how can I fetch a single message using its offset ?
> > > That is I know the offset of a message and I want to use the offset to
> > > fetch that message(only that message).
> > >
> > >
> > > Thanks,
> > > *Abhishek Bhattacharjee*
> > > *Pune Institute of Computer Technology*
> > >
> >
> >
> >
> > --
> > Cheers
> >Rob.
> >
>
>
>
> --
> -- Guozhang
>



-- 
*Abhishek Bhattacharjee*
*Pune Institute of Computer Technology*


Reg Exception in Kafka

2014-01-27 Thread Balasubramanian Jayaraman (Contingent)
Hi,

I have a remote server (EC2) set up with a Kafka cluster. There are 3
brokers, each running on ports 9092, 9093 and 9094. ZooKeeper is running on
port 2181.
When I send a message to the brokers from my PC, I get the exception given
below. I did a dump on the remote server, and the request is received
there.
I am able to test the consumer/producer scripts present in the bin folder
locally. What am I missing? Can you kindly help me with this error? Any help
will be greatly appreciated.

[ INFO] [main 2014-01-27 16:06:50,083] Verifying properties
[ INFO] [main 2014-01-27 16:06:50,108] Property metadata.broker.list is 
overridden to 54.241.44.129:9092,54.241.44.129:9093,54.241.44.129:9094
[ INFO] [main 2014-01-27 16:06:50,108] Property request.required.acks is 
overridden to 1
[ INFO] [main 2014-01-27 16:06:50,108] Property key.serializer.class is 
overridden to kafka.serializer.StringEncoder
[ INFO] [main 2014-01-27 16:06:50,108] Property serializer.class is overridden 
to kafka.utils.EncryptEncoder
[ INFO] [main 2014-01-27 16:06:50,154] send: encrypted - Message_1
[DEBUG] [main 2014-01-27 16:06:50,298] Handling 1 events [ INFO] [main 
2014-01-27 15:59:43,540] Fetching metadata from broker 
id:0,host:54.241.44.129,port:9093 with correlation id 0 for 1 topic(s) 
Set(mytopic)
[DEBUG] [main 2014-01-27 15:59:43,737] Created socket with SO_TIMEOUT = 1 
(requested 1), SO_RCVBUF = 8192 (requested -1), SO_SNDBUF = 102400 
(requested 102400).
[ INFO] [main 2014-01-27 15:59:43,738] Connected to 54.241.44.129:9093 for 
producing
[ INFO] [main 2014-01-27 15:59:44,018] Disconnecting from 54.241.44.129:9093
[DEBUG] [main 2014-01-27 15:59:44,025] Successfully fetched metadata for 1 
topic(s) Set(mytopic)
[DEBUG] [main 2014-01-27 15:59:44,058] Getting broker partition info for topic 
mytopic
[DEBUG] [main 2014-01-27 15:59:44,060] Partition [mytopic,0] has leader 2
[DEBUG] [main 2014-01-27 15:59:44,072] Broker partitions registered for topic: 
mytopic are 0
[DEBUG] [main 2014-01-27 15:59:44,091] Sending 1 messages with no compression 
to [mytopic,0]
[DEBUG] [main 2014-01-27 15:59:44,109] Producer sending messages with 
correlation id 2 for topics [mytopic,0] to broker 2 on 
ip-10-199-31-87.us-west-1.compute.internal:9093
[ERROR] [main 2014-01-27 15:59:44,129] Producer connection to 
ip-10-199-31-87.us-west-1.compute.internal:9093 unsuccessful
java.nio.channels.UnresolvedAddressException
   at sun.nio.ch.Net.checkAddress(Net.java:127)
   at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:640)
   at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
   at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
   at 
kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
   at 
kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
   at 
kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:102)
   at 
kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102)
   at 
kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102)
   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
   at 
kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:101)
   at 
kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101)
   at 
kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101)
   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
   at kafka.producer.SyncProducer.send(SyncProducer.scala:100)
   at 
kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:254)
   at 
kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$1.apply(DefaultEventHandler.scala:106)
   at 
kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$1.apply(DefaultEventHandler.scala:100)
   at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
   at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
   at scala.collection.Iterator$class.foreach(Iterator.scala:631)
   at 
scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
   at 
scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
   at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
   at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
   at 
kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100)
   at 
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
   at kafka.producer.Producer.send(Producer.scala:76)
   at kafka.javaapi.producer.Producer.send(Producer.scala:33)
   at kafka.application.KafkaProducer.sendMessage(KafkaProdu

Event data modelling best practices?

2014-01-27 Thread Vjeran Marcinko
Hi,

I guess this is not just a question for Kafka, but for all event-driven
systems; still, since most people here deal with events, I would like to
hear some basic suggestions for modelling event messages, or even better, a
pointer to some relevant literature/website that deals with this stuff.

Anyway, my questions are something like this...

If I have an event that is spawned when some request is processed, such as:

BankAccountService.credit(long bankAccountId, long amount);

then the event that is triggered is (in some pseudo data structure):

BankAccountCredited {
    long bankAccountId;
    long amount;
}

1. If I leave just these pieces of data in this event, the consumer would
not be able to reconstruct the state of the bank account (the account's
balance being the piece of state that changed) without having the same logic
present in the event accumulator (which is especially problematic when code
versioning is in place, which is practically always)?

2. Because of the previous code/logic requirement to reconstruct state, I
guess it would be wise to include the piece of account state that changed,
such as adding the balance after the credit request execution:

BankAccountCredited {
    long bankAccountId;
    long amount;
    long balance;
}

3. Another option, which maybe seems better when you consider that many
different events will want to report the state of the account after the
action, is a nested BankAccount data structure, right?

BankAccountCredited {
    long amount;
    BankAccount {
        long id;
        long balance;
        boolean active;
    }
}

We can see that in this case there are also some fields present (active) on
the account entity that were not directly affected by the credit action, but
we have them here because the BankAccount data structure contains all of its
fields; that is OK, right?

4. What if some downstream consumers are interested in all events
(a "category") that change the account's balance? Maybe the consumer
doesn't care whether the event is BankAccountCredited or BankAccountDebited,
because it is interested in the category of events that can be described as
"BankAccountBalanceChanged". Since there is no "supertyping" usually present
in popular serialization libs (Avro, Thrift...), how do you implement this -
do you subscribe the consumer individually to all topics that contain events
that change the bank account balance, or do you create one topic that
contains all events of that particular category? (The latter approach would
not work because categories don't have to be so straightforward; many events
have a many-to-many relationship to various categories - in Java it would
simply be implemented by using interfaces to mark categories, but here we
don't have that option.)
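
(For reference, the Java marker-interface idea mentioned just above would look roughly like the sketch below; it only helps once the bytes have been deserialized into classes, which is exactly what the wire-format schemas do not give you.)

// Category as a marker interface: a consumer interested in "anything that changed the
// balance" dispatches on the interface rather than on each concrete event type.
interface BankAccountBalanceChanged {
    long accountId();
    long newBalance();
}

class BankAccountCredited implements BankAccountBalanceChanged {
    final long accountId, amount, newBalance;
    BankAccountCredited(long accountId, long amount, long newBalance) {
        this.accountId = accountId; this.amount = amount; this.newBalance = newBalance;
    }
    public long accountId()  { return accountId; }
    public long newBalance() { return newBalance; }
}

class BankAccountDebited implements BankAccountBalanceChanged {
    final long accountId, amount, newBalance;
    BankAccountDebited(long accountId, long amount, long newBalance) {
        this.accountId = accountId; this.amount = amount; this.newBalance = newBalance;
    }
    public long accountId()  { return accountId; }
    public long newBalance() { return newBalance; }
}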

5. What if some action mutates several entities when being processed? Do you
spawn 2 events from the application layer, or do you publish just one, which
subsequently, via some real-time processor, triggers the spawning of 2
different ones - one for each entity that was affected?

I could probably think of some other questions, but you get the point of
what I'm interested in.

Best regards,
Vjeran




Kafka @ apachecon Denver?

2014-01-27 Thread David Corley
Kafka Devs,
Just wondering if there'll be anything in the line of Kafka presentations
and/or tutorials at ApacheCon in Denver in April?


Re: Kafka @ apachecon Denver?

2014-01-27 Thread Joe Stein
I was going to submit a talk on Kafka and Mesos.  I still am trying to nail 
down the dates in my schedule though.  Anyone else going? Maybe we could do a 
meetup or bof or something?


/***
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 http://www.stealth.ly
 Twitter: @allthingshadoop
/


On Jan 27, 2014, at 9:54 AM, David Corley  wrote:

> Kafka Devs,
> Just wondering if there'll be anything in the line of Kafka presentations
> and/or tutorials at ApacheCon in Denver in April?


Re: Kafka @ apachecon Denver?

2014-01-27 Thread Steve Morin
Are you using mesos?

> On Jan 27, 2014, at 8:39, Joe Stein  wrote:
> 
> I was going to submit a talk on Kafka and Mesos.  I still am trying to nail 
> down the dates in my schedule though.  Anyone else going? Maybe we could do a 
> meetup or bof or something?
> 
> 
> /***
> Joe Stein
> Founder, Principal Consultant
> Big Data Open Source Security LLC
> http://www.stealth.ly
> Twitter: @allthingshadoop
> /
> 
> 
>> On Jan 27, 2014, at 9:54 AM, David Corley  wrote:
>> 
>> Kafka Devs,
>> Just wondering if there'll be anything in the line of Kafka presentations
>> and/or tutorials at ApacheCon in Denver in April?


hia,all,I have a problem ,need help

2014-01-27 Thread
The problem is like this:


[2014-01-27 14:03:03,962] ERROR Closing socket for /192.168.86.71 because of 
error (kafka.network.Processor)
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcher.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:21)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:198)
at sun.nio.ch.IOUtil.read(IOUtil.java:171)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:245)
at kafka.utils.Utils$.read(Utils.scala:395)
at 
kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
at kafka.network.Processor.read(SocketServer.scala:347)
at kafka.network.Processor.run(SocketServer.scala:245)
at java.lang.Thread.run(Thread.java:662)


and I just use the example from here:
https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
Looking forward to your help.



Re: Reg Exception in Kafka

2014-01-27 Thread Jun Rao
Have you looked at
https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-OnEC2,whycan'tmyhigh-levelconsumersconnecttothebrokers
?

thanks,

Jun
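
For readers hitting the same wall: the stack trace shows the broker advertising its internal EC2 hostname (ip-10-199-31-87...internal), which an outside client cannot resolve. A hedged sketch of the usual fix from that FAQ, assuming Kafka 0.8's host.name broker property and using placeholder values:

# server.properties for one broker - placeholder values; the point (per the FAQ above)
# is that the name the broker registers must be resolvable by the remote client.
broker.id=0
port=9092
host.name=ec2-54-241-44-129.us-west-1.compute.amazonaws.com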


On Mon, Jan 27, 2014 at 12:17 AM, Balasubramanian Jayaraman (Contingent) <
balasubramanian.jayara...@autodesk.com> wrote:

> Hi,
>
> I have a remote server (EC2) setup with Kafka cluster setup. There are 3
> brokers each running in the port 9092,9093,9094. The zookeeper is running
> in the port 2181.
> When I send message to the brokers from my PC, I get an exception which is
> given below. I did a dump in the remote server, the request is received in
> the remote server.
> I am able to locally test the consumer/producer script present in the bin
> folder. What am I missing? Can you kindly help me in this error? Any help
> will be highly grateful.
>
> [ INFO] [main 2014-01-27 16:06:50,083] Verifying properties
> [ INFO] [main 2014-01-27 16:06:50,108] Property metadata.broker.list is
> overridden to 54.241.44.129:9092,54.241.44.129:9093,54.241.44.129:9094
> [ INFO] [main 2014-01-27 16:06:50,108] Property request.required.acks is
> overridden to 1
> [ INFO] [main 2014-01-27 16:06:50,108] Property key.serializer.class is
> overridden to kafka.serializer.StringEncoder
> [ INFO] [main 2014-01-27 16:06:50,108] Property serializer.class is
> overridden to kafka.utils.EncryptEncoder
> [ INFO] [main 2014-01-27 16:06:50,154] send: encrypted - Message_1
> [DEBUG] [main 2014-01-27 16:06:50,298] Handling 1 events [ INFO] [main
> 2014-01-27 15:59:43,540] Fetching metadata from broker
> id:0,host:54.241.44.129,port:9093 with correlation id 0 for 1 topic(s)
> Set(mytopic)
> [DEBUG] [main 2014-01-27 15:59:43,737] Created socket with SO_TIMEOUT =
> 1 (requested 1), SO_RCVBUF = 8192 (requested -1), SO_SNDBUF =
> 102400 (requested 102400).
> [ INFO] [main 2014-01-27 15:59:43,738] Connected to 54.241.44.129:9093for 
> producing
> [ INFO] [main 2014-01-27 15:59:44,018] Disconnecting from
> 54.241.44.129:9093
> [DEBUG] [main 2014-01-27 15:59:44,025] Successfully fetched metadata for 1
> topic(s) Set(mytopic)
> [DEBUG] [main 2014-01-27 15:59:44,058] Getting broker partition info for
> topic mytopic
> [DEBUG] [main 2014-01-27 15:59:44,060] Partition [mytopic,0] has leader 2
> [DEBUG] [main 2014-01-27 15:59:44,072] Broker partitions registered for
> topic: mytopic are 0
> [DEBUG] [main 2014-01-27 15:59:44,091] Sending 1 messages with no
> compression to [mytopic,0]
> [DEBUG] [main 2014-01-27 15:59:44,109] Producer sending messages with
> correlation id 2 for topics [mytopic,0] to broker 2 on
> ip-10-199-31-87.us-west-1.compute.internal:9093
> [ERROR] [main 2014-01-27 15:59:44,129] Producer connection to
> ip-10-199-31-87.us-west-1.compute.internal:9093 unsuccessful
> java.nio.channels.UnresolvedAddressException
>at sun.nio.ch.Net.checkAddress(Net.java:127)
>at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:640)
>at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
>at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
>at
> kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
>at
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
>at
> kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:102)
>at
> kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102)
>at
> kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102)
>at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>at
> kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:101)
>at
> kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101)
>at
> kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101)
>at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>at kafka.producer.SyncProducer.send(SyncProducer.scala:100)
>at
> kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:254)
>at
> kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$1.apply(DefaultEventHandler.scala:106)
>at
> kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$1.apply(DefaultEventHandler.scala:100)
>at
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
>at
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
>at scala.collection.Iterator$class.foreach(Iterator.scala:631)
>at
> scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
>at
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
>at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
>at scala.collection.mutable.HashMap.

Re: Kafka @ apachecon Denver?

2014-01-27 Thread Joe Stein
Yes


/***
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 http://www.stealth.ly
 Twitter: @allthingshadoop
/


On Jan 27, 2014, at 11:57 AM, Steve Morin  wrote:

> Are you using mesos?
> 
>> On Jan 27, 2014, at 8:39, Joe Stein  wrote:
>> 
>> I was going to submit a talk on Kafka and Mesos.  I still am trying to nail 
>> down the dates in my schedule though.  Anyone else going? Maybe we could do 
>> a meetup or bof or something?
>> 
>> 
>> /***
>> Joe Stein
>> Founder, Principal Consultant
>> Big Data Open Source Security LLC
>> http://www.stealth.ly
>> Twitter: @allthingshadoop
>> /
>> 
>> 
>>> On Jan 27, 2014, at 9:54 AM, David Corley  wrote:
>>> 
>>> Kafka Devs,
>>> Just wondering if there'll be anything in the line of Kafka presentations
>>> and/or tutorials at ApacheCon in Denver in April?


Re: hia,all,I have a problem ,need help

2014-01-27 Thread Jun Rao
This could be normal. It just means that the client closed the socket,
potentially during consumer rebalancing.

Thanks,

Jun


On Sun, Jan 26, 2014 at 10:33 PM, 我  wrote:

>  problem  like this;
>
>
> [2014-01-27 14:03:03,962] ERROR Closing socket for /192.168.86.71 because
> of error (kafka.network.Processor)
> java.io.IOException: Connection reset by peer
> at sun.nio.ch.FileDispatcher.read0(Native Method)
> at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:21)
> at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:198)
> at sun.nio.ch.IOUtil.read(IOUtil.java:171)
> at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:245)
> at kafka.utils.Utils$.read(Utils.scala:395)
> at
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> at kafka.network.Processor.read(SocketServer.scala:347)
> at kafka.network.Processor.run(SocketServer.scala:245)
> at java.lang.Thread.run(Thread.java:662)
>
>
> and  i just use the example   :here
> https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
> Looking forward to your help
>
>