Re: Unexpected broker election

2014-02-21 Thread Jun Rao
So it sounds like you want the leader to be moved back to the failed
broker once it has caught up. For now, you can use this tool (
https://cwiki.apache.org/confluence/display/KAFKA/Replication+tools#Replicationtools-2.PreferredReplicaLeaderElectionTool).
In the 0.8.1 release, we have an option to balance the leaders automatically
at a configurable interval.

Thanks,

Jun
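
A minimal sketch of both approaches, assuming the 0.8.x script name and the
0.8.1 broker property names (verify against your release):

    # Manually move leadership back to the preferred replicas:
    bin/kafka-preferred-replica-election.sh --zookeeper localhost:2181

    # Broker settings for the automatic rebalancing added in 0.8.1:
    auto.leader.rebalance.enable=true
    leader.imbalance.check.interval.seconds=300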


On Fri, Feb 21, 2014 at 10:22 AM, Andrew Otto  wrote:

> Hi all,
>
> This has happened a couple of times to me now in the past month, and I'm
> not entirely sure of the cause, although I have a suspicion.
>
> Early this morning (UTC), it looks like one of my two brokers (id 21) lost
> its connection to Zookeeper for a very short period of time.  This caused
> the second broker (id 22) to quickly become the leader for all partitions.
>  Once broker 21 was able to re-establish its Zookeeper connection, it
> noticed that it had a stale list for the ISR, got its updated list, and
> started replicating from broker 22 for all partitions.  Broker 21 then
> quickly rejoined the ISR, but annoyingly (but expectedly), broker 22
> remained the leader.  All of this happened in under a minute.
>
> I'm wondering if https://issues.apache.org/jira/browse/KAFKA-766 is
> related.  The current batch size on our producers is 6000 msgs or 1000 ms
> (I've been meaning to reduce this).  We do about 6000 msgs per second / per
> producer, and have 10 partitions in this relevant topic.  A couple of days
> ago, we noticed flapping ISR Shrink/Expand logs, so I upped
> replica.lag.max.messages to 1, so that it would surely be above our
> batch size.  I still occasionally see flapping ISR Shrinks/Expands, but
> hope that when I reduce the producer batch size, I will stop seeing these.
>
> Anyway, I'm not entirely sure what happened here.  Could flapping ISRs
> potentially cause this?
>
> For reference, the relevant logs from my brokers and a zookeeper are here:
> https://gist.github.com/ottomata/9139443
>
> Thanks!
> -Andrew Otto
>
>
>


Re: New Consumer API discussion

2014-02-21 Thread Jay Kreps
Yes, but the problem is that poll() actually has side effects if you are
using auto commit. So you have to do an awkward thing where you track the
last offset you've seen and somehow keep this up to date as the partitions
you own change. Likewise, if you want this value prior to reading any
messages, that won't work.

-Jay
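
For reference, a rough Java sketch of the API shape being discussed in this
thread, with the semantics Jay outlines below folded in as comments. The
method names come from the thread; the generics and the helper classes are
assumptions (the quoted signatures use raw types), so treat this as an
illustration rather than the actual proposed interface:

    import java.util.Map;

    // Sketch only: names taken from the mailing-list discussion, not a released API.
    interface ConsumerPositionSketch {
        // In-memory fetch position; controls the offset of the next fetch.
        long position(TopicPartition tp);

        // Changes the in-memory fetch position; takes effect on the next poll().
        void seek(TopicPartitionOffset p);

        // Last committed offsets; may involve a remote lookup if nothing has
        // been committed by this consumer instance yet.
        Map<TopicPartition, Long> committed(TopicPartition... partitions);

        // Saves the fetch position as the committed position, both in memory
        // and in the remote store; auto-commit just calls this periodically.
        void commit(TopicPartitionOffset... offsets);
    }

    // Minimal stand-ins so the sketch is self-contained.
    final class TopicPartition {
        final String topic;
        final int partition;
        TopicPartition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }
    }

    final class TopicPartitionOffset {
        final TopicPartition partition;
        final long offset;
        TopicPartitionOffset(TopicPartition partition, long offset) {
            this.partition = partition;
            this.offset = offset;
        }
    }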


On Fri, Feb 21, 2014 at 4:56 PM, Jun Rao  wrote:

> What's the use case of position()? Isn't that just the nextOffset() on the
> last message returned from poll()?
>
> Thanks,
>
> Jun
>
>
> On Sun, Feb 16, 2014 at 9:12 AM, Jay Kreps  wrote:
>
> > +1 I think those are good. It is a little weird that changing the fetch
> > point is not batched but changing the commit point is, but I suppose
> there
> > is no helping that.
> >
> > -Jay
> >
> >
> > On Sat, Feb 15, 2014 at 7:52 AM, Neha Narkhede  > >wrote:
> >
> > > Jay,
> > >
> > > That makes sense. position/seek deal with changing the consumer's
> > in-memory
> > > data, so there is no remote rpc there. For some reason, I got committed
> > and
> > > seek mixed up in my head at that time :)
> > >
> > > So we still end up with
> > >
> > >long position(TopicPartition tp)
> > >void seek(TopicPartitionOffset p)
> > >Map committed(TopicPartition tp);
> > >void commit(TopicPartitionOffset...);
> > >
> > > Thanks,
> > > Neha
> > >
> > > On Friday, February 14, 2014, Jay Kreps  wrote:
> > >
> > > > Oh, interesting. So I am assuming the following implementation:
> > > > 1. We have an in-memory fetch position which controls the next fetch
> > > > offset.
> > > > 2. Changing this has no effect until you poll again at which point
> your
> > > > fetch request will be from the newly specified offset
> > > > 3. We then have an in-memory but also remotely stored committed
> offset.
> > > > 4. Calling commit has the effect of saving the fetch position as both
> > the
> > > > in memory committed position and in the remote store
> > > > 5. Auto-commit is the same as periodically calling commit on all
> > > positions.
> > > >
> > > > So batching on commit as well as getting the committed position makes
> > > > sense, but batching the fetch position wouldn't, right? I think you
> are
> > > > actually thinking of a different approach.
> > > >
> > > > -Jay
> > > >
> > > >
> > > > On Thu, Feb 13, 2014 at 10:40 PM, Neha Narkhede <
> > neha.narkh...@gmail.com
> > > 
> > > > >wrote:
> > > >
> > > > > I think you are saying both, i.e. if you
> > > > > have committed on a partition it returns you that value but if you
> > > > haven't
> > > > > it does a remote lookup?
> > > > >
> > > > > Correct.
> > > > >
> > > > > The other argument for making committed batched is that commit() is
> > > > > batched, so there is symmetry.
> > > > >
> > > > > position() and seek() are always in memory changes (I assume) so
> > there
> > > is
> > > > > no need to batch them.
> > > > >
> > > > > I'm not as sure as you are about that assumption being true.
> > Basically
> > > in
> > > > > my example above, the batching argument for committed() also
> applies
> > to
> > > > > position() since one purpose of fetching a partition's offset is to
> > use
> > > > it
> > > > > to set the position of the consumer to that offset. Since that
> might
> > > lead
> > > > > to a remote OffsetRequest call, I think we probably would be better
> > off
> > > > > batching it.
> > > > >
> > > > > Another option for naming would be position/reposition instead
> > > > > of position/seek.
> > > > >
> > > > > I think position/seek is better since it aligns with Java file
> APIs.
> > > > >
> > > > > I also think your suggestion about ConsumerPosition makes sense.
> > > > >
> > > > > Thanks,
> > > > > Neha
> > > > > On Feb 13, 2014 9:22 PM, "Jay Kreps"  wrote:
> > > > >
> > > > > > Hey Neha,
> > > > > >
> > > > > > I actually wasn't proposing the name TopicOffsetPosition, that
> was
> > > > just a
> > > > > > typo. I meant TopicPartitionOffset, and I was just referencing
> what
> > > was
> > > > > in
> > > > > > the javadoc. So to restate my proposal without the typo, using
> just
> > > the
> > > > > > existing classes (that naming is a separate question):
> > > > > >long position(TopicPartition tp)
> > > > > >void seek(TopicPartitionOffset p)
> > > > > >long committed(TopicPartition tp)
> > > > > >void commit(TopicPartitionOffset...);
> > > > > >
> > > > > > So I may be unclear on committed() (AKA lastCommittedOffset). Is
> it
> > > > > > returning the in-memory value from the last commit by this
> > consumer,
> > > or
> > > > > is
> > > > > > it doing a remote fetch, or both? I think you are saying both,
> i.e.
> > > if
> > > > > you
> > > > > > have committed on a partition it returns you that value but if
> you
> > > > > haven't
> > > > > > it does a remote lookup?
> > > > > >
> > > > > > The other argument for making committed batched is that commit()
> is
> > > > > > batched, so there is symmetry.
> > > > > >
> > > > > > position() and seek() are always in memory changes (I assum

Re: New Consumer API discussion

2014-02-21 Thread Jun Rao
Robert,

Could you explain why you want to distinguish between FetchingInProgressException
and NoMessagePendingException? The nextMsgs() method that you want is
exactly what poll() does.

Thanks,

Jun
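
A minimal sketch of the kind of non-blocking loop Jun describes, where a
zero-timeout poll() returns whatever has already been fetched locally and
kicks off the next fetch in the background. Consumer and Record here are
hypothetical stand-ins for illustration, not the proposed API itself:

    import java.util.List;

    interface Record { byte[] value(); }

    interface Consumer {
        // Returns immediately when timeoutMs == 0, possibly with an empty list.
        List<Record> poll(long timeoutMs);
    }

    class NonBlockingDrain {
        // Drains everything that is locally available without blocking;
        // an empty result plays the role of NoMessagePendingException.
        static void drain(Consumer consumer) {
            while (true) {
                List<Record> records = consumer.poll(0);
                if (records.isEmpty()) {
                    break;
                }
                for (Record r : records) {
                    process(r);
                }
            }
        }

        static void process(Record r) {
            // application logic goes here
        }
    }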


On Wed, Feb 19, 2014 at 8:45 AM, Withers, Robert wrote:

> I am not clear on why the consumer stream should be positionable,
> especially if it is limited to the in-memory fetched messages.  Could
> someone explain to me, please?  I really like the idea of committing the
> offset specifically on those partitions with changed read offsets, only.
>
>
>
> 2 items I would like to see added to the KafkaStream are:
>
> * a non-blocking next(), which throws exceptions
> (FetchingInProgressException and NoMessagePendingException, or something similar)
> to differentiate between a fetch in progress and no messages left.
>
> * A nextMsgs() method which returns all locally available messages
> and kicks off a fetch for the next chunk.
>
>
>
> If you are trying to add transactional features, then formally define a
> DTP capability and pull in other server frameworks to share the
> implementation.  Should it be XA/Open?  How about a new peer2peer DTP
> protocol?
>
>
>
> Thank you,
>
> Robert
>
>
>
> Robert Withers
>
> Staff Analyst/Developer
>
> o: (720) 514-8963
>
> c:  (571) 262-1873
>
>
>
> -Original Message-
> From: Jay Kreps [mailto:jay.kr...@gmail.com]
> Sent: Sunday, February 16, 2014 10:13 AM
> To: users@kafka.apache.org
> Subject: Re: New Consumer API discussion
>
>
>
> +1 I think those are good. It is a little weird that changing the fetch
>
> point is not batched but changing the commit point is, but I suppose there
> is no helping that.
>
>
>
> -Jay
>
>
>
>
>
> On Sat, Feb 15, 2014 at 7:52 AM, Neha Narkhede  >wrote:
>
>
>
> > Jay,
>
> >
>
> > That makes sense. position/seek deal with changing the consumer's
>
> > in-memory data, so there is no remote rpc there. For some reason, I
>
> > got committed and seek mixed up in my head at that time :)
>
> >
>
> > So we still end up with
>
> >
>
> >long position(TopicPartition tp)
>
> >void seek(TopicPartitionOffset p)
>
> >Map committed(TopicPartition tp);
>
> >void commit(TopicPartitionOffset...);
>
> >
>
> > Thanks,
>
> > Neha
>
> >
>
> > On Friday, February 14, 2014, Jay Kreps  jay.kr...@gmail.com>> wrote:
>
> >
>
> > > Oh, interesting. So I am assuming the following implementation:
>
> > > 1. We have an in-memory fetch position which controls the next fetch
>
> > > offset.
>
> > > 2. Changing this has no effect until you poll again at which point
>
> > > your fetch request will be from the newly specified offset 3. We
>
> > > then have an in-memory but also remotely stored committed offset.
>
> > > 4. Calling commit has the effect of saving the fetch position as
>
> > > both the in memory committed position and in the remote store 5.
>
> > > Auto-commit is the same as periodically calling commit on all
>
> > positions.
>
> > >
>
> > > So batching on commit as well as getting the committed position
>
> > > makes sense, but batching the fetch position wouldn't, right? I
>
> > > think you are actually thinking of a different approach.
>
> > >
>
> > > -Jay
>
> > >
>
> > >
>
> > > On Thu, Feb 13, 2014 at 10:40 PM, Neha Narkhede
>
> > > 
> > 
>
> > > >wrote:
>
> > >
>
> > > > I think you are saying both, i.e. if you have committed on a
>
> > > > partition it returns you that value but if you
>
> > > haven't
>
> > > > it does a remote lookup?
>
> > > >
>
> > > > Correct.
>
> > > >
>
> > > > The other argument for making committed batched is that commit()
>
> > > > is batched, so there is symmetry.
>
> > > >
>
> > > > position() and seek() are always in memory changes (I assume) so
>
> > > > there
>
> > is
>
> > > > no need to batch them.
>
> > > >
>
> > > > I'm not as sure as you are about that assumption being true.
>
> > > > Basically
>
> > in
>
> > > > my example above, the batching argument for committed() also
>
> > > > applies to
>
> > > > position() since one purpose of fetching a partition's offset is
>
> > > > to use
>
> > > it
>
> > > > to set the position of the consumer to that offset. Since that
>
> > > > might
>
> > lead
>
> > > > to a remote OffsetRequest call, I think we probably would be
>
> > > > better off batching it.
>
> > > >
>
> > > > Another option for naming would be position/reposition instead of
>
> > > > position/seek.
>
> > > >
>
> > > > I think position/seek is better since it aligns with Java file APIs.
>
> > > >
>
> > > > I also think your suggestion about ConsumerPosition makes sense.
>
> > > >
>
> > > > Thanks,
>
> > > > Neha
>
> > > > On Feb 13, 2014 9:22 PM, "Jay Kreps"  jay.kr...@gmail.com>> wrote:
>
> > > >
>
> > > > > Hey Neha,
>
> > > > >
>
> > > > > I actually wasn't proposing the name TopicOffsetPosition, that
>
> > > > > was
>
> > > just a
>
> > > > > typo. I meant TopicPartitionOffset, and I was just referencing
>
> > > > > what
>
> > was
>
> > > > in
>
> > > > > the javadoc. 

Re: New Consumer API discussion

2014-02-21 Thread Jun Rao
What's the use case of position()? Isn't that just the nextOffset() on the
last message returned from poll()?

Thanks,

Jun


On Sun, Feb 16, 2014 at 9:12 AM, Jay Kreps  wrote:

> +1 I think those are good. It is a little weird that changing the fetch
> point is not batched but changing the commit point is, but I suppose there
> is no helping that.
>
> -Jay
>
>
> On Sat, Feb 15, 2014 at 7:52 AM, Neha Narkhede  >wrote:
>
> > Jay,
> >
> > That makes sense. position/seek deal with changing the consumer's
> in-memory
> > data, so there is no remote rpc there. For some reason, I got committed
> and
> > seek mixed up in my head at that time :)
> >
> > So we still end up with
> >
> >long position(TopicPartition tp)
> >void seek(TopicPartitionOffset p)
> >Map committed(TopicPartition tp);
> >void commit(TopicPartitionOffset...);
> >
> > Thanks,
> > Neha
> >
> > On Friday, February 14, 2014, Jay Kreps  wrote:
> >
> > > Oh, interesting. So I am assuming the following implementation:
> > > 1. We have an in-memory fetch position which controls the next fetch
> > > offset.
> > > 2. Changing this has no effect until you poll again at which point your
> > > fetch request will be from the newly specified offset
> > > 3. We then have an in-memory but also remotely stored committed offset.
> > > 4. Calling commit has the effect of saving the fetch position as both
> the
> > > in memory committed position and in the remote store
> > > 5. Auto-commit is the same as periodically calling commit on all
> > positions.
> > >
> > > So batching on commit as well as getting the committed position makes
> > > sense, but batching the fetch position wouldn't, right? I think you are
> > > actually thinking of a different approach.
> > >
> > > -Jay
> > >
> > >
> > > On Thu, Feb 13, 2014 at 10:40 PM, Neha Narkhede <
> neha.narkh...@gmail.com
> > 
> > > >wrote:
> > >
> > > > I think you are saying both, i.e. if you
> > > > have committed on a partition it returns you that value but if you
> > > haven't
> > > > it does a remote lookup?
> > > >
> > > > Correct.
> > > >
> > > > The other argument for making committed batched is that commit() is
> > > > batched, so there is symmetry.
> > > >
> > > > position() and seek() are always in memory changes (I assume) so
> there
> > is
> > > > no need to batch them.
> > > >
> > > > I'm not as sure as you are about that assumption being true.
> Basically
> > in
> > > > my example above, the batching argument for committed() also applies
> to
> > > > position() since one purpose of fetching a partition's offset is to
> use
> > > it
> > > > to set the position of the consumer to that offset. Since that might
> > lead
> > > > to a remote OffsetRequest call, I think we probably would be better
> off
> > > > batching it.
> > > >
> > > > Another option for naming would be position/reposition instead
> > > > of position/seek.
> > > >
> > > > I think position/seek is better since it aligns with Java file APIs.
> > > >
> > > > I also think your suggestion about ConsumerPosition makes sense.
> > > >
> > > > Thanks,
> > > > Neha
> > > > On Feb 13, 2014 9:22 PM, "Jay Kreps"  wrote:
> > > >
> > > > > Hey Neha,
> > > > >
> > > > > I actually wasn't proposing the name TopicOffsetPosition, that was
> > > just a
> > > > > typo. I meant TopicPartitionOffset, and I was just referencing what
> > was
> > > > in
> > > > > the javadoc. So to restate my proposal without the typo, using just
> > the
> > > > > existing classes (that naming is a separate question):
> > > > >long position(TopicPartition tp)
> > > > >void seek(TopicPartitionOffset p)
> > > > >long committed(TopicPartition tp)
> > > > >void commit(TopicPartitionOffset...);
> > > > >
> > > > > So I may be unclear on committed() (AKA lastCommittedOffset). Is it
> > > > > returning the in-memory value from the last commit by this
> consumer,
> > or
> > > > is
> > > > > it doing a remote fetch, or both? I think you are saying both, i.e.
> > if
> > > > you
> > > > > have committed on a partition it returns you that value but if you
> > > > haven't
> > > > > it does a remote lookup?
> > > > >
> > > > > The other argument for making committed batched is that commit() is
> > > > > batched, so there is symmetry.
> > > > >
> > > > > position() and seek() are always in memory changes (I assume) so
> > there
> > > is
> > > > > no need to batch them.
> > > > >
> > > > > So taking all that into account what if we revise it to
> > > > >long position(TopicPartition tp)
> > > > >void seek(TopicPartitionOffset p)
> > > > >Map committed(TopicPartition tp);
> > > > >void commit(TopicPartitionOffset...);
> > > > >
> > > > > This is not symmetric between position/seek and commit/committed
> but
> > it
> > > > is
> > > > > convenient. Another option for naming would be position/reposition
> > > > instead
> > > > > of position/seek.
> > > > >
> > > > > With respect to the name TopicPartitionOffset, what I was trying to
> > say
> > > > is
> > >

Re: broker offline

2014-02-21 Thread Guozhang Wang
Yes, it will be queued, and the second rebalance will start right after the
first one.


On Fri, Feb 21, 2014 at 8:50 AM, Yu, Libo  wrote:

> In our case, two brokers were offline. When the first broker was offline,
> that would trigger a rebalance. When the second broker was offline, if the
> consumers were in the process of rebalance, what do we expect? Is the
> second rebalance request queued?
>
> Regards,
>
> Libo
>
>
> -Original Message-
> From: Guozhang Wang [mailto:wangg...@gmail.com]
> Sent: Friday, February 14, 2014 7:33 PM
> To: users@kafka.apache.org
> Subject: Re: broker offline
>
> Hello Libo,
>
> When ZK resumes from a soft failure, like a GC, it will mark the ephemeral
> nodes as session timed out, and the brokers will try to re-register upon
> receiving the session timeout. You can re-produce this issue by signal
> pause the ZK process.
>
> Guozhang
>
>
> On Fri, Feb 14, 2014 at 12:15 PM, Yu, Libo  wrote:
>
> > Hi team,
> >
> > We have three brokers on our production cluster. I noticed two of them
> > somehow got offline and then re-registered with zookeeper and got back
> > online. It seems the issue was caused by some zookeeper issue. So I
> > want to know what may be the possible causes of the issue. If I want to
> > reproduce the issue, is there any way to do it? Thanks.
> >
> > Regards,
> >
> > Libo
> >
> >
>
>
> --
> -- Guozhang
>



-- 
-- Guozhang
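
A rough sketch of the "signal pause" reproduction Guozhang describes above;
the pid lookup and the sleep duration are illustrative, and the pause needs to
exceed the broker's zookeeper.session.timeout.ms for the session to expire:

    ZK_PID=$(pgrep -f QuorumPeerMain)   # assumes a standard ZooKeeper launch
    kill -STOP "$ZK_PID"                # simulate a long GC / soft failure
    sleep 30                            # longer than the session timeout
    kill -CONT "$ZK_PID"                # ZK resumes; brokers re-register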


Unexpected broker election

2014-02-21 Thread Andrew Otto
Hi all,

This has happened a couple of times to me now in the past month, and I’m not 
entirely sure of the cause, although I have a suspicion.

Early this morning (UTC), it looks like one of my two brokers (id 21) lost its 
connection to Zookeeper for a very short period of time.  This caused the 
second broker (id 22) to quickly become the leader for all partitions.  Once 
broker 21 was able to re-establish its Zookeeper connection, it noticed that it 
had a stale list for the ISR, got its updated list, and started replicating 
from broker 22 for all partitions.  Broker 21 then quickly rejoined the ISR, 
but annoyingly (but expectedly), broker 22 remained the leader.  All of this 
happened in under a minute.

I’m wondering if https://issues.apache.org/jira/browse/KAFKA-766 is related.  
The current batch size on our producers is 6000 msgs or 1000 ms (I’ve been 
meaning to reduce this).  We do about 6000 msgs per second / per producer, and 
have 10 partitions in this relevant topic.  A couple of days ago, we noticed 
flapping ISR Shrink/Expand logs, so I upped replica.lag.max.messages to 1, 
so that it would surely be above our batch size.  I still occasionally see 
flapping ISR Shrinks/Expands, but hope that when I reduce the producer batch 
size, I will stop seeing these.

Anyway, I’m not entirely sure what happened here.  Could flapping ISRs 
potentially cause this?

For reference, the relevant logs from my brokers and a zookeeper are here: 
https://gist.github.com/ottomata/9139443

Thanks!
-Andrew Otto
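
For reference, the broker-side replica lag settings being tuned above (0.8
property names; the values shown are illustrative, not recommendations):

    # A follower is dropped from the ISR once it falls this many messages
    # behind the leader, so it should comfortably exceed the producer batch size.
    replica.lag.max.messages=10000

    # A follower is also dropped if it has not sent a fetch request for this
    # long (in milliseconds).
    replica.lag.time.max.ms=10000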




RE: broker offline

2014-02-21 Thread Yu, Libo
In our case, two brokers were offline. When the first broker was offline,
that would trigger a rebalance. When the second broker was offline, if the 
consumers were in the process of rebalance, what do we expect? Is the 
second rebalance request queued? 

Regards,

Libo


-Original Message-
From: Guozhang Wang [mailto:wangg...@gmail.com] 
Sent: Friday, February 14, 2014 7:33 PM
To: users@kafka.apache.org
Subject: Re: broker offline

Hello Libo,

When ZK resumes from a soft failure, like a GC, it will mark the ephemeral 
nodes as session timed out, and the brokers will try to re-register upon 
receiving the session timeout. You can reproduce this issue by signal-pausing
the ZK process.

Guozhang


On Fri, Feb 14, 2014 at 12:15 PM, Yu, Libo  wrote:

> Hi team,
>
> We have three brokers on our production cluster. I noticed two of them 
> somehow got offline and then re-registered with zookeeper and got back 
> online. It seems the issue was caused by some zookeeper issue. So I 
> want to know what may be the possible causes of the issue. If I want to
> reproduce the issue, is there any way to do it? Thanks.
>
> Regards,
>
> Libo
>
>


--
-- Guozhang


Re: producer not pushing the data into broker

2014-02-21 Thread Jun Rao
Could you do a list topic and show the output? Also, any error in the
controller and state-change log?

Thanks,

Jun
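
A sketch of the list-topic check Jun is asking for (the script name varies by
0.8 build -- older builds ship kafka-list-topic.sh, newer ones kafka-topics.sh):

    bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
    # or, on older 0.8 builds:
    bin/kafka-list-topic.sh --zookeeper localhost:2181 --topic test

    # A healthy topic shows a leader for every partition; a missing leader is
    # consistent with the LeaderNotAvailableException below.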


On Fri, Feb 21, 2014 at 2:33 AM, Arjun  wrote:

> Hi,
>
> I am testing Kafka 0.8 on my local machine. I have only one ZooKeeper and
> one Kafka broker running.
>
> When I run the console producer I get this error:
>
> [2014-02-21 16:01:20,512] WARN Error while fetching metadata
> [{TopicMetadata for topic test ->
> No partition metadata for topic test due to 
> kafka.common.LeaderNotAvailableException}]
> for topic [test]: class kafka.common.LeaderNotAvailableException
> (kafka.producer.BrokerPartitionInfo)
> [2014-02-21 16:01:20,513] ERROR Failed to collate messages by topic,
> partition due to: Failed to fetch topic metadata for topic: test
> (kafka.producer.async.DefaultEventHandler)
> [2014-02-21 16:01:20,632] WARN Error while fetching metadata
> [{TopicMetadata for topic test ->
> No partition metadata for topic test due to 
> kafka.common.LeaderNotAvailableException}]
> for topic [test]: class kafka.common.LeaderNotAvailableException
> (kafka.producer.BrokerPartitionInfo)
> [2014-02-21 16:01:20,636] ERROR Failed to send requests for topics test
> with correlation ids in [0,8] (kafka.producer.async.DefaultEventHandler)
> ERROR Error in handling batch of 1 events (kafka.producer.async.
> ProducerSendThread)
> kafka.common.FailedToSendMessageException: Failed to send messages after
> 3 tries.
> at kafka.producer.async.DefaultEventHandler.handle(
> DefaultEventHandler.scala:90)
> at kafka.producer.async.ProducerSendThread.tryToHandle(
> ProducerSendThread.scala:104)
> at kafka.producer.async.ProducerSendThread$$anonfun$
> processEvents$3.apply(ProducerSendThread.scala:87)
> at kafka.producer.async.ProducerSendThread$$anonfun$
> processEvents$3.apply(ProducerSendThread.scala:67)
> at scala.collection.immutable.Stream.foreach(Stream.scala:254)
> at kafka.producer.async.ProducerSendThread.processEvents(
> ProducerSendThread.scala:66)
> at kafka.producer.async.ProducerSendThread.run(
> ProducerSendThread.scala:44)
>
> Can someone please let me know why this is happening.
> I checked my vhost files and localhost; my hostname is correct.
>


Re: Reg Exception in Kafka

2014-02-21 Thread Jun Rao
Maybe you need to add the port to the right security group?

Thanks,

Jun
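
If the brokers are on EC2 and the client is outside, the broker ports need an
inbound rule in the instance's security group. One way to add it with the AWS
CLI (the group id and CIDR below are placeholders; the ports are the ones
mentioned later in this thread):

    aws ec2 authorize-security-group-ingress \
      --group-id sg-12345678 \
      --protocol tcp \
      --port 10092-10094 \
      --cidr 203.0.113.10/32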


On Thu, Feb 20, 2014 at 9:58 PM, Balasubramanian Jayaraman (Contingent) <
balasubramanian.jayara...@autodesk.com> wrote:

> One point to note is that I am trying to access the Kafka broker (located
> in an Amazon Cloud EC2 instance) from Eclipse (located in my office). I
> am using Kafka from trunk.
>
> Thanks
> Bala
>
> -Original Message-
> From: Jun Rao [mailto:jun...@gmail.com]
> Sent: Friday, 21 February, 2014 1:51 PM
> To: users@kafka.apache.org
> Subject: Re: Reg Exception in Kafka
>
> That explains why your producer hits connection timeout. Not sure why the
> controller to broker connection also times out though, if you can manually
> establish the connection.
>
> Thanks,
>
> Jun
>
>
> On Thu, Feb 20, 2014 at 7:37 PM, Balasubramanian Jayaraman (Contingent) <
> balasubramanian.jayara...@autodesk.com> wrote:
>
> > All the brokers reside on the same server and are listening on
> > ports 10092, 10093, and 10094. From the same machine I can connect to the
> > zookeeper and the brokers. But When I tried to connect from an
> > external machine (from Eclipse), I get an exception as communicated
> > earlier. I was not able to connect to any of the brokers. I get the
> > same exception while connecting to all the brokers.
> >
> > Regards
> > Bala
> >
> > -Original Message-
> > From: Jun Rao [mailto:jun...@gmail.com]
> > Sent: Thursday, 20 February, 2014 12:05 AM
> > To: users@kafka.apache.org
> > Subject: Re: Reg Exception in Kafka
> >
> > Can you make the connection from the controller host to the other broker?
> > Also, what's the # open file handlers on each broker? Do you see any
> > "too many open file handler" error in the broker log?
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Wed, Feb 19, 2014 at 12:14 AM, Balasubramanian Jayaraman
> > (Contingent) < balasubramanian.jayara...@autodesk.com> wrote:
> >
> > > I don't think so. I am able to connect to the server using PuTTY.
> > > It is a VM running in the Amazon cloud.
> > >
> > > Thanks
> > > Bala
> > >
> > > -Original Message-
> > > From: Jun Rao [mailto:jun...@gmail.com]
> > > Sent: Wednesday, 19 February, 2014 12:32 PM
> > > To: users@kafka.apache.org
> > > Subject: Re: Reg Exception in Kafka
> > >
> > > Any issue with the network?
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > >
> > > On Tue, Feb 18, 2014 at 7:48 PM, Balasubramanian Jayaraman
> > > (Contingent) < balasubramanian.jayara...@autodesk.com> wrote:
> > >
> > > > I just came from vacation.
> > > > When I tested it, I get a Connect Exception. The complete stack
> > > > trace is given below.
> > > >
> > > > Exception Stack Trace
> > > >
> > > > Java Thin Client:
> > > >
> > > > java.net.ConnectException: Connection timed out: connect
> > > >         at sun.nio.ch.Net.connect0(Native Method)
> > > >         at sun.nio.ch.Net.connect(Net.java:465)
> > > >         at sun.nio.ch.Net.connect(Net.java:457)
> > > >         at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:666)
> > > >         at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> > > >         at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> > > >         at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> > > >         at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> > > >         at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> > > >         at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> > > >         at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> > > >         at kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67)
> > > >         at kafka.utils.Utils$.swallow(Utils.scala:187)
> > > >         at kafka.utils.Logging$class.swallowError(Logging.scala:105)
> > > >         at kafka.utils.Utils$.swallowError(Utils.scala:46)
> > > >         at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)
> > > >         at kafka.producer.Producer.send(Producer.scala:76)
> > > >         at kafka.javaapi.producer.Producer.send(Producer.scala:33)
> > > >         at com.autodesk.kafka.test.utils.KafkaProducer.sendMessage(KafkaProducer.java:48)
> > > >         at com.autodesk.kafka.test.integration.KafkaProducerTest.testProducer(KafkaProducerTest.java:33)
> > > >         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > > >         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> > > >         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > > >         at java.

producer not pushing the data into broker

2014-02-21 Thread Arjun

Hi,

I am testing Kafka 0.8 on my local machine. I have only one ZooKeeper
and one Kafka broker running.

When I run the console producer I get this error:

[2014-02-21 16:01:20,512] WARN Error while fetching metadata 
[{TopicMetadata for topic test ->
No partition metadata for topic test due to 
kafka.common.LeaderNotAvailableException}] for topic [test]: class 
kafka.common.LeaderNotAvailableException 
(kafka.producer.BrokerPartitionInfo)
[2014-02-21 16:01:20,513] ERROR Failed to collate messages by topic, 
partition due to: Failed to fetch topic metadata for topic: test 
(kafka.producer.async.DefaultEventHandler)
[2014-02-21 16:01:20,632] WARN Error while fetching metadata 
[{TopicMetadata for topic test ->
No partition metadata for topic test due to 
kafka.common.LeaderNotAvailableException}] for topic [test]: class 
kafka.common.LeaderNotAvailableException 
(kafka.producer.BrokerPartitionInfo)
[2014-02-21 16:01:20,636] ERROR Failed to send requests for topics test 
with correlation ids in [0,8] (kafka.producer.async.DefaultEventHandler)
ERROR Error in handling batch of 1 events 
(kafka.producer.async.ProducerSendThread)
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
        at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
        at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
        at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
        at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
        at scala.collection.immutable.Stream.foreach(Stream.scala:254)
        at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
        at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)


Can someone please let me know why this is happening.
I checked my vhost files and localhost; my hostname is correct.