Re: can the replication factor for a topic be changed after it's created?

2013-05-08 Thread Jason Rosenberg
so does that, thereby, allow you to increase the replication for a
partition?


On Wed, May 8, 2013 at 10:48 PM, Neha Narkhede wrote:

> If you add more brokers to an existing cluster, you can use the partition
> reassignment admin tool to move a replica of selected partitions over to
> the new broker.
>
> Thanks,
> Neha
>
>
> On Wed, May 8, 2013 at 10:27 PM, Jason Rosenberg  wrote:
>
> > It looks like by default, the first time a message arrives for a given
> > topic, the topic is created with the default replication factor in effect
> > on the broker at that time.
> >
> > Is it possible to change this later (e.g. say if we add more hardware to
> > the cluster at a later date, etc.)?
> >
> > Thanks,
> >
> > Jason
> >
>


Re: Can't connect to a server if not enough partitions

2013-05-08 Thread Jason Rosenberg
Neha,

Thanks, I think I did understand what was going on (despite the error
message).  And my question stands: if a broker is momentarily down,
shouldn't we still be able to create a topic?  If we send a message to an
existing topic it will succeed, even if not all replicas are available.
Why should the initial message be any different?

Jason


On Wed, May 8, 2013 at 10:57 PM, Neha Narkhede wrote:

> I think this error message is somewhat misleading, since we create the topic
> on the first metadata request. It is complaining that a topic with the
> required replication factor cannot be created if there aren't enough
> brokers to satisfy the replication factor. This is expected behavior
> whether you use auto creation of topics or manual creation. However, the
> metadata requests will always give you correct information about existing
> topics.
>
> Thanks,
> Neha
>
>
> On Wed, May 8, 2013 at 10:15 PM, Jason Rosenberg  wrote:
>
> > With 0.8.0, I'm seeing that an initial metadata request fails, if the
> > number of running brokers is fewer than the configured replication
> factor:
> >
> > 877 [kafka-request-handler-0] ERROR kafka.server.KafkaApis  -
> > [KafkaApi-1946108683] Error while retrieving topic metadata
> > kafka.admin.AdministrationException: replication factor: 2 larger than
> > available brokers: 1
> > at kafka.admin.AdminUtils$.assignReplicasToBrokers(AdminUtils.scala:62)
> > at
> kafka.admin.CreateTopicCommand$.createTopic(CreateTopicCommand.scala:92)
> > at
> >
> >
> kafka.server.KafkaApis$$anonfun$handleTopicMetadataRequest$1.apply(KafkaApis.scala:409)
> > at
> >
> >
> kafka.server.KafkaApis$$anonfun$handleTopicMetadataRequest$1.apply(KafkaApis.scala:401)
> > at scala.collection.immutable.Set$Set1.foreach(Set.scala:81)
> > at kafka.server.KafkaApis.handleTopicMetadataRequest(KafkaApis.scala:400)
> > at kafka.server.KafkaApis.handle(KafkaApis.scala:61)
> > at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:41)
> > at java.lang.Thread.run(Thread.java:680)
> >
> > However, if the number of brokers goes down after connecting, producing
> > clients have no problem continuing to send messages, etc.
> >
> > So, I thought the idea was that once a replica becomes available, it will
> > be caught up with messages it might have missed, etc.  This is good
> because
> > it makes doing things like rolling restarts of the brokers possible, etc.
> >  But it's a problem if a rolling restart happens at the same time a new
> > client is coming online to try and initialize a connection.
> >
> > Thoughts?
> >
> > Shouldn't the requirements be the same for initial connections as ongoing
> > connections?
> >
> > Jason
> >
>


[0.8] exception running on windows when attempting to send to an uncreated topic

2013-05-08 Thread Rob Withers
I am running on Windows.  I am programmatically (no scripts) starting a zk,
2 brokers, 2 consumers and a producer, in that order, except that the first
3 start at once and then the other 3 start at once, all against a
nonexistent topic.

 

Here's the pertinent log for the producer (with other stuff mixed in, no
doubt):

 

start production: com.dish.des.msgs.notifications.StartProduction@d27b1b3

{2013-05-08 23:53:07,187}  INFO [local-vat] (Logging.scala:67) - Fetching
metadata with correlation id 0 for 1 topic(s) Set(robert_v_2x0)

{2013-05-08 23:53:07,206} DEBUG [local-vat] (Logging.scala:51) - Created
socket with SO_TIMEOUT = 1500 (requested 1500), SO_RCVBUF = 8192 (requested
-1), SO_SNDBUF = 102400 (requested 102400).

{2013-05-08 23:53:07,207} DEBUG [kafka-acceptor] (Logging.scala:51) -
Accepted connection from /127.0.0.1 on /127.0.0.1:9092. sendBufferSize
[actual|requested]: [1048576|1048576] recvBufferSize [actual|requested]:
[1048576|1048576]

{2013-05-08 23:53:07,208} DEBUG [kafka-processor-9092-0] (Logging.scala:51)
- Processor 0 listening to new connection from /127.0.0.1:63245

{2013-05-08 23:53:07,209}  INFO [local-vat] (Logging.scala:67) - Connected
to localhost:9092 for producing

{2013-05-08 23:53:07,256} DEBUG [SyncThread:0]
(FinalRequestProcessor.java:78) - Processing request::
sessionid:0x13e87d872090002 type:ping cxid:0xfffe
zxid:0xfffe txntype:unknown reqpath:n/a

{2013-05-08 23:53:07,256} DEBUG [SyncThread:0]
(FinalRequestProcessor.java:160) - sessionid:0x13e87d872090002 type:ping
cxid:0xfffe zxid:0xfffe txntype:unknown reqpath:n/a

{2013-05-08 23:53:07,257} DEBUG [main-SendThread(localhost:2181)]
(ClientCnxn.java:758) - Got ping response for sessionid: 0x13e87d872090002
after 2ms

{2013-05-08 23:53:07,279} DEBUG [kafka-request-handler-1] (Logging.scala:51)
- [Kafka Request Handler 1 on Broker 0], handles request
Request(0,sun.nio.ch.SelectionKeyImpl@1261db2b,null,1368078787227,/127.0.0.1
:63245)

{2013-05-08 23:53:07,290} DEBUG [SyncThread:0]
(FinalRequestProcessor.java:78) - Processing request::
sessionid:0x13e87d87209 type:exists cxid:0x26 zxid:0xfffe
txntype:unknown reqpath:/brokers/topics/robert_v_2x0

{2013-05-08 23:53:07,291} DEBUG [SyncThread:0]
(FinalRequestProcessor.java:160) - sessionid:0x13e87d87209 type:exists
cxid:0x26 zxid:0xfffe txntype:unknown
reqpath:/brokers/topics/robert_v_2x0

{2013-05-08 23:53:07,293} DEBUG [Thread-1-SendThread(localhost:2181)]
(ClientCnxn.java:838) - Reading reply sessionid:0x13e87d87209, packet::
clientPath:null serverPath:null finished:false header:: 38,3  replyHeader::
38,955,-101  request:: '/brokers/topics/robert_v_2x0,F  response::  

{2013-05-08 23:53:07,373} ERROR [kafka-request-handler-1]
(Logging.scala:102) - [KafkaApi-0] Error while retrieving topic metadata

java.lang.NoClassDefFoundError: joptsimple/OptionSpec

   at
kafka.server.KafkaApis$$anonfun$handleTopicMetadataRequest$1.apply(KafkaApis
.scala:411)

   at
kafka.server.KafkaApis$$anonfun$handleTopicMetadataRequest$1.apply(KafkaApis
.scala:403)

   at scala.collection.immutable.Set$Set1.foreach(Set.scala:81)

   at
kafka.server.KafkaApis.handleTopicMetadataRequest(KafkaApis.scala:402)

   at kafka.server.KafkaApis.handle(KafkaApis.scala:63)

   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:41)

   at java.lang.Thread.run(Thread.java:722)

Caused by: java.lang.ClassNotFoundException: joptsimple.OptionSpec

   at java.net.URLClassLoader$1.run(URLClassLoader.java:366)

   at java.net.URLClassLoader$1.run(URLClassLoader.java:355)

   at java.security.AccessController.doPrivileged(Native Method)

   at java.net.URLClassLoader.findClass(URLClassLoader.java:354)

   at java.lang.ClassLoader.loadClass(ClassLoader.java:423)

   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)

   at java.lang.ClassLoader.loadClass(ClassLoader.java:356)

   ... 7 more

{2013-05-08 23:53:07,388} DEBUG [SyncThread:0]
(FinalRequestProcessor.java:78) - Processing request::
sessionid:0x13e87d872090003 type:ping cxid:0xfffe
zxid:0xfffe txntype:unknown reqpath:n/a

{2013-05-08 23:53:07,393} DEBUG [SyncThread:0]
(FinalRequestProcessor.java:160) - sessionid:0x13e87d872090003 type:ping
cxid:0xfffe zxid:0xfffe txntype:unknown reqpath:n/a

{2013-05-08 23:53:07,395} DEBUG [main-SendThread(localhost:2181)]
(ClientCnxn.java:758) - Got ping response for sessionid: 0x13e87d872090003
after 8ms

{2013-05-08 23:53:07,557}  INFO [local-vat] (Logging.scala:67) -
Disconnecting from localhost:9092

{2013-05-08 23:53:07,570} DEBUG [local-vat] (Logging.scala:51) -
Successfully fetched metadata for 1 topic(s) Set(robert_v_2x0)

{2013-05-08 23:53:07,583} DEBUG [local-vat] (Logging.scala:51) - Getting
broker partition info for topic robert_v_2x0

{2013-05-08 23:53:07,600}  INFO [local-vat] (Logging.scala:67) - Fetching
metadata with co
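
The NoClassDefFoundError above means the jopt-simple library is missing from
the classpath of the embedded broker: as the stack traces in this digest
show, handleTopicMetadataRequest falls into the CreateTopicCommand code path
on auto-creation, and that command uses jopt-simple. A sketch of the Maven
dependency to add, with the version assumed from what 0.8 builds shipped
(match it against the jar bundled in your Kafka distribution):

    <dependency>
      <groupId>net.sf.jopt-simple</groupId>
      <artifactId>jopt-simple</artifactId>
      <!-- assumed version; use the one bundled with your Kafka release -->
      <version>3.2</version>
    </dependency>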

Re: Can't connect to a server if not enough partitions

2013-05-08 Thread Neha Narkhede
I think this error message is somewhat misleading, since we create the topic
on the first metadata request. It is complaining that a topic with the
required replication factor cannot be created if there aren't enough
brokers to satisfy the replication factor. This is expected behavior
whether you use auto creation of topics or manual creation. However, the
metadata requests will always give you correct information about existing
topics.

Thanks,
Neha
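
For reference, the topic can also be created up front with the replication
factor made explicit, so this failure surfaces at creation time rather than
on a client's first metadata request. A sketch using the 0.8 admin script
(script and flag names assumed from the 0.8 tooling; check bin/ in your
distribution):

    # create a topic with 2 partitions, each replicated on 2 brokers
    bin/kafka-create-topic.sh --zookeeper localhost:2181 \
        --topic my-topic --partition 2 --replica 2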


On Wed, May 8, 2013 at 10:15 PM, Jason Rosenberg  wrote:

> With 0.8.0, I'm seeing that an initial metadata request fails, if the
> number of running brokers is fewer than the configured replication factor:
>
> 877 [kafka-request-handler-0] ERROR kafka.server.KafkaApis  -
> [KafkaApi-1946108683] Error while retrieving topic metadata
> kafka.admin.AdministrationException: replication factor: 2 larger than
> available brokers: 1
> at kafka.admin.AdminUtils$.assignReplicasToBrokers(AdminUtils.scala:62)
> at kafka.admin.CreateTopicCommand$.createTopic(CreateTopicCommand.scala:92)
> at
>
> kafka.server.KafkaApis$$anonfun$handleTopicMetadataRequest$1.apply(KafkaApis.scala:409)
> at
>
> kafka.server.KafkaApis$$anonfun$handleTopicMetadataRequest$1.apply(KafkaApis.scala:401)
> at scala.collection.immutable.Set$Set1.foreach(Set.scala:81)
> at kafka.server.KafkaApis.handleTopicMetadataRequest(KafkaApis.scala:400)
> at kafka.server.KafkaApis.handle(KafkaApis.scala:61)
> at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:41)
> at java.lang.Thread.run(Thread.java:680)
>
> However, if the number of brokers goes down after connecting, producing
> clients have no problem continuing to send messages, etc.
>
> So, I thought the idea was that once a replica becomes available, it will
> be caught up with messages it might have missed, etc.  This is good because
> it makes doing things like rolling restarts of the brokers possible, etc.
>  But it's a problem if a rolling restart happens at the same time a new
> client is coming online to try and initialize a connection.
>
> Thoughts?
>
> Shouldn't the requirements be the same for initial connections as ongoing
> connections?
>
> Jason
>


Re: can the replication factor for a topic be changed after it's created?

2013-05-08 Thread Neha Narkhede
If you add more brokers to an existing cluster, you can use the partition
reassignment admin tool to move a replica of selected partitions over to
the new broker.

Thanks,
Neha
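
A sketch of what that looks like with the 0.8 reassignment tool; the script
name, flag, and JSON layout are assumed from the 0.8 admin tooling and
should be checked against your distribution. Listing more broker ids under
"replicas" than a partition currently has is how its replication factor is
effectively increased:

    # increase-replication.json: put partition 0 of my-topic on brokers 1 and 2
    {"partitions":
      [{"topic": "my-topic", "partition": 0, "replicas": [1, 2]}]
    }

    bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \
        --path-to-json-file increase-replication.json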


On Wed, May 8, 2013 at 10:27 PM, Jason Rosenberg  wrote:

> It looks like by default, the first time a message arrives for a given
> topic, the topic is created with the default replication factor in effect
> on the broker at that time.
>
> Is it possible to change this later (e.g. say if we add more hardware to
> the cluster at a later date, etc.)?
>
> Thanks,
>
> Jason
>


can the replication factor for a topic be changed after it's created?

2013-05-08 Thread Jason Rosenberg
It looks like by default, the first time a message arrives for a given
topic, the topic is created with the default replication factor in effect
on the broker at that time.

Is it possible to change this later (e.g. say if we add more hardware to
the cluster at a later date, etc.)?

Thanks,

Jason


Re: expected exceptions?

2013-05-08 Thread Jason Rosenberg
Filed:

https://issues.apache.org/jira/browse/KAFKA-899
https://issues.apache.org/jira/browse/KAFKA-900

Jason
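
Related to KAFKA-899: since a first send can race topic auto-creation and
leader election, the 0.8 producer can be configured to retry instead of
surfacing the exception. A sketch of the relevant producer properties
(names from the 0.8 producer config; tune values to your election times):

    # retry a failed send while leader election completes
    message.send.max.retries=5
    retry.backoff.ms=500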


On Wed, May 8, 2013 at 9:22 PM, Jun Rao  wrote:

> Yes, could you file a jira? Please include the log messages before those
> exceptions.
>
> Thanks,
>
> Jun
>
>
> On Wed, May 8, 2013 at 9:55 AM, Jason Rosenberg  wrote:
>
> > If expected, does it make sense to log them as exceptions as such?  Can
> we
> > instead log something meaningful to the console, like:
> >
> > "No leader was available, one will now be created"
> >
> > or
> >
> > "ConsumerConnector has shutdown"
> >
> > etc.
> >
> > Should I file jira's for these?
> >
> > Jason
> >
> >
> > On Wed, May 8, 2013 at 8:22 AM, Jun Rao  wrote:
> >
> > > Yes, both are expected.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > >
> > > On Wed, May 8, 2013 at 12:16 AM, Jason Rosenberg 
> > wrote:
> > >
> > > > I'm porting some unit tests from 0.7.2 to 0.8.0.  The test does the
> > > > following, all embedded in the same java process:
> > > >
> > > > -- spins up a zk instance
> > > > -- spins up a kafka server using a fresh log directory
> > > > -- creates a producer and sends a message
> > > > -- creates a high-level consumer and verifies that it can consume the
> > > > message
> > > > -- shuts down the consumer
> > > > -- stops the kafka server
> > > > -- stops zk
> > > >
> > > > The test seems to be working fine now, however, I consistently see
> the
> > > > following exceptions (which from poking around the mailing list seem
> to
> > > be
> > > > expected?).  If these are expected, can we suppress the logging of
> > these
> > > > exceptions, since it clutters the output of tests, and presumably,
> > > clutters
> > > > the logs of the running server/consumers, during clean startup and
> > > > shutdown..
> > > >
> > > > When I call producer.send(), I get:
> > > >
> > > > kafka.common.LeaderNotAvailableException: No leader for any partition
> > > > at
> > > >
> > > >
> > >
> >
> kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartition(DefaultEventHandler.scala:212)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:148)
> > > > at
> > > >
> > > >
> > >
> >
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
> > > > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:148)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:94)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
> > > > at kafka.producer.Producer.send(Producer.scala:74)
> > > > at kafka.javaapi.producer.Producer.send(Producer.scala:32)
> > > > ...
> > > >   ...
> > > >
> > > > When I call consumerConnector.shutdown(), I get:
> > > >
> > > > java.nio.channels.ClosedByInterruptException
> > > > at
> > > >
> > > >
> > >
> >
> java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:184)
> > > > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:543)
> > > > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> > > > at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:47)
> > > > at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:60)
> > > > at
> kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:81)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:73)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
> > > > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
> > > > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > > > at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetch

Can't connect to a server if not enough partitions

2013-05-08 Thread Jason Rosenberg
With 0.8.0, I'm seeing that an initial metadata request fails, if the
number of running brokers is fewer than the configured replication factor:

877 [kafka-request-handler-0] ERROR kafka.server.KafkaApis  -
[KafkaApi-1946108683] Error while retrieving topic metadata
kafka.admin.AdministrationException: replication factor: 2 larger than
available brokers: 1
at kafka.admin.AdminUtils$.assignReplicasToBrokers(AdminUtils.scala:62)
at kafka.admin.CreateTopicCommand$.createTopic(CreateTopicCommand.scala:92)
at
kafka.server.KafkaApis$$anonfun$handleTopicMetadataRequest$1.apply(KafkaApis.scala:409)
at
kafka.server.KafkaApis$$anonfun$handleTopicMetadataRequest$1.apply(KafkaApis.scala:401)
at scala.collection.immutable.Set$Set1.foreach(Set.scala:81)
at kafka.server.KafkaApis.handleTopicMetadataRequest(KafkaApis.scala:400)
at kafka.server.KafkaApis.handle(KafkaApis.scala:61)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:41)
at java.lang.Thread.run(Thread.java:680)

However, if the number of brokers goes down after connecting, producing
clients have no problem continuing to send messages, etc.

So, I thought the idea was that once a replica becomes available, it will
be caught up with messages it might have missed, etc.  This is good because
it makes doing things like rolling restarts of the brokers possible, etc.
 But it's a problem if a rolling restart happens at the same time a new
client is coming online to try and initialize a connection.

Thoughts?

Shouldn't the requirements be the same for initial connections as ongoing
connections?

Jason


RE: a few questions from high level consumer documentation.

2013-05-08 Thread Rob Withers


> -Original Message-
> From: Chris Curtin [mailto:curtin.ch...@gmail.com]

> > 1 When you say the iterator may block, do you mean hasNext() may block?
> >
> 
> Yes.

Is this due to a potentially empty fetch (the broker/zookeeper returns an
empty block if the offset is current)?  That would still block the network
call of the consumer iterator; do I have that right?  Are there other
reasons it could block, e.g. the call fails and a backup call is made?
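
For reference, how long hasNext() blocks is governed by the high-level
consumer's consumer.timeout.ms setting: with the default of -1 the iterator
blocks indefinitely until a message arrives, while a positive value makes
hasNext() throw a ConsumerTimeoutException once that many milliseconds pass
with no data. A minimal sketch in Java (property names from the 0.8 consumer
config; connection values are placeholders):

    import java.util.Properties;
    import kafka.consumer.ConsumerConfig;

    public class TimeoutConfigDemo {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("zookeeper.connect", "localhost:2181"); // placeholder
            props.put("group.id", "demo-group");              // placeholder
            // block at most 5s in hasNext(); after that it throws
            // kafka.consumer.ConsumerTimeoutException instead of hanging
            props.put("consumer.timeout.ms", "5000");
            ConsumerConfig config = new ConsumerConfig(props);
        }
    }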

> > b.  For client crash, what can client do to avoid duplicate messages
> > when restarted? What I can think of is to read last message from log
> > file and ignore the first few received duplicate messages until
> > receiving the last read message. But is it possible for client to read
log file
> directly?
> >
> 
> If you can't tolerate the possibility of duplicates you need to look at the
> Simple Consumer example; there you control the offset storage.

Do you have example code that manages exactly-once delivery, even when a
consumer for a given partition goes away?

What happens with rebalancing when a consumer goes away?  Is this a behavior
of the high-level consumer group?  Is there a way to supply one's own simple
consumer, with exactly-once semantics, within a consumer group that
rebalances?

What happens if a producer goes away?

thanks much,
rob




Re: Kafka Monitoring, 0.7 vs. 0.8 JMX

2013-05-08 Thread Jun Rao
There is also a Ganglia reporter for metrics. Don't know how much overhead
it adds though.

Thanks,

Jun
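
For anyone wanting to try a reporter, in 0.8 they are attached through
broker properties. A sketch enabling the bundled CSV reporter (property
names assumed from the 0.8 server.properties template; a Ganglia reporter
from the metrics project would be wired in the same way by class name):

    # server.properties: attach a metrics reporter
    kafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter
    kafka.csv.metrics.reporter.enabled=true
    kafka.csv.metrics.dir=/tmp/kafka_metrics
    kafka.metrics.polling.interval.secs=10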


On Wed, May 8, 2013 at 2:10 PM, Otis Gospodnetic  wrote:

> Hi Jun,
>
> Does that imply that what 0.8 puts in JMX is a superset of what's in JMX
> in 0.7?
> Or have names or types of beans changed?
>
> Also, do you recommend getting metrics via JMX or via HTTP?
>
> Thanks,
> Otis
> 
> Monitoring for Solr / ElasticSearch / HBase / Hadoop -
> http://sematext.com/spm
>
>
>
>
> >
> > From: Jun Rao 
> >To: "users@kafka.apache.org" 
> >Sent: Wednesday, May 8, 2013 11:20 AM
> >Subject: Re: Kafka Monitoring, 0.7 vs. 0.8 JMX
> >
> >
> >0.8 JMX is different from 0.7. In 0.8, all jmx beans are exposed through
> >metrics. One can attach a metric reporter for monitoring.
> >
> >Thanks,
> >
> >Jun
> >
> >
> >On Tue, May 7, 2013 at 1:45 PM, Otis Gospodnetic <
> otis.gospodne...@gmail.com
> >> wrote:
> >
> >> Hi,
> >>
> >> We're considering adding Kafka monitoring to SPM (see
> >> http://sematext.com/spm/index.html ).  We use Kafka ourselves and
> >> would like to see our Kafka metrics in SPM along with Hadoop, HBase,
> >> Solr, and other metrics we monitor.
> >>
> >> My questions:
> >> * What do people currently use for monitoring Kafka?
> >> * Is there appetite for adding Kafka support to SPM?
> >> * How different is the structure/content of JMX for 0.7 vs. 0.8?  Is
> >> 0.8 a backwards-compatible superset of 0.7?
> >>
> >> Thanks,
> >> Otis
> >> --
> >> Search Analytics - http://sematext.com/search-analytics/index.html
> >> Performance Monitoring - http://sematext.com/spm/index.html
> >>
> >
> >
> >
>


Re: How to create the initial zookeeper chroot path for zk.connect?

2013-05-08 Thread Jun Rao
Actually, we already have a jira: KAFKA-294.

Thanks,

Jun
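
Until KAFKA-294 lands, the chroot can be created from code as part of the
automated rollout instead of by hand. A minimal sketch using the zkclient
library Kafka already depends on (connection string and path are
placeholders; note it connects without the chroot suffix):

    import org.I0Itec.zkclient.ZkClient;

    public class CreateChroot {
        public static void main(String[] args) {
            // connect to the ensemble WITHOUT the chroot suffix
            ZkClient zkClient = new ZkClient("localhost:2181", 30000, 30000);
            // 'true' creates any missing parent nodes along the path
            zkClient.createPersistent("/kafka", true);
            zkClient.close();
        }
    }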


On Wed, May 8, 2013 at 4:37 PM, Jason Rosenberg  wrote:

> It works if I manually create the chroot first.  But this is a bit
> cumbersome if I want to do an automated roll out to multiple deployments,
> etc., but workable.
>
> Should I file a jira?
>
>
> On Wed, May 8, 2013 at 4:31 PM, Jason Rosenberg  wrote:
>
> > I'm seeing this issue with a single node zk instance, on my localhost.
>  If
> > my zkconnect is "localhost:12345", it works...
> >
> > but if I add a chroot, e.g.: "localhost:12345/xyz", I get the same error:
> > java.lang.IllegalArgumentException: Path length must be > 0
> >
> > I also get the error if I do: "localhost:12345,localhost:67890/xyz"
> >
> > Do I actually have to create the chroot manually first?  Or shouldn't it
> > get created automatically?
> >
> > This using 0.8.0 latest.
> >
> > Is this the same issue others are seeing (I realize previously the issue
> > was with multiple zk hosts).  Does this need a jira?
> >
> > Also, I think the config page for zookeeper.connect really needs to be
> > updated to make explicit that the 'chroot' part only gets added at the
> very
> > end, because it's not clear at all
> >
> > Jason
> >
> >
> > On Mon, Apr 22, 2013 at 6:46 AM, Ryan Chan 
> wrote:
> >
> >> It would be better if there were a separate configuration directive, e.g.
> >> zk.chroot, for the chroot path; currently it is inconsistent, as we also
> >> need to specify the port for each zookeeper host, isn't it?
> >>
> >> Anyway, the doc could explain this situation better.
> >>
> >> Thanks anyway!
> >>
> >>
> >> On Sun, Apr 21, 2013 at 11:10 PM, Scott Clasen 
> wrote:
> >>
> >> > Since there is only 1 chroot for a zk cluster, if you specified it for
> >> > each server there would be a potential for error/mismatch
> >> >
> >> > Things would probably go really bad if you had mismatched chroots :)
> >> >
> >> > Sent from my iPhone
> >> >
> >> > On Apr 21, 2013, at 1:34 AM, Ryan Chan  wrote:
> >> >
> >> > > Thanks, this solved the problem.
> >> > >
> >> > > But the connection string "Zk1:2181,zk2:2181,zk3:2181/Kafka" seems
> >> > > unintuitive?
> >> > >
> >> > >
> >> > > On Sun, Apr 21, 2013 at 2:29 AM, Scott Clasen 
> >> wrote:
> >> > >
> >> > >> Afaik you only put the chroot on the end of the zk conn str...
> >> > >>
> >> > >> Zk1:2181,zk2:2181,zk3:2181/Kafka
> >> > >>
> >> > >> Not
> >> > >>
> >> > >> Zk1:2181/kafka,zk2:2181/Kafka,zk3:2181/Kafka
> >> > >>
> >> > >>
> >> > >> Sent from my iPhone
> >> > >>
> >> > >> On Apr 20, 2013, at 9:03 AM, Neha Narkhede <
> neha.narkh...@gmail.com>
> >> > >> wrote:
> >> > >>
> >> > >>> Please file a bug and mention the Kafka and zookeeper versions
> used
> >> for
> >> > >> the
> >> > >>> test.
> >> > >>>
> >> > >>> Thanks,
> >> > >>> Neha
> >> > >>>
> >> > >>> On Saturday, April 20, 2013, Ryan Chan wrote:
> >> > >>>
> >> >  Hello,
> >> > 
> >> >  Tried, still the same...
> >> > 
> >> > 
> >> >  bin/zkCli.sh -server
> >> zookeeper1:2181,zookeeper2:2181,zookeeper3:2181
> >> >  [zk: zookeeper1,zookeeper2,zookeeper3(CONNECTED) 0]  ls /
> >> >  [testkafka, consumers, brokers, zookeeper]
> >> >  [zk: zookeeper1,zookeeper2,zookeeper3(CONNECTED) 1] rmr
> /testkafka
> >> >  [zk: zookeeper1,zookeeper2,zookeeper3(CONNECTED) 2] create
> >> /testkafka
> >> > ''
> >> >  [zk: zookeeper1,zookeeper2,zookeeper3(CONNECTED) 3] ls /
> >> >  [testkafka, consumers, brokers, zookeeper]
> >> > 
> >> > 
> >> > 
> >> >  And restart Kafka
> >> > 
> >> >  [2013-04-20 09:20:58,336] FATAL Fatal error during
> >> KafkaServerStable
> >> >  startup. Prepare to shutdown (kafka.server.KafkaServerStartable)
> >> >  java.lang.IllegalArgumentException: Path length must be > 0
> >> >  at
> >> > org.apache.zookeeper.common.PathUtils.validatePath(PathUtils.java:48)
> >> >  at
> >> > org.apache.zookeeper.common.PathUtils.validatePath(PathUtils.java:35)
> >> >  at org.apache.zookeeper.ZooKeeper.create(ZooKeeper.java:626)
> >> >  at org.I0Itec.zkclient.ZkConnection.create(ZkConnection.java:87)
> >> >  at org.I0Itec.zkclient.ZkClient$1.call(ZkClient.java:308)
> >> >  at org.I0Itec.zkclient.ZkClient$1.call(ZkClient.java:304)
> >> >  at
> >> org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
> >> >  at org.I0Itec.zkclient.ZkClient.create(ZkClient.java:304)
> >> >  at
> org.I0Itec.zkclient.ZkClient.createPersistent(ZkClient.java:213)
> >> >  at
> org.I0Itec.zkclient.ZkClient.createPersistent(ZkClient.java:223)
> >> >  at
> org.I0Itec.zkclient.ZkClient.createPersistent(ZkClient.java:223)
> >> >  at kafka.utils.ZkUtils$.createParentPath(ZkUtils.scala:47)
> >> >  at kafka.utils.ZkUtils$.createEphemeralPath(ZkUtils.scala:59)
> >> >  at
> >> > >>
> >> kafka.utils.ZkUtils$.createEphemeralPathExpectConflict(ZkUtils.scala:71)
> >> >  at
> >> > >>
> >> kafka.server.KafkaZooKeeper.registerB

Re: Kafka Monitoring, 0.7 vs. 0.8 JMX

2013-05-08 Thread Jun Rao
Otis,

0.8 jmx is quite different from 0.7.

Thanks,

Jun


On Wed, May 8, 2013 at 2:10 PM, Otis Gospodnetic  wrote:

> Hi Jun,
>
> Does that imply that what 0.8 puts in JMX is a superset of what's in JMX
> in 0.7?
> Or have names or types of beans changed?
>
> Also, do you recommend getting metrics via JMX or via HTTP?
>
> Thanks,
> Otis
> 
> Monitoring for Solr / ElasticSearch / HBase / Hadoop -
> http://sematext.com/spm
>
>
>
>
> >
> > From: Jun Rao 
> >To: "users@kafka.apache.org" 
> >Sent: Wednesday, May 8, 2013 11:20 AM
> >Subject: Re: Kafka Monitoring, 0.7 vs. 0.8 JMX
> >
> >
> >0.8 JMX is different from 0.7. In 0.8, all jmx beans are exposed through
> >metrics. One can attach a metric reporter for monitoring.
> >
> >Thanks,
> >
> >Jun
> >
> >
> >On Tue, May 7, 2013 at 1:45 PM, Otis Gospodnetic <
> otis.gospodne...@gmail.com
> >> wrote:
> >
> >> Hi,
> >>
> >> We're considering adding Kafka monitoring to SPM (see
> >> http://sematext.com/spm/index.html ).  We use Kafka ourselves and
> >> would like to see our Kafka metrics in SPM along with Hadoop, HBase,
> >> Solr, and other metrics we monitor.
> >>
> >> My questions:
> >> * What do people currently use for monitoring Kafka?
> >> * Is there appetite for adding Kafka support to SPM?
> >> * How different is the structure/content of JMX for 0.7 vs. 0.8?  Is
> >> 0.8 a backwards-compatible superset of 0.7?
> >>
> >> Thanks,
> >> Otis
> >> --
> >> Search Analytics - http://sematext.com/search-analytics/index.html
> >> Performance Monitoring - http://sematext.com/spm/index.html
> >>
> >
> >
> >
>


Re: expected exceptions?

2013-05-08 Thread Jun Rao
Yes, could you file a jira? Please include the log messages before those
exceptions.

Thanks,

Jun


On Wed, May 8, 2013 at 9:55 AM, Jason Rosenberg  wrote:

> If expected, does it make sense to log them as exceptions as such?  Can we
> instead log something meaningful to the console, like:
>
> "No leader was available, one will now be created"
>
> or
>
> "ConsumerConnector has shutdown"
>
> etc.
>
> Should I file jira's for these?
>
> Jason
>
>
> On Wed, May 8, 2013 at 8:22 AM, Jun Rao  wrote:
>
> > Yes, both are expected.
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Wed, May 8, 2013 at 12:16 AM, Jason Rosenberg 
> wrote:
> >
> > > I'm porting some unit tests from 0.7.2 to 0.8.0.  The test does the
> > > following, all embedded in the same java process:
> > >
> > > -- spins up a zk instance
> > > -- spins up a kafka server using a fresh log directory
> > > -- creates a producer and sends a message
> > > -- creates a high-level consumer and verifies that it can consume the
> > > message
> > > -- shuts down the consumer
> > > -- stops the kafka server
> > > -- stops zk
> > >
> > > The test seems to be working fine now, however, I consistently see the
> > > following exceptions (which from poking around the mailing list seem to
> > be
> > > expected?).  If these are expected, can we suppress the logging of
> these
> > > exceptions, since it clutters the output of tests, and presumably,
> > clutters
> > > the logs of the running server/consumers, during clean startup and
> > > shutdown..
> > >
> > > When I call producer.send(), I get:
> > >
> > > kafka.common.LeaderNotAvailableException: No leader for any partition
> > > at
> > >
> > >
> >
> kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartition(DefaultEventHandler.scala:212)
> > > at
> > >
> > >
> >
> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150)
> > > at
> > >
> > >
> >
> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:148)
> > > at
> > >
> > >
> >
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
> > > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
> > > at
> > >
> > >
> >
> kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:148)
> > > at
> > >
> > >
> >
> kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:94)
> > > at
> > >
> > >
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
> > > at kafka.producer.Producer.send(Producer.scala:74)
> > > at kafka.javaapi.producer.Producer.send(Producer.scala:32)
> > > ...
> > >   ...
> > >
> > > When I call consumerConnector.shutdown(), I get:
> > >
> > > java.nio.channels.ClosedByInterruptException
> > > at
> > >
> > >
> >
> java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:184)
> > > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:543)
> > > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> > > at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:47)
> > > at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:60)
> > > at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:81)
> > > at
> > >
> > >
> >
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:73)
> > > at
> > >
> > >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112)
> > > at
> > >
> > >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
> > > at
> > >
> > >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
> > > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > > at
> > >
> > >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111)
> > > at
> > >
> > >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
> > > at
> > >
> > >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
> > > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > > at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110)
> > > at
> > >
> > >
> >
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
> > > at
> > >
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
> > > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> > >
> > > Jason
> > >
> >
>


Re: a few questions from high level consumer documentation.

2013-05-08 Thread Jun Rao
For #3, we need to checkpoint offsets to a central place so that if a
consumer fails, another consumer in the same group can pick up from where
it left off.

For #4c, leader change doesn't introduce duplicates.

Thanks,

Jun
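
For the curious, the high-level consumer's checkpoints live under a
well-known ZooKeeper path, which is what lets another group member resume a
failed consumer's partitions (layout as used by the 0.7/0.8 high-level
consumer):

    /consumers/<group>/offsets/<topic>/<partition>  ->  next offset to consume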


On Wed, May 8, 2013 at 9:17 AM, Yu, Libo  wrote:

> Hi,
>
> I read this link
> https://cwiki.apache.org/KAFKA/consumer-group-example.html
> and have a few questions (if not too many).
>
> 1 When you say the iterator may block, do you mean hasNext() may block?
>
> 2 "Remember, you can only use a single process per Consumer Group."
> Do you mean we can only use a single process on one node of the
> cluster for a consumer group?
> Or there can be only one process on the whole cluster for a consumer
> group? Please clarify on this.
>
> 3 Why save offset to zookeeper? Is it easier to save it to a local file?
>
> 4 When client exits/crashes or leader for a partition is changed,
> duplicate messages may be replayed. "To help avoid this (replayed duplicate
> messages), make sure you provide a clean way for your client to exit
> instead of assuming it can be 'kill -9'd."
>
> a.   For client exit, if the client is receiving data at the time, how
> to do a clean exit? How can the client tell the consumer to write the
> offset to zookeeper before exiting?
>
>
> b.  For client crash, what can client do to avoid duplicate messages
> when restarted? What I can think of is to read last message from log file
> and ignore the first few received duplicate messages until receiving the
> last read message. But is it possible for client to read log file directly?
>
>
> c.   For the change of the partition leader, is there anything that
> clients can do to avoid duplicates?
>
> Thanks.
>
>
>
> Libo
>
>


Re: How to create the initial zookeeper chroot path for zk.connect?

2013-05-08 Thread Jason Rosenberg
It works if I manually create the chroot first.  But this is a bit
cumbersome if I want to do an automated roll out to multiple deployments,
etc., but workable.

Should I file a jira?


On Wed, May 8, 2013 at 4:31 PM, Jason Rosenberg  wrote:

> I'm seeing this issue with a single node zk instance, on my localhost.  If
> my zkconnect is "localhost:12345", it works...
>
> but if I add a chroot, e.g.: "localhost:12345/xyz", I get the same error:
> java.lang.IllegalArgumentException: Path length must be > 0
>
> I also get the error if I do: "localhost:12345,localhost:67890/xyz"
>
> Do I actually have to create the chroot manually first?  Or shouldn't it
> get created automatically?
>
> This using 0.8.0 latest.
>
> Is this the same issue others are seeing (I realize previously the issue
> was with multiple zk hosts).  Does this need a jira?
>
> Also, I think the config page for zookeeper.connect really needs to be
> updated to make explicit that the 'chroot' part only gets added at the very
> end, because it's not clear at all
>
> Jason
>
>
> On Mon, Apr 22, 2013 at 6:46 AM, Ryan Chan  wrote:
>
>> It would be better if there were a separate configuration directive, e.g.
>> zk.chroot, for the chroot path; currently it is inconsistent, as we also
>> need to specify the port for each zookeeper host, isn't it?
>>
>> Anyway, the doc could explain this situation better.
>>
>> Thanks anyway!
>>
>>
>> On Sun, Apr 21, 2013 at 11:10 PM, Scott Clasen  wrote:
>>
>> > Since there is only 1 chroot for a zk cluster, if you specified it for
>> > each server there would be a potential for error/mismatch
>> >
>> > Things would probably go really bad if you had mismatched chroots :)
>> >
>> > Sent from my iPhone
>> >
>> > On Apr 21, 2013, at 1:34 AM, Ryan Chan  wrote:
>> >
>> > > Thanks, this solved the problem.
>> > >
>> > > But the connection string "Zk1:2181,zk2:2181,zk3:2181/Kafka" seems
>> > > unintuitive?
>> > >
>> > >
>> > > On Sun, Apr 21, 2013 at 2:29 AM, Scott Clasen 
>> wrote:
>> > >
>> > >> Afaik you only put the chroot on the end of the zk conn str...
>> > >>
>> > >> Zk1:2181,zk2:2181,zk3:2181/Kafka
>> > >>
>> > >> Not
>> > >>
>> > >> Zk1:2181/kafka,zk2:2181/Kafka,zk3:2181/Kafka
>> > >>
>> > >>
>> > >> Sent from my iPhone
>> > >>
>> > >> On Apr 20, 2013, at 9:03 AM, Neha Narkhede 
>> > >> wrote:
>> > >>
>> > >>> Please file a bug and mention the Kafka and zookeeper versions used
>> for
>> > >> the
>> > >>> test.
>> > >>>
>> > >>> Thanks,
>> > >>> Neha
>> > >>>
>> > >>> On Saturday, April 20, 2013, Ryan Chan wrote:
>> > >>>
>> >  Hello,
>> > 
>> >  Tried, still the same...
>> > 
>> > 
>> >  bin/zkCli.sh -server
>> zookeeper1:2181,zookeeper2:2181,zookeeper3:2181
>> >  [zk: zookeeper1,zookeeper2,zookeeper3(CONNECTED) 0]  ls /
>> >  [testkafka, consumers, brokers, zookeeper]
>> >  [zk: zookeeper1,zookeeper2,zookeeper3(CONNECTED) 1] rmr /testkafka
>> >  [zk: zookeeper1,zookeeper2,zookeeper3(CONNECTED) 2] create
>> /testkafka
>> > ''
>> >  [zk: zookeeper1,zookeeper2,zookeeper3(CONNECTED) 3] ls /
>> >  [testkafka, consumers, brokers, zookeeper]
>> > 
>> > 
>> > 
>> >  And restart Kafka
>> > 
>> >  [2013-04-20 09:20:58,336] FATAL Fatal error during
>> KafkaServerStable
>> >  startup. Prepare to shutdown (kafka.server.KafkaServerStartable)
>> >  java.lang.IllegalArgumentException: Path length must be > 0
>> >  at
>> > org.apache.zookeeper.common.PathUtils.validatePath(PathUtils.java:48)
>> >  at
>> > org.apache.zookeeper.common.PathUtils.validatePath(PathUtils.java:35)
>> >  at org.apache.zookeeper.ZooKeeper.create(ZooKeeper.java:626)
>> >  at org.I0Itec.zkclient.ZkConnection.create(ZkConnection.java:87)
>> >  at org.I0Itec.zkclient.ZkClient$1.call(ZkClient.java:308)
>> >  at org.I0Itec.zkclient.ZkClient$1.call(ZkClient.java:304)
>> >  at
>> org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
>> >  at org.I0Itec.zkclient.ZkClient.create(ZkClient.java:304)
>> >  at org.I0Itec.zkclient.ZkClient.createPersistent(ZkClient.java:213)
>> >  at org.I0Itec.zkclient.ZkClient.createPersistent(ZkClient.java:223)
>> >  at org.I0Itec.zkclient.ZkClient.createPersistent(ZkClient.java:223)
>> >  at kafka.utils.ZkUtils$.createParentPath(ZkUtils.scala:47)
>> >  at kafka.utils.ZkUtils$.createEphemeralPath(ZkUtils.scala:59)
>> >  at
>> > >>
>> kafka.utils.ZkUtils$.createEphemeralPathExpectConflict(ZkUtils.scala:71)
>> >  at
>> > >>
>> kafka.server.KafkaZooKeeper.registerBrokerInZk(KafkaZooKeeper.scala:54)
>> >  at kafka.log.LogManager.startup(LogManager.scala:130)
>> >  at kafka.server.KafkaServer.startup(KafkaServer.scala:81)
>> >  at
>> > >>
>> kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:34)
>> >  at kafka.Kafka$.main(Kafka.scala:47)
>> >  at kafka.Kafka.main(Kafka.scala)
>> > 
>> > 
>> > 
>> >  Maybe I should report a bug?
>> > >>

Re: How to create the initial zookeeper chroot path for zk.connect?

2013-05-08 Thread Jason Rosenberg
I'm seeing this issue with a single node zk instance, on my localhost.  If
my zkconnect is "localhost:12345", it works...

but if I add a chroot, e.g.: "localhost:12345/xyz", I get the same error:
java.lang.IllegalArgumentException: Path length must be > 0

I also get the error if I do: "localhost:12345,localhost:67890/xyz"

Do I actually have to create the chroot manually first?  Or shouldn't it
get created automatically?

This using 0.8.0 latest.

Is this the same issue others are seeing (I realize previously the issue
was with multiple zk hosts).  Does this need a jira?

Also, I think the config page for zookeeper.connect really needs to be
updated to make explicit that the 'chroot' part only gets added at the very
end, because it's not clear at all

Jason


On Mon, Apr 22, 2013 at 6:46 AM, Ryan Chan  wrote:

> It would be better if there were a separate configuration directive, e.g.
> zk.chroot, for the chroot path; currently it is inconsistent, as we also
> need to specify the port for each zookeeper host, isn't it?
>
> Anyway, the doc could explain this situation better.
>
> Thanks anyway!
>
>
> On Sun, Apr 21, 2013 at 11:10 PM, Scott Clasen  wrote:
>
> > Since there is only 1 chroot for a zk cluster, if you specified it for
> > each server there would be a potential for error/mismatch
> >
> > Things would probably go really bad if you had mismatched chroots :)
> >
> > Sent from my iPhone
> >
> > On Apr 21, 2013, at 1:34 AM, Ryan Chan  wrote:
> >
> > > Thanks, this solved the problem.
> > >
> > > But the connection string "Zk1:2181,zk2:2181,zk3:2181/Kafka" seems
> > > unintuitive?
> > >
> > >
> > > On Sun, Apr 21, 2013 at 2:29 AM, Scott Clasen 
> wrote:
> > >
> > >> Afaik you only put the chroot on the end of the zk conn str...
> > >>
> > >> Zk1:2181,zk2:2181,zk3:2181/Kafka
> > >>
> > >> Not
> > >>
> > >> Zk1:2181/kafka,zk2:2181/Kafka,zk3:2181/Kafka
> > >>
> > >>
> > >> Sent from my iPhone
> > >>
> > >> On Apr 20, 2013, at 9:03 AM, Neha Narkhede 
> > >> wrote:
> > >>
> > >>> Please file a bug and mention the Kafka and zookeeper versions used
> for
> > >> the
> > >>> test.
> > >>>
> > >>> Thanks,
> > >>> Neha
> > >>>
> > >>> On Saturday, April 20, 2013, Ryan Chan wrote:
> > >>>
> >  Hello,
> > 
> >  Tried, still the same...
> > 
> > 
> >  bin/zkCli.sh -server zookeeper1:2181,zookeeper2:2181,zookeeper3:2181
> >  [zk: zookeeper1,zookeeper2,zookeeper3(CONNECTED) 0]  ls /
> >  [testkafka, consumers, brokers, zookeeper]
> >  [zk: zookeeper1,zookeeper2,zookeeper3(CONNECTED) 1] rmr /testkafka
> >  [zk: zookeeper1,zookeeper2,zookeeper3(CONNECTED) 2] create
> /testkafka
> > ''
> >  [zk: zookeeper1,zookeeper2,zookeeper3(CONNECTED) 3] ls /
> >  [testkafka, consumers, brokers, zookeeper]
> > 
> > 
> > 
> >  And restart Kafka
> > 
> >  [2013-04-20 09:20:58,336] FATAL Fatal error during KafkaServerStable
> >  startup. Prepare to shutdown (kafka.server.KafkaServerStartable)
> >  java.lang.IllegalArgumentException: Path length must be > 0
> >  at
> > org.apache.zookeeper.common.PathUtils.validatePath(PathUtils.java:48)
> >  at
> > org.apache.zookeeper.common.PathUtils.validatePath(PathUtils.java:35)
> >  at org.apache.zookeeper.ZooKeeper.create(ZooKeeper.java:626)
> >  at org.I0Itec.zkclient.ZkConnection.create(ZkConnection.java:87)
> >  at org.I0Itec.zkclient.ZkClient$1.call(ZkClient.java:308)
> >  at org.I0Itec.zkclient.ZkClient$1.call(ZkClient.java:304)
> >  at
> org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
> >  at org.I0Itec.zkclient.ZkClient.create(ZkClient.java:304)
> >  at org.I0Itec.zkclient.ZkClient.createPersistent(ZkClient.java:213)
> >  at org.I0Itec.zkclient.ZkClient.createPersistent(ZkClient.java:223)
> >  at org.I0Itec.zkclient.ZkClient.createPersistent(ZkClient.java:223)
> >  at kafka.utils.ZkUtils$.createParentPath(ZkUtils.scala:47)
> >  at kafka.utils.ZkUtils$.createEphemeralPath(ZkUtils.scala:59)
> >  at
> > >>
> kafka.utils.ZkUtils$.createEphemeralPathExpectConflict(ZkUtils.scala:71)
> >  at
> > >>
> kafka.server.KafkaZooKeeper.registerBrokerInZk(KafkaZooKeeper.scala:54)
> >  at kafka.log.LogManager.startup(LogManager.scala:130)
> >  at kafka.server.KafkaServer.startup(KafkaServer.scala:81)
> >  at
> > >>
> kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:34)
> >  at kafka.Kafka$.main(Kafka.scala:47)
> >  at kafka.Kafka.main(Kafka.scala)
> > 
> > 
> > 
> >  Maybe I should report a bug?
> >  (I posted here first just to know if I have done sth stupid)
> > 
> > 
> > 
> > 
> >  On Sat, Apr 20, 2013 at 1:02 PM, Neha Narkhede <
> > neha.narkh...@gmail.com
> > >> 
> > > wrote:
> > 
> > > Hmm, so if you use all 3 zookeeper servers will creating and
> reading
> > > the node, do you still see the problem ?
> > >
> > > zkCli.sh -server zookeeper1:21

Re: Kafka Monitoring, 0.7 vs. 0.8 JMX

2013-05-08 Thread David DeMaagd
I think there's really two angles to look at this from...

1) What is 'important' to monitor?  Meaning, what subset of these are
important/critical for being able to tell system health (things you want
to set alerts on), what subset are nice to have for overall health and 
capacity planning (things you want to create pretty graphs on) and the
rest (not immediately useful in general, but can really help in a
debugging/triage situation). 

2) How do you get the data?  Kind of independent of the above, though 
kinda related as well.  

As for the second one, you need to look at the collection mechanics.  As
you mentioned below, large scale polling (especially with a non-trivial
number of beans) is expensive and problematic no matter how you do it
(JMX or HTTP) given enough scale.  I don't have much experience with the
codahale metrics route directly, but I have messed with Jolokia, which is
likely in the same boat - they expose the metrics for you to grab. In
both cases, given enough data points (and kafka, depending on the number
of topics involved, has a /lot/ of them), either can be slow if not
implemented carefully.  Meaning you may overrun your desired polling
interval.

In very large environments, I've found it very scalable to have either a
local poller on the box (which could be reading via JMX or HTTP) which
then emits the data to something or have some kind of wrapper around the 
application that does the collection/emission (launching the broker as a
thread, and the parent process does some JMX magic to connect to the
data points).  Both of these routes depend a lot on your monitoring
infrastructure, but they will help you get around the general wide
polling problem...

Semi-shameless plug for how it is done at LinkedIn - 
http://engineering.linkedin.com/52/autometrics-self-service-metrics-collection

-- 
Dave DeMaagd
ddema...@linkedin.com | 818 262 7958

(dragos.manole...@servicenow.com - Wed, May 08, 2013 at 09:27:21PM +)
> From the JmxReporter section of the metrics manual:
> 
> Warning
> We don't recommend that you try to gather metrics from your production
> environment. JMX's RPC API is fragile and bonkers. For development
> purposes and browsing, though, it can be very useful.
> 
> 
> 
> -Dragos
> 
> On 5/8/13 2:10 PM, "Otis Gospodnetic"  wrote:
> 
> >Also, do you recommend getting metrics via JMX or via HTTP?
> 


Re: Kafka Monitoring, 0.7 vs. 0.8 JMX

2013-05-08 Thread Dragos Manolescu
From the JmxReporter section of the metrics manual:

Warning
We don't recommend that you try to gather metrics from your production
environment. JMX's RPC API is fragile and bonkers. For development
purposes and browsing, though, it can be very useful.



-Dragos

On 5/8/13 2:10 PM, "Otis Gospodnetic"  wrote:

>Also, do you recommend getting metrics via JMX or via HTTP?



Re: Kafka Monitoring, 0.7 vs. 0.8 JMX

2013-05-08 Thread Otis Gospodnetic
Hi Jun,

Does that imply that what 0.8 puts in JMX is a superset of what's in JMX in 0.7?
Or have names or types of beans changed?

Also, do you recommend getting metrics via JMX or via HTTP?

Thanks,
Otis 

Monitoring for Solr / ElasticSearch / HBase / Hadoop - http://sematext.com/spm 




>
> From: Jun Rao 
>To: "users@kafka.apache.org"  
>Sent: Wednesday, May 8, 2013 11:20 AM
>Subject: Re: Kafka Monitoring, 0.7 vs. 0.8 JMX
> 
>
>0.8 JMX is different from 0.7. In 0.8, all jmx beans are exposed through
>metrics. One can attach a metric reporter for monitoring.
>
>Thanks,
>
>Jun
>
>
>On Tue, May 7, 2013 at 1:45 PM, Otis Gospodnetic > wrote:
>
>> Hi,
>>
>> We're considering adding Kafka monitoring to SPM (see
>> http://sematext.com/spm/index.html ).  We use Kafka ourselves and
>> would like to see our Kafka metrics in SPM along with Hadoop, HBase,
>> Solr, and other metrics we monitor.
>>
>> My questions:
>> * What do people currently use for monitoring Kafka?
>> * Is there appetite for adding Kafka support to SPM?
>> * How different is the structure/content of JMX for 0.7 vs. 0.8?  Is
>> 0.8 a backwards-compatible superset of 0.7?
>>
>> Thanks,
>> Otis
>> --
>> Search Analytics - http://sematext.com/search-analytics/index.html
>> Performance Monitoring - http://sematext.com/spm/index.html
>>
>
>
>

Re: expected exceptions?

2013-05-08 Thread Jason Rosenberg
If expected, does it make sense to log them as exceptions as such?  Can we
instead log something meaningful to the console, like:

"No leader was available, one will now be created"

or

"ConsumerConnector has shutdown"

etc.

Should I file jira's for these?

Jason


On Wed, May 8, 2013 at 8:22 AM, Jun Rao  wrote:

> Yes, both are expected.
>
> Thanks,
>
> Jun
>
>
> On Wed, May 8, 2013 at 12:16 AM, Jason Rosenberg  wrote:
>
> > I'm porting some unit tests from 0.7.2 to 0.8.0.  The test does the
> > following, all embedded in the same java process:
> >
> > -- spins up a zk instance
> > -- spins up a kafka server using a fresh log directory
> > -- creates a producer and sends a message
> > -- creates a high-level consumer and verifies that it can consume the
> > message
> > -- shuts down the consumer
> > -- stops the kafka server
> > -- stops zk
> >
> > The test seems to be working fine now, however, I consistently see the
> > following exceptions (which from poking around the mailing list seem to
> be
> > expected?).  If these are expected, can we suppress the logging of these
> > exceptions, since it clutters the output of tests, and presumably,
> clutters
> > the logs of the running server/consumers, during clean startup and
> > shutdown..
> >
> > When I call producer.send(), I get:
> >
> > kafka.common.LeaderNotAvailableException: No leader for any partition
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartition(DefaultEventHandler.scala:212)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:148)
> > at
> >
> >
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
> > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:148)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:94)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
> > at kafka.producer.Producer.send(Producer.scala:74)
> > at kafka.javaapi.producer.Producer.send(Producer.scala:32)
> > ...
> >   ...
> >
> > When I call consumerConnector.shutdown(), I get:
> >
> > java.nio.channels.ClosedByInterruptException
> > at
> >
> >
> java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:184)
> > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:543)
> > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> > at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:47)
> > at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:60)
> > at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:81)
> > at
> >
> >
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:73)
> > at
> >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112)
> > at
> >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
> > at
> >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
> > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > at
> >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111)
> > at
> >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
> > at
> >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
> > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110)
> > at
> >
> >
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
> > at
> > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
> > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> >
> > Jason
> >
>


Re: a few questions from high level consumer documentation.

2013-05-08 Thread Chris Curtin
I'll try to answer some; the Kafka team will need to answer the others:


On Wed, May 8, 2013 at 12:17 PM, Yu, Libo  wrote:

> Hi,
>
> I read this link
> https://cwiki.apache.org/KAFKA/consumer-group-example.html
> and have a few questions (if not too many).
>
> 1 When you say the iterator may block, do you mean hasNext() may block?
>

Yes.


>
> 2 "Remember, you can only use a single process per Consumer Group."
> Do you mean we can only use a single process on one node of the
> cluster for a consumer group?
> Or there can be only one process on the whole cluster for a consumer
> group? Please clarify on this.
>
Bug. I'll change it. When I wrote this I misunderstood the rebalancing
step. I missed this reference but fixed the others. Sorry.



> 3 Why save offset to zookeeper? Is it easier to save it to a local file?
>
> 4 When client exits/crashes or leader for a partition is changed,
> duplicate messages may be replayed. "To help avoid this (replayed duplicate
> messages), make sure you provide a clean way for your client to exit
> instead of assuming it can be 'kill -9'd."
>
> a.   For client exit, if the client is receiving data at the time, how
> to do a clean exit? How can the client tell the consumer to write the
> offset to zookeeper before exiting?
>

If you call the shutdown() method on the Consumer it will cleanly stop,
releasing any blocked iterators. In the example it goes to sleep for a few
seconds then cleanly shuts down.
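
A minimal sketch of wiring that into a JVM shutdown hook, so a normal
termination (anything short of kill -9) commits offsets before exit; class
names are from the 0.8 Java API and the connection values are placeholders:

    import java.util.Properties;
    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.javaapi.consumer.ConsumerConnector;

    public class CleanShutdownDemo {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("zookeeper.connect", "localhost:2181"); // placeholder
            props.put("group.id", "demo-group");              // placeholder
            final ConsumerConnector consumer =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
            Runtime.getRuntime().addShutdownHook(new Thread() {
                public void run() {
                    // releases blocked iterators and commits offsets to ZooKeeper
                    consumer.shutdown();
                }
            });
            // ... create message streams and consume here ...
        }
    }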


>
>
> b.  For client crash, what can client do to avoid duplicate messages
> when restarted? What I can think of is to read last message from log file
> and ignore the first few received duplicate messages until receiving the
> last read message. But is it possible for client to read log file directly?
>

If you can't tolerate the possibility of duplicates you need to look at the
Simple Consumer example; there you control the offset storage.
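
A rough sketch of that idea with the 0.8 SimpleConsumer, keeping the offset
in a local file (the file store and rewrite-per-message are illustrative
only; any store you can update atomically with your processing works, and
host, port, and buffer sizes are placeholders):

    import java.io.*;
    import kafka.api.FetchRequest;
    import kafka.api.FetchRequestBuilder;
    import kafka.javaapi.FetchResponse;
    import kafka.javaapi.consumer.SimpleConsumer;
    import kafka.message.MessageAndOffset;

    public class OwnOffsetsDemo {
        public static void main(String[] args) throws IOException {
            String topic = "my-topic";
            int partition = 0;
            File offsetFile = new File("offset-" + topic + "-" + partition);
            long offset = 0L;
            if (offsetFile.exists()) {
                BufferedReader r = new BufferedReader(new FileReader(offsetFile));
                offset = Long.parseLong(r.readLine());
                r.close();
            }
            SimpleConsumer consumer = new SimpleConsumer(
                "localhost", 9092, 100000, 64 * 1024, "own-offsets-demo");
            FetchRequest req = new FetchRequestBuilder()
                .clientId("own-offsets-demo")
                .addFetch(topic, partition, offset, 100000)
                .build();
            FetchResponse resp = consumer.fetch(req);
            for (MessageAndOffset mao : resp.messageSet(topic, partition)) {
                // process the message, THEN persist the next offset; a restart
                // resumes exactly here, so no duplicates are replayed
                PrintWriter w = new PrintWriter(new FileWriter(offsetFile));
                w.println(mao.nextOffset());
                w.close();
            }
            consumer.close();
        }
    }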


>
>
> c.   For the change of the partition leader, is there anything that
> clients can do to avoid duplicates?
>
> Thanks.
>
>
>
> Libo
>
>


Re: Can't use ':' in client name?

2013-05-08 Thread Dennis Haller
Hi,

We found the same applies for topic names.

Dennis



On Fri, Mar 29, 2013 at 7:57 AM, Jun Rao  wrote:

> Chris,
>
> Client id is used for registering jmx beans for monitoring. Because of the
> restrictions in bean names, we limit the client id to be only alpha-numeric
> plus "-" and "_".
>
> Thanks,
>
> Jun
>
> On Fri, Mar 29, 2013 at 5:54 AM, Chris Curtin  >wrote:
>
> > Hi,
> >
> > Before I submit an enhancement JIRA, is there a reason I can't use a
> > colon (:) or parenthesis in a client name for Simple Consumer?
> >
> > I wanted to do something like  'Web:topic(partition)' so I know this is
> the
> > Web process for topic and partition.
> >
> > Thanks,
> >
> > Chris
> >
>
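
A small defensive sketch, given the alphanumeric-plus-'-'/'_' restriction Jun
describes above (the exact legal character set should be verified against
your Kafka version, since per Dennis's note it also applies to topic names):

    public class ClientIds {
        // map anything outside the documented legal set to '_'
        static String safe(String raw) {
            return raw.replaceAll("[^a-zA-Z0-9_-]", "_");
        }
        public static void main(String[] args) {
            System.out.println(safe("Web:my-topic(0)")); // Web_my-topic_0_
        }
    }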


Re: recover from corrupt log file?

2013-05-08 Thread Jun Rao
If you hard-kill (kill -9) a broker, it will do log validation and recovery
(by truncating the segment from the first invalid message onward), but only on
the last segment. If you have corruption in earlier segments, the simplest
way is to skip that segment by manually setting the consumer offset to the
offset of the next segment.

Thanks,

Jun
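
Concretely, in the 0.7 on-disk layout each .kafka segment file is named for
its starting offset, so "the offset of the next segment" is simply the next
file's name: 153008674335 in the case below. A sketch of resuming from there
with the 0.7 java SimpleConsumer (host, port, and buffer sizes are
placeholders):

    import kafka.api.FetchRequest;
    import kafka.javaapi.consumer.SimpleConsumer;
    import kafka.javaapi.message.ByteBufferMessageSet;

    public class SkipCorruptSegment {
        public static void main(String[] args) {
            // base offset of the segment after the corrupt one = its file name
            long nextSegmentOffset = 153008674335L;
            SimpleConsumer consumer =
                new SimpleConsumer("localhost", 9092, 100000, 64 * 1024);
            ByteBufferMessageSet messages = consumer.fetch(
                new FetchRequest("my-topic", 0, nextSegmentOffset, 1024 * 1024));
            // ... iterate messages from the healthy segment onward ...
            consumer.close();
        }
    }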


On Tue, May 7, 2013 at 11:17 AM, Todd Bilsborrow <
tbilsbor...@rhythmnewmedia.com> wrote:

> Are there any recommended steps to take to try and recover a corrupt log
> file?
>
> I'm running Kafka 0.7.0, using java apis for both production and
> consumption. If I attempt to read a message from a certain offset using the
> simple consumer, I get the following on the client:
>
> java.io.EOFException: Received -1 when reading from channel, socket has
> likely been closed.
> at kafka.utils.Utils$.read(Utils.scala:486)
> at
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:67)
> at kafka.network.Receive$class.readCompletely(Transmission.scala:57)
> at
> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> at kafka.consumer.SimpleConsumer.getResponse(SimpleConsumer.scala:184)
> at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:98)
> at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:88)
> at kafka.javaapi.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:43)
>
> and the following on the broker:
>
> ERROR Closing socket for /xx.xx.xx.xx because of error
> (kafka.network.Processor)
> java.io.IOException: Input/output error
> at sun.nio.ch.FileChannelImpl.transferTo0(Native Method)
> at
> sun.nio.ch.FileChannelImpl.transferToDirectly(FileChannelImpl.java:405)
> at sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:506)
> at kafka.message.FileMessageSet.writeTo(FileMessageSet.scala:107)
> at kafka.server.MessageSetSend.writeTo(MessageSetSend.scala:51)
> at kafka.network.Processor.write(SocketServer.scala:332)
> at kafka.network.Processor.run(SocketServer.scala:209)
> at java.lang.Thread.run(Thread.java:662)
>
> When I run DumpLogSegments on the file, it prints all messages up to the
> seemingly corrupt offset, then pauses for several seconds, then exits with
> the message "tail of the log is at offset: 152722143050" - which is the
> offset that appears to be the start of the corruption. My next log file
> starts at offset 153008674335, so there are a couple hundred MB (~couple
> million messages) that I can't access.
>
> Just curious if there are any next "best practice" steps.
>
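
A minimal sketch of Jun's suggestion, assuming the 0.7 SimpleConsumer API;
the host, port, topic, and partition are placeholders, and the resume offset
is the starting offset of the next segment mentioned above (in 0.7 a segment
file's name is its starting offset, so it can be read off the log directory):

import kafka.api.FetchRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;

public class SkipCorruptSegment {
    public static void main(String[] args) {
        SimpleConsumer consumer =
                new SimpleConsumer("broker-host", 9092, 30 * 1000, 64 * 1024);
        long nextSegmentOffset = 153008674335L; // first offset of the next segment
        ByteBufferMessageSet messages = consumer.fetch(
                new FetchRequest("my-topic", 0, nextSegmentOffset, 1024 * 1024));
        // ... hand the fetched messages to the normal processing path and
        // continue consuming from here, skipping the corrupt range ...
        consumer.close();
    }
}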


a few questions from high level consumer documentation.

2013-05-08 Thread Yu, Libo
Hi,

I read this link https://cwiki.apache.org/KAFKA/consumer-group-example.html
and have a few questions (if not too many).

1 When you say the iterator may block, do you mean hasNext() may block?

2 "Remember, you can only use a single process per Consumer Group."
Do you mean we can only use a single process on one node of the cluster for
a consumer group?
Or can there be only one process on the whole cluster for a consumer group?
Please clarify.

3 Why save offset to zookeeper? Is it easier to save it to a local file?

4 When client exits/crashes or leader for a partition is changed, duplicate 
messages may be replayed. "To help avoid this (replayed duplicate messages), 
make sure you provide a clean way for your client to exit instead of assuming 
it can be 'kill -9'd."

a.   For client exit, if the client is receiving data at the time, how does
it do a clean exit? How can the client tell the consumer to write the offset
to zookeeper before exiting?


b.  For a client crash, what can the client do to avoid duplicate messages
when restarted? What I can think of is to read the last message from the log
file and ignore the duplicate messages received until the last read message
reappears. But is it possible for the client to read the log file directly?


c.   When the partition leader changes, is there anything that clients
can do to avoid duplicates?

Thanks.



Libo



Re: Kafka Monitoring, 0.7 vs. 0.8 JMX

2013-05-08 Thread Jun Rao
See http://metrics.codahale.com/getting-started/#reporting-via-http

Thanks,

Jun


On Wed, May 8, 2013 at 9:03 AM, Dennis Haller wrote:

> What exactly is a metric reporter - something in log4j?
>
> Thanks
> Dennis
>
>
>
> On Wed, May 8, 2013 at 8:20 AM, Jun Rao  wrote:
>
> > 0.8 JMX is different from 0.7. In 0.8, all jmx beans are exposed through
> > metrics. One can attach a metric reporter for monitoring.
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Tue, May 7, 2013 at 1:45 PM, Otis Gospodnetic <
> > otis.gospodne...@gmail.com
> > > wrote:
> >
> > > Hi,
> > >
> > > We're considering adding Kafka monitoring to SPM (see
> > > http://sematext.com/spm/index.html ).  We use Kafka ourselves and
> > > would like to see our Kafka metrics in SPM along with Hadoop, HBase,
> > > Solr, and other metrics we monitor.
> > >
> > > My questions:
> > > * What do people currently use for monitoring Kafka?
> > > * Is there appetite for adding Kafka support to SPM?
> > > * How different is the structure/content of JMX for 0.7 vs. 0.8?  Is
> > > 0.8 a backwards-compatible superset of 0.7?
> > >
> > > Thanks,
> > > Otis
> > > --
> > > Search Analytics - http://sematext.com/search-analytics/index.html
> > > Performance Monitoring - http://sematext.com/spm/index.html
> > >
> >
>
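
To make "attach a metric reporter" concrete: a minimal sketch using the
Yammer Metrics 2.x library that 0.8 builds on, enabling a console reporter
in the same JVM. The interval is arbitrary, and 0.8 also has a pluggable
broker-side kafka.metrics reporter mechanism not shown here:

import java.util.concurrent.TimeUnit;
import com.yammer.metrics.reporting.ConsoleReporter;

public class EnableConsoleReporter {
    public static void main(String[] args) {
        // Dump every metric in the default registry to stdout every 30s.
        ConsoleReporter.enable(30, TimeUnit.SECONDS);
        // ... start the Kafka client or embedded server here ...
    }
}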


Re: Kafka Monitoring, 0.7 vs. 0.8 JMX

2013-05-08 Thread Dennis Haller
What exactly is a metric reporter - something in log4j?

Thanks
Dennis



On Wed, May 8, 2013 at 8:20 AM, Jun Rao  wrote:

> 0.8 JMX is different from 0.7. In 0.8, all jmx beans are exposed through
> metrics. One can attach a metric reporter for monitoring.
>
> Thanks,
>
> Jun
>
>
> On Tue, May 7, 2013 at 1:45 PM, Otis Gospodnetic <
> otis.gospodne...@gmail.com
> > wrote:
>
> > Hi,
> >
> > We're considering adding Kafka monitoring to SPM (see
> > http://sematext.com/spm/index.html ).  We use Kafka ourselves and
> > would like to see our Kafka metrics in SPM along with Hadoop, HBase,
> > Solr, and other metrics we monitor.
> >
> > My questions:
> > * What do people currently use for monitoring Kafka?
> > * Is there appetite for adding Kafka support to SPM?
> > * How different is the structure/content of JMX for 0.7 vs. 0.8?  Is
> > 0.8 a backwards-compatible superset of 0.7?
> >
> > Thanks,
> > Otis
> > --
> > Search Analytics - http://sematext.com/search-analytics/index.html
> > Performance Monitoring - http://sematext.com/spm/index.html
> >
>


0.8 beta release status

2013-05-08 Thread Jun Rao
We are investigating a metadata-related issue when there are a large number
of clients (1000+). This issue, if not resolved, may cause the whole
cluster to be unavailable. We are testing a fix. Once the issue is
resolved, we can start the release process.

Thanks,

Jun


RE: Binary Data and Kafka

2013-05-08 Thread Sybrandy, Casey
That's what I would have assumed.  And no, we're not using compression.

Thanks.

From: Jun Rao [mailto:jun...@gmail.com]
Sent: Wednesday, May 08, 2013 11:26 AM
To: users@kafka.apache.org
Cc: Sybrandy, Casey
Subject: Re: Binary Data and Kafka

No. Kafka broker stores the binary data as it is. The binary data may be 
compressed, if compression is enabled at the producer.

Thanks,

Jun

On Wed, May 8, 2013 at 5:57 AM, Sybrandy, Casey
<casey.sybra...@six3systems.com> wrote:
All,

Does the Kafka broker Base64 encode the messages?  We are sending binary data
to the brokers, and I looked at the logs to confirm that the data was being
stored; however, all of the data, with a few exceptions, looks to be Base64
encoded.  I didn't expect this, so I wanted to ask and confirm what I'm seeing.

If this is true, does this affect the size of the message when fetching?  In 
other words, if I send a 100K message, do I have to make sure I can fetch a 
300K message since the message can now be 300K in size because of the encoding?

Casey Sybrandy MSWE
Six3Systems
Cyber and Enterprise Systems Group
www.six3systems.com
301-206-6000 (Office)
301-206-6020 (Fax)
11820 West Market Place
Suites N-P
Fulton, MD. 20759



Re: Binary Data and Kafka

2013-05-08 Thread Jun Rao
No. Kafka broker stores the binary data as it is. The binary data may be
compressed, if compression is enabled at the producer.

Thanks,

Jun


On Wed, May 8, 2013 at 5:57 AM, Sybrandy, Casey <
casey.sybra...@six3systems.com> wrote:

> All,
>
> Does the Kafka broker Base64 encode the messages?  We are sending binary
> data to the brokers and I looked at the logs to confirm that the data was
> being stored; however, all of the data, with a few exceptions, looks to be
> Base64 encoded.  I didn't expect this, so I wanted to ask and confirm what
> I'm seeing.
>
> If this is true, does this affect the size of the message when fetching?
>  In other words, if I send a 100K message, do I have to make sure I can
> fetch a 300K message since the message can now be 300K in size because of
> the encoding?
>
> Casey Sybrandy MSWE
> Six3Systems
> Cyber and Enterprise Systems Group
> www.six3systems.com
> 301-206-6000 (Office)
> 301-206-6020 (Fax)
> 11820 West Market Place
> Suites N-P
> Fulton, MD. 20759
>
>
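
A sketch of the producer side Jun refers to, assuming the 0.8 Java producer
API; the broker address and topic are placeholders, and compression.codec is
the switch that decides whether the raw bytes are stored compressed or as-is:

import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class BinaryProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "localhost:9092"); // placeholder
        props.put("serializer.class", "kafka.serializer.DefaultEncoder"); // raw byte[]
        props.put("compression.codec", "none"); // or "gzip"/"snappy" to compress
        Producer<byte[], byte[]> producer =
                new Producer<byte[], byte[]>(new ProducerConfig(props));

        byte[] payload = new byte[]{0x00, 0x01, 0x02}; // arbitrary binary data
        // The broker stores exactly these bytes; no Base64 or other re-encoding.
        producer.send(new KeyedMessage<byte[], byte[]>("my-topic", payload));
        producer.close();
    }
}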


Re: expected exceptions?

2013-05-08 Thread Jun Rao
Yes, both are expected.

Thanks,

Jun


On Wed, May 8, 2013 at 12:16 AM, Jason Rosenberg  wrote:

> I'm porting some unit tests from 0.7.2 to 0.8.0.  The test does the
> following, all embedded in the same java process:
>
> -- spins up a zk instance
> -- spins up a kafka server using a fresh log directory
> -- creates a producer and sends a message
> -- creates a high-level consumer and verifies that it can consume the
> message
> -- shuts down the consumer
> -- stops the kafka server
> -- stops zk
>
> The test seems to be working fine now, however, I consistently see the
> following exceptions (which from poking around the mailing list seem to be
> expected?).  If these are expected, can we suppress the logging of these
> exceptions, since it clutters the output of tests and, presumably, the
> logs of the running server/consumers during clean startup and
> shutdown?
>
> When I call producer.send(), I get:
>
> kafka.common.LeaderNotAvailableException: No leader for any partition
> at
>
> kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartition(DefaultEventHandler.scala:212)
> at
>
> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150)
> at
>
> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:148)
> at
>
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
> at
>
> kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:148)
> at
>
> kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:94)
> at
>
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
> at kafka.producer.Producer.send(Producer.scala:74)
> at kafka.javaapi.producer.Producer.send(Producer.scala:32)
> ...
>   ...
>
> When I call consumerConnector.shutdown(), I get:
>
> java.nio.channels.ClosedByInterruptException
> at
>
> java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:184)
> at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:543)
> at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:47)
> at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:60)
> at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:81)
> at
>
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:73)
> at
>
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112)
> at
>
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
> at
>
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at
>
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111)
> at
>
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
> at
>
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110)
> at
>
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
> at
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
>
> Jason
>
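
Since both exceptions are expected during clean startup and shutdown, one way
to keep them out of test output is to raise the log4j level on the emitting
classes. A sketch only; which logger names actually produce the noise is an
assumption and may vary by version:

import org.apache.log4j.Level;
import org.apache.log4j.Logger;

public class QuietKafkaTestLogs {
    // Call once at test setup to silence the expected startup/shutdown noise.
    public static void quiet() {
        Logger.getLogger("kafka.producer.async.DefaultEventHandler")
                .setLevel(Level.FATAL);
        Logger.getLogger("kafka.consumer.SimpleConsumer").setLevel(Level.FATAL);
    }
}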


Re: Kafka Monitoring, 0.7 vs. 0.8 JMX

2013-05-08 Thread Jun Rao
0.8 JMX is different from 0.7. In 0.8, all jmx beans are exposed through
metrics. One can attach a metric reporter for monitoring.

Thanks,

Jun


On Tue, May 7, 2013 at 1:45 PM, Otis Gospodnetic  wrote:

> Hi,
>
> We're considering adding Kafka monitoring to SPM (see
> http://sematext.com/spm/index.html ).  We use Kafka ourselves and
> would like to see our Kafka metrics in SPM along with Hadoop, HBase,
> Solr, and other metrics we monitor.
>
> My questions:
> * What do people currently use for monitoring Kafka?
> * Is there appetite for adding Kafka support to SPM?
> * How different is the structure/content of JMX for 0.7 vs. 0.8?  Is
> 0.8 a backwards-compatible superset of 0.7?
>
> Thanks,
> Otis
> --
> Search Analytics - http://sematext.com/search-analytics/index.html
> Performance Monitoring - http://sematext.com/spm/index.html
>


Binary Data and Kafka

2013-05-08 Thread Sybrandy, Casey
All,

Does the Kafka broker Base64 encode the messages?  We are sending binary data
to the brokers, and I looked at the logs to confirm that the data was being
stored; however, all of the data, with a few exceptions, looks to be Base64
encoded.  I didn't expect this, so I wanted to ask and confirm what I'm seeing.

If this is true, does this affect the size of the message when fetching?  In 
other words, if I send a 100K message, do I have to make sure I can fetch a 
300K message since the message can now be 300K in size because of the encoding?

Casey Sybrandy MSWE
Six3Systems
Cyber and Enterprise Systems Group
www.six3systems.com
301-206-6000 (Office)
301-206-6020 (Fax)
11820 West Market Place
Suites N-P
Fulton, MD. 20759



expected exceptions?

2013-05-08 Thread Jason Rosenberg
I'm porting some unit tests from 0.7.2 to 0.8.0.  The test does the
following, all embedded in the same java process:

-- spins up a zk instance
-- spins up a kafka server using a fresh log directory
-- creates a producer and sends a message
-- creates a high-level consumer and verifies that it can consume the
message
-- shuts down the consumer
-- stops the kafka server
-- stops zk

The test seems to be working fine now, however, I consistently see the
following exceptions (which from poking around the mailing list seem to be
expected?).  If these are expected, can we suppress the logging of these
exceptions, since it clutters the output of tests and, presumably, the
logs of the running server/consumers during clean startup and
shutdown?

When I call producer.send(), I get:

kafka.common.LeaderNotAvailableException: No leader for any partition
at
kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartition(DefaultEventHandler.scala:212)
at
kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150)
at
kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:148)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
at
kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:148)
at
kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:94)
at
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
at kafka.producer.Producer.send(Producer.scala:74)
at kafka.javaapi.producer.Producer.send(Producer.scala:32)
...
  ...

When I call consumerConnector.shutdown(), I get:

java.nio.channels.ClosedByInterruptException
at
java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:184)
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:543)
at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:47)
at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:60)
at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:81)
at
kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:73)
at
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112)
at
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
at
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111)
at
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
at
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110)
at
kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)

Jason