Getting Payload back from Kafka message

2014-05-28 Thread Kumar Pradeep
Hi,
I am having trouble getting my payload back from the Kafka message in my
consumer.
My producer is passing a byte[] stream to Kafka.
If someone has an example of how to get the payload back, please share it with
me. Really appreciated.
After iterating through the Kafka stream, I am trying to use MessageAndMetadata,
but there are many discrepancies in the parameterization of this class.

Thanks, Pradeep
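
For reference, a minimal sketch of one way to do this with the 0.8 high-level
consumer's Java API (the ZooKeeper address, group id, and topic name below are
placeholders): with the default decoders each stream yields
MessageAndMetadata<byte[], byte[]>, and message() returns the producer's
original byte[].

    import java.util.Collections;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;
    import kafka.message.MessageAndMetadata;

    public class PayloadConsumer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("zookeeper.connect", "localhost:2181"); // placeholder
            props.put("group.id", "payload-reader");          // placeholder
            ConsumerConnector connector =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

            // One stream for the topic; the default decoder yields raw byte[].
            Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                connector.createMessageStreams(Collections.singletonMap("my-topic", 1));
            ConsumerIterator<byte[], byte[]> it =
                streams.get("my-topic").get(0).iterator();
            while (it.hasNext()) {
                MessageAndMetadata<byte[], byte[]> mm = it.next();
                byte[] payload = mm.message(); // the producer's original bytes
                System.out.println("partition " + mm.partition() + " offset "
                    + mm.offset() + ": " + payload.length + " bytes");
            }
        }
    }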
 


Re: java.net.SocketTimeoutException in Broker

2014-05-28 Thread Jun Rao
The request log will show the total time that a broker takes to complete a
request. Could you see if that request takes more than the request timeout to
complete?

Thanks,

Jun
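
For reference, the request log is controlled by kafka.request.logger in the
broker's log4j.properties. A sketch assuming the stock file shipped with 0.8
(the log file path is a placeholder), raising the level from the default WARN
to TRACE so that each completed request is logged with its timing breakdown,
including totalTime:

    # Appender for the per-request log (adjust the path for your install).
    log4j.appender.requestAppender=org.apache.log4j.DailyRollingFileAppender
    log4j.appender.requestAppender.DatePattern='.'yyyy-MM-dd-HH
    log4j.appender.requestAppender.File=/var/log/kafka/kafka-request.log
    log4j.appender.requestAppender.layout=org.apache.log4j.PatternLayout
    log4j.appender.requestAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

    # TRACE logs every request together with its totalTime.
    log4j.logger.kafka.request.logger=TRACE, requestAppender
    log4j.additivity.kafka.request.logger=false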


On Wed, May 28, 2014 at 2:18 PM, Maung Than  wrote:

> No.
>
> Now we only see this exception in the terminal. It seems to happen with
> gzip, or with a large volume of sends even without compression:
>
> java.net.SocketTimeoutException
> at
> sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:201)
> at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:86)
> at
> java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:221)
> at kafka.utils.Utils$.read(Utils.scala:375)
> at
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> at
> kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> at
> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
> at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:74)
> at
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:71)
> at
> kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:102)
> at
> kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102)
> at
> kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at
> kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:101)
> at
> kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101)
> at
> kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at kafka.producer.SyncProducer.send(SyncProducer.scala:100)
> at
> kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255)
> at
> kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$1.apply(DefaultEventHandler.scala:106)
> at
> kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$1.apply(DefaultEventHandler.scala:100)
> at
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
> at
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
> at scala.collection.Iterator$class.foreach(Iterator.scala:631)
> at
> scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
> at
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
> at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
> at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
> at
> kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100)
> at
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
> at
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
> at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
> at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
> at scala.collection.immutable.Stream.foreach(Stream.scala:254)
> at
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
> at
> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
> 2014-05-28 14:13:36:858, 2014-05-28 14:14:20:797, 1, 420, 1000, 400.54,
> 9.1159, 100, 22758.8247
>
>
> On May 28, 2014, at 12:58 PM, Guozhang Wang  wrote:
>
> > Do you see any exceptions on the broker side request logs and server
> logs?
> >
> > Guozhang
> >
> >
> > On Wed, May 28, 2014 at 12:25 PM, Maung Than 
> wrote:
> >
> >> The new value is 25000 ms.
> >> We still have the same issue.
> >>
> >> Thanks,
> >> Maung
> >>
> >> On May 27, 2014, at 3:15 PM, Guozhang Wang  wrote:
> >>
> >>> What is the new value you set? The new exception you saw seems not
> >> related
> >>> to Kafka network issues, you may want to, for example, check
> >>>
> >>> https://netbeans.org/bugzilla/show_bug.cgi?id=14
> >>>
> >>>
> >>>
> >>> On Tue, May 27, 2014 at 12:34 PM, Maung Than 
> >> wrote:
> >>>
>  We are now getting additional IO exception as well:
> 
>  May 27, 2014 12:32:29 PM sun.rmi.transport.tcp.TCPTransport$AcceptLoop
>  executeAcceptLoop
>  WARNING: RMI TCP Accept-0: accept loop for ServerSocket[addr=
>  0.0.0.0/0.0.0.0,port=0,localport=50110] throws
>  java.io.IOException: The server sockets created using the
> 

Re: Producer Side Metric Details

2014-05-28 Thread Jun Rao
The producer has a "failedToSend" jmx, which includes all kinds of errors.
We don't have the message rate per partition.

Thanks,

Jun
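
If it helps, a minimal sketch of listing what a 0.8 producer exposes over JMX
(the JMX port is a placeholder, and the bean names vary by version -- verify
the exact names with jconsole before scripting against them):

    import java.util.Set;
    import javax.management.MBeanServerConnection;
    import javax.management.ObjectName;
    import javax.management.remote.JMXConnector;
    import javax.management.remote.JMXConnectorFactory;
    import javax.management.remote.JMXServiceURL;

    public class ProducerMetricsDump {
        public static void main(String[] args) throws Exception {
            // Assumes the producer JVM was started with com.sun.management.jmxremote
            // options exposing port 9999 (placeholder).
            JMXServiceURL url = new JMXServiceURL(
                "service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi");
            JMXConnector jmxc = JMXConnectorFactory.connect(url);
            try {
                MBeanServerConnection conn = jmxc.getMBeanServerConnection();
                // Kafka 0.8 bean names are quoted (e.g. "kafka.producer":type=...),
                // so filter on the string form rather than a domain pattern.
                Set<ObjectName> names = conn.queryNames(null, null);
                for (ObjectName name : names) {
                    if (name.toString().contains("kafka.producer")) {
                        // Meter beans expose attributes such as Count and
                        // OneMinuteRate via conn.getAttribute(name, ...).
                        System.out.println(name);
                    }
                }
            } finally {
                jmxc.close();
            }
        }
    }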


On Wed, May 28, 2014 at 11:41 AM, Bhavesh Mistry  wrote:

> Hi Kafka Group,
>
> I need to get following metrics from the producer side.
>
> I am able to get following metric using the ProducerTopicMetrics class per
> minute.
> messageRate
> byteRate
> droppedMessageRate
>
> I would like to know how to get the above metrics per topic per partition.
> Also, how do I get a count of the number of exceptions that occurred due to
> kafka.common.MessageSizeTooLargeException, the network being
> down, or messages rejected because a broker was down, etc.? Is there a JMX or
> any other bean to get this metric counter?
>
> We are using Kafka version 0.8.0
>
> Thanks,
>
> Bhavesh
>


Re: Question on output of kafka-producer-perf-test.sh

2014-05-28 Thread Jun Rao
If you start with an empty topic, the log size on the broker gives you the
total bytes sent.

Thanks,

Jun
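
A quick way to read that size off the broker's disk, assuming the default
log.dirs of /tmp/kafka-logs and a topic named imessage (both placeholders;
there is one directory per partition):

    du -sh /tmp/kafka-logs/imessage-*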


On Wed, May 28, 2014 at 10:38 AM, Maung Than  wrote:

>
>  The bytes-in-rate reported on the broker is the post-compression rate.
>
> Based on that fact, I am trying to calculate the volume by multiplying
> duration and rate, i.e., (end.time - start.time) x MB.sec,
> but it does not give me the total.data.sent.in.MB. I put the duration in
> the brackets below:
>
> Any thoughts?
>
>
> 2014-05-27 14:10:37:347,2014-05-27 14:11:15:546, 0,  420,  1000,
>  2002.72,   52.4285,  500,  130893.4789  ( 34 sec )
>
>
> 2014-05-27 13:33:31:493,2014-05-27 13:34:11:862, 2,  420,  1000,
>  2002.72,   49.6102,  500,  123857.4153  ( 40 sec )
>
> Thanks,
> Maung
>
>
> On May 28, 2014, at 7:37 AM, Jun Rao  wrote:
>
> > The bytes-in-rate reported on the broker is the post-compression rate.
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Tue, May 27, 2014 at 9:16 PM, Maung Than 
> wrote:
> >
> >> Any idea when and which release that could be included in?
> >>
> >> We would like to have it sooner and can we do something about it?
> >>
> >> Thanks,
> >> Maung
> >>
> >> On May 27, 2014, at 5:10 PM, Guozhang Wang  wrote:
> >>
> >>> Maung,
> >>>
> >>> Yes, the throughput computed here is based on pre-compression bytes. In
> >> the
> >>> old producer we do not have a metric exposing the compression ratio, in
> >> the
> >>> new producer available in current trunk we can easily add that metric.
> I
> >>> have just created a ticket for this a moment ago.
> >>>
> >>> https://issues.apache.org/jira/browse/KAFKA-1472
> >>>
> >>> Guozhang
> >>>
> >>>
> >>>
> >>> On Tue, May 27, 2014 at 2:41 PM, Maung Than 
> >> wrote:
> >>>
> 
>  Hi All,
> 
>  We are seeing total data sent below is the same with or without
>  compression.
>  Is it always raw data?
>  If so is there a way we can get the compressed volume?
> 
>  start.time, end.time, compression, message.size, batch.size,
>  total.data.sent.in.MB, MB.sec, total.data.sent.in.nMsg, nMsg.sec
>  [2014-05-27 14:10:37,471] WARN Property reconnect.interval is not
> valid
>  (kafka.utils.VerifiableProperties)
>  SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
>  SLF4J: Defaulting to no-operation (NOP) logger implementation
>  SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for
> >> further
>  details.
>  [2014-05-27 14:10:37,547] WARN Property reconnect.interval is not
> valid
>  (kafka.utils.VerifiableProperties)
>  [2014-05-27 14:10:37,549] WARN Property reconnect.interval is not
> valid
>  (kafka.utils.VerifiableProperties)
>  [2014-05-27 14:10:37,552] WARN Property reconnect.interval is not
> valid
>  (kafka.utils.VerifiableProperties)
>  [2014-05-27 14:10:37,560] WARN Property reconnect.interval is not
> valid
>  (kafka.utils.VerifiableProperties)
>  2014-05-27 14:10:37:347, 2014-05-27 14:11:15:546, 0, 420, 1000,
> 2002.72,
>  52.4285, 500, 130893.4789
> 
> 
>  -bash-4.1$
> 
> /Users/worun/kafkabuild/kafka-0.8.1-src/bin/kafka-producer-perf-test.sh
>  --broker-list vp21q12ic-hpaj020921:9092 --messages 500 --topic
> >> imessage
>  --threads 5 --message-size 420 --batch-size 1000 --compression-codec 2
>  start.time, end.time, compression, message.size, batch.size,
>  total.data.sent.in.MB, MB.sec, total.data.sent.in.nMsg, nMsg.sec
>  [2014-05-27 13:33:31,616] WARN Property reconnect.interval is not
> valid
>  (kafka.utils.VerifiableProperties)
>  SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
>  SLF4J: Defaulting to no-operation (NOP) logger implementation
>  SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for
> >> further
>  details.
>  [2014-05-27 13:33:31,692] WARN Property reconnect.interval is not
> valid
>  (kafka.utils.VerifiableProperties)
>  [2014-05-27 13:33:31,694] WARN Property reconnect.interval is not
> valid
>  (kafka.utils.VerifiableProperties)
>  [2014-05-27 13:33:31,697] WARN Property reconnect.interval is not
> valid
>  (kafka.utils.VerifiableProperties)
>  [2014-05-27 13:33:31,705] WARN Property reconnect.interval is not
> valid
>  (kafka.utils.VerifiableProperties)
>  2014-05-27 13:33:31:493, 2014-05-27 13:34:11:862, 2, 420, 1000,
> 2002.72,
>  49.6102, 500, 123857.4153
> 
>  Thanks,
>  Maung
> 
> >>>
> >>>
> >>>
> >>> --
> >>> -- Guozhang
> >>
> >>
>
>


Re: mBean to monitor message per partitions in topic

2014-05-28 Thread Jun Rao
There is a per-partition jmx (*-ConsumerLag) in the consumer that reports
unconsumed messages per partition.

Thanks,

Jun


On Wed, May 28, 2014 at 8:13 AM, Рябков Алексей Николаевич <
a.ryab...@ntc-vulkan.ru> wrote:

> Hello!
>
> How can I get information about unfetched messages per partition in a topic?
> I wish to use such information to create my custom
> partitioner.class to balance messages between partitions.
>
> With best regards, Aleksey Ryabkov
>
>


How to get last message

2014-05-28 Thread hsy...@gmail.com
Is there a way to get the last message of a partition for a given topic?
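
There is no single-call API for this in 0.8, but a minimal sketch with the
SimpleConsumer works (host, port, topic, partition, and client id are
placeholders): ask for the latest offset, which is the offset of the next
message to be written, then fetch the message just before it. This ignores
leader discovery, error handling, and compressed message sets, which a real
client needs to deal with.

    import java.nio.ByteBuffer;
    import java.util.Collections;
    import kafka.api.FetchRequestBuilder;
    import kafka.api.PartitionOffsetRequestInfo;
    import kafka.common.TopicAndPartition;
    import kafka.javaapi.FetchResponse;
    import kafka.javaapi.OffsetResponse;
    import kafka.javaapi.consumer.SimpleConsumer;
    import kafka.message.MessageAndOffset;

    public class LastMessage {
        public static void main(String[] args) {
            String topic = "my-topic"; // placeholder
            int partition = 0;         // placeholder
            SimpleConsumer consumer =
                new SimpleConsumer("localhost", 9092, 10000, 64 * 1024, "last-msg");

            // 1. The "latest" offset is where the next message would go.
            TopicAndPartition tp = new TopicAndPartition(topic, partition);
            kafka.javaapi.OffsetRequest offReq = new kafka.javaapi.OffsetRequest(
                Collections.singletonMap(tp, new PartitionOffsetRequestInfo(
                    kafka.api.OffsetRequest.LatestTime(), 1)),
                kafka.api.OffsetRequest.CurrentVersion(), "last-msg");
            OffsetResponse offResp = consumer.getOffsetsBefore(offReq);
            long latest = offResp.offsets(topic, partition)[0];
            if (latest == 0) {
                System.out.println("partition is empty");
                consumer.close();
                return;
            }

            // 2. Fetch the single message at latest - 1.
            FetchResponse fetch = consumer.fetch(new FetchRequestBuilder()
                .clientId("last-msg")
                .addFetch(topic, partition, latest - 1, 100 * 1024)
                .build());
            for (MessageAndOffset mo : fetch.messageSet(topic, partition)) {
                ByteBuffer payload = mo.message().payload();
                byte[] bytes = new byte[payload.limit()];
                payload.get(bytes);
                System.out.println("offset " + mo.offset() + ": " + new String(bytes));
            }
            consumer.close();
        }
    }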


Re: kafka 0.8.1.1 works under java 1.6 ?

2014-05-28 Thread Joe Stein
Yes, Kafka builds against 1.6.


/***
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 http://www.stealth.ly
 Twitter: @allthingshadoop
/


> On May 28, 2014, at 7:42 PM, Weide Zhang  wrote:
> 
> Hi,
> 
> According to the Kafka documentation, it seems Java 1.7 is recommended. But
> in our production environment we are still using Java 1.6. Will it be a
> problem to use Java 1.6 with Kafka 0.8.1.1?
> 
> Thanks a lot,
> 
> Weide


kafka 0.8.1.1 works under java 1.6 ?

2014-05-28 Thread Weide Zhang
Hi,

According to the Kafka documentation, it seems Java 1.7 is recommended. But
in our production environment we are still using Java 1.6. Will it be a
problem to use Java 1.6 with Kafka 0.8.1.1?

Thanks a lot,

Weide


Re: java.net.SocketTimeoutException in Broker

2014-05-28 Thread Maung Than
No. 

Now we only see this exception in the terminal. It seems to happen with
gzip, or with a large volume of sends even without compression:

java.net.SocketTimeoutException
at 
sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:201)
at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:86)
at 
java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:221)
at kafka.utils.Utils$.read(Utils.scala:375)
at 
kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
at 
kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:74)
at 
kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:71)
at 
kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:102)
at 
kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102)
at 
kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at 
kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:101)
at 
kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101)
at 
kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.producer.SyncProducer.send(SyncProducer.scala:100)
at 
kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255)
at 
kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$1.apply(DefaultEventHandler.scala:106)
at 
kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$1.apply(DefaultEventHandler.scala:100)
at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
at scala.collection.Iterator$class.foreach(Iterator.scala:631)
at 
scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
at 
scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
at 
kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100)
at 
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
at 
kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
at 
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
at 
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
at scala.collection.immutable.Stream.foreach(Stream.scala:254)
at 
kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
at 
kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
2014-05-28 14:13:36:858, 2014-05-28 14:14:20:797, 1, 420, 1000, 400.54, 9.1159, 
100, 22758.8247


On May 28, 2014, at 12:58 PM, Guozhang Wang  wrote:

> Do you see any exceptions on the broker side request logs and server logs?
> 
> Guozhang
> 
> 
> On Wed, May 28, 2014 at 12:25 PM, Maung Than  wrote:
> 
>> The new value is 25000 ms.
>> We still have the same issue.
>> 
>> Thanks,
>> Maung
>> 
>> On May 27, 2014, at 3:15 PM, Guozhang Wang  wrote:
>> 
>>> What is the new value you set? The new exception you saw seems not
>> related
>>> to Kafka network issues, you may want to, for example, check
>>> 
>>> https://netbeans.org/bugzilla/show_bug.cgi?id=14
>>> 
>>> 
>>> 
>>> On Tue, May 27, 2014 at 12:34 PM, Maung Than 
>> wrote:
>>> 
 We are now getting additional IO exception as well:
 
 May 27, 2014 12:32:29 PM sun.rmi.transport.tcp.TCPTransport$AcceptLoop
 executeAcceptLoop
 WARNING: RMI TCP Accept-0: accept loop for ServerSocket[addr=
 0.0.0.0/0.0.0.0,port=0,localport=50110] throws
 java.io.IOException: The server sockets created using the
 LocalRMIServerSocketFactory only accept connections from clients
>> running on
 the host where the RMI remote objects have been exported.
   at
 
>> sun.management.jmxremote.LocalRMIServerSocketFactory$1.accept(LocalRMIServerSocketFactory.java:96)
   at
 
>> sun.rmi.transport.tcp.TCPTransport$AcceptLoop.executeAcceptLoop(TCPTransport.java:369)
   at
 sun.rmi.t

Re: java.net.SocketTimeoutException in Broker

2014-05-28 Thread Guozhang Wang
Do you see any exceptions on the broker side request logs and server logs?

Guozhang


On Wed, May 28, 2014 at 12:25 PM, Maung Than  wrote:

> The new value is 25000 ms.
> We still have the same issue.
>
> Thanks,
> Maung
>
> On May 27, 2014, at 3:15 PM, Guozhang Wang  wrote:
>
> > What is the new value you set? The new exception you saw seems not
> related
> > to Kafka network issues, you may want to, for example, check
> >
> > https://netbeans.org/bugzilla/show_bug.cgi?id=14
> >
> >
> >
> > On Tue, May 27, 2014 at 12:34 PM, Maung Than 
> wrote:
> >
> >> We are now getting additional IO exception as well:
> >>
> >> May 27, 2014 12:32:29 PM sun.rmi.transport.tcp.TCPTransport$AcceptLoop
> >> executeAcceptLoop
> >> WARNING: RMI TCP Accept-0: accept loop for ServerSocket[addr=
> >> 0.0.0.0/0.0.0.0,port=0,localport=50110] throws
> >> java.io.IOException: The server sockets created using the
> >> LocalRMIServerSocketFactory only accept connections from clients
> running on
> >> the host where the RMI remote objects have been exported.
> >>at
> >>
> sun.management.jmxremote.LocalRMIServerSocketFactory$1.accept(LocalRMIServerSocketFactory.java:96)
> >>at
> >>
> sun.rmi.transport.tcp.TCPTransport$AcceptLoop.executeAcceptLoop(TCPTransport.java:369)
> >>at
> >> sun.rmi.transport.tcp.TCPTransport$AcceptLoop.run(TCPTransport.java:341)
> >>at java.lang.Thread.run(Thread.java:662)
> >> 2014-05-27 12:28:50:309, 2014-05-27 12:33:25:806, 1, 420, 1000, 4005.43,
> >> 14.5389, 1000, 36298.0359
> >>
> >> Maung
> >>
> >> On May 27, 2014, at 11:43 AM, Maung Than  wrote:
> >>
> >>> You meant to say the below parameter; if so we do not override it; so
> it
> >> is the default 10 sec. Should it be larger? It is happening only with
> Gzip;
> >> Snappy or no compression works fine.
> >>>
> >>> request.timeout.ms  10000   The amount of time the broker will wait
> >> trying to meet the request.required.acks requirement before sending
> back an
> >> error to the client.
> >>>
> >>> Thanks,
> >>> Maung
> >>>
> >>> On May 27, 2014, at 9:29 AM, Neha Narkhede 
> >> wrote:
> >>>
>  I think Guozhang meant to say request.timeout.ms, not session
> timeout.
> >> You
>  can try increasing the request timeout through the request-timeout-ms
>  command line option.
> 
>  Thanks,
>  Neha
> 
> 
>  On Tue, May 27, 2014 at 8:55 AM, Guozhang Wang 
> >> wrote:
> 
> > Maung,
> >
> > This issue may be due to the session timeout value set too small.
> With
> > batch size 1000 and message size 420 you are sending 420K of data in
> >> each
> > request. What is your time out value?
> >
> > Guozhang
> >
> >
> > On Mon, May 26, 2014 at 11:46 PM, Maung Than 
> >> wrote:
> >
> >> Hi All,
> >>
> >> This is what we are running on the Broker:
> >>
> >>
> >> /Users/worun/kafkabuild/kafka-0.8.1-src/bin/kafka-producer-perf-test.sh
> >> --broker-list vp21q12ic-hpaj020921:9092 --messages 1000 --topic
> >> imessage --threads 10 --message-size 420 --batch-size 1000
> >> --compression-codec 1
> >>
> >> We are getting the below exception for the test, it occurs only with
> >> gzip
> >> in async mode. In the broker log, I saw a "Connection reset by peer"
> >> exception at sun.nio.ch.FileDispatcher. Any thoughts?
> >>
> >> [2014-05-26 22:49:33,361] WARN Failed to send producer request with
> >> correlation id 58510 to broker 3 with data for partitions
> [imessage,1]
> >> (kafka.producer.async.DefaultEventHandler)
> >> java.net.SocketTimeoutException
> >>  at
> >>
> >> sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:201)
> >>  at
> >> sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:86)
> >>  at
> >>
> >
> >>
> java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:221)
> >>  at kafka.utils.Utils$.read(Utils.scala:375)
> >>  at
> >>
> >
> >>
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> >>  at
> >> kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> >>  at
> >>
> >
> >>
> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> >>  at
> > kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
> >>  at
> > kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:74)
> >>  at
> >>
> >
> >>
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:71)
> >>  at
> >>
> >
> >>
> kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:102)
> >>  at
> >>
> >
> >>
> kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102)
> >>  at
> >>
> >
> >>
> kafka.producer.S

Re: java.net.SocketTimeoutException in Broker

2014-05-28 Thread Maung Than
The new value is 25000 ms. 
We still have the same issue. 

Thanks,
Maung

On May 27, 2014, at 3:15 PM, Guozhang Wang  wrote:

> What is the new value you set? The new exception you saw seems not related
> to Kafka network issues, you may want to, for example, check
> 
> https://netbeans.org/bugzilla/show_bug.cgi?id=14
> 
> 
> 
> On Tue, May 27, 2014 at 12:34 PM, Maung Than  wrote:
> 
>> We are now getting additional IO exception as well:
>> 
>> May 27, 2014 12:32:29 PM sun.rmi.transport.tcp.TCPTransport$AcceptLoop
>> executeAcceptLoop
>> WARNING: RMI TCP Accept-0: accept loop for ServerSocket[addr=
>> 0.0.0.0/0.0.0.0,port=0,localport=50110] throws
>> java.io.IOException: The server sockets created using the
>> LocalRMIServerSocketFactory only accept connections from clients running on
>> the host where the RMI remote objects have been exported.
>>at
>> sun.management.jmxremote.LocalRMIServerSocketFactory$1.accept(LocalRMIServerSocketFactory.java:96)
>>at
>> sun.rmi.transport.tcp.TCPTransport$AcceptLoop.executeAcceptLoop(TCPTransport.java:369)
>>at
>> sun.rmi.transport.tcp.TCPTransport$AcceptLoop.run(TCPTransport.java:341)
>>at java.lang.Thread.run(Thread.java:662)
>> 2014-05-27 12:28:50:309, 2014-05-27 12:33:25:806, 1, 420, 1000, 4005.43,
>> 14.5389, 1000, 36298.0359
>> 
>> Maung
>> 
>> On May 27, 2014, at 11:43 AM, Maung Than  wrote:
>> 
>>> You meant to say the below parameter; if so we do not override it; so it
>> is the default 10 sec. Should it be larger? It is happening only with Gzip;
>> Snappy or no compression works fine.
>>> 
>>> request.timeout.ms  10000   The amount of time the broker will wait
>> trying to meet the request.required.acks requirement before sending back an
>> error to the client.
>>> 
>>> Thanks,
>>> Maung
>>> 
>>> On May 27, 2014, at 9:29 AM, Neha Narkhede 
>> wrote:
>>> 
 I think Guozhang meant to say request.timeout.ms, not session timeout.
>> You
 can try increasing the request timeout through the request-timeout-ms
 command line option.
 
 Thanks,
 Neha
 
 
 On Tue, May 27, 2014 at 8:55 AM, Guozhang Wang 
>> wrote:
 
> Maung,
> 
> This issue may be due to the session timeout value set too small. With
> batch size 1000 and message size 420 you are sending 420K of data in
>> each
> request. What is your time out value?
> 
> Guozhang
> 
> 
> On Mon, May 26, 2014 at 11:46 PM, Maung Than 
>> wrote:
> 
>> Hi All,
>> 
>> This is what we are running on the Broker:
>> 
>> 
>> /Users/worun/kafkabuild/kafka-0.8.1-src/bin/kafka-producer-perf-test.sh
>> --broker-list vp21q12ic-hpaj020921:9092 --messages 1000 --topic
>> imessage --threads 10 --message-size 420 --batch-size 1000
>> --compression-codec 1
>> 
>> We are getting the below exception for the test, it occurs only with
>> gzip
>> in async mode. In the broker log, I saw a "Connection reset by peer"
>> exception at sun.nio.ch.FileDispatcher. Any thoughts?
>> 
>> [2014-05-26 22:49:33,361] WARN Failed to send producer request with
>> correlation id 58510 to broker 3 with data for partitions [imessage,1]
>> (kafka.producer.async.DefaultEventHandler)
>> java.net.SocketTimeoutException
>>  at
>> 
>> sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:201)
>>  at
>> sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:86)
>>  at
>> 
> 
>> java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:221)
>>  at kafka.utils.Utils$.read(Utils.scala:375)
>>  at
>> 
> 
>> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
>>  at
>> kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>>  at
>> 
> 
>> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>>  at
> kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
>>  at
> kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:74)
>>  at
>> 
> 
>> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:71)
>>  at
>> 
> 
>> kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:102)
>>  at
>> 
> 
>> kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102)
>>  at
>> 
> 
>> kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102)
>>  at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>>  at
>> 
> 
>> kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:101)
>>  at
>> 
>> kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101)
>>  at
>> 
>> kafka.producer.SyncP

Producer Side Metric Details

2014-05-28 Thread Bhavesh Mistry
Hi Kafka Group,

I need to get following metrics from the producer side.

I am able to get following metric using the ProducerTopicMetrics class per
minute.
messageRate
byteRate
droppedMessageRate

I would like to know how to get the above metrics per topic per partition.
Also, how do I get a count of the number of exceptions that occurred due to
kafka.common.MessageSizeTooLargeException, the network being
down, or messages rejected because a broker was down, etc.? Is there a JMX or
any other bean to get this metric counter?

We are using Kafka version 0.8.0

Thanks,

Bhavesh


Re: Feedback for Kafka Web Console's Topic Feed

2014-05-28 Thread Claude Mamo
I will see if I can fit this into the next release, Guozhang. But I don't
think this falls under the topic feed's scope. Currently the topic feed
displays the latest published partition messages, although it's a little
buggy ;-).

Claude


On Wed, May 28, 2014 at 6:47 PM, Guozhang Wang  wrote:

> Also, does this include the request handler logs and the controller logs? I
> think it would also be useful to include these logs and arrange them in
> time-line order.
>
> Guozhang
>
>
> On Wed, May 28, 2014 at 7:42 AM, Jun Rao  wrote:
>
> > Is Topic Feed supposed to include all logs related to a topic? One of the
> > things that could be useful is to also include the size of each
> > topic/partition (we added such a jmx in trunk).
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Wed, May 28, 2014 at 12:35 AM, Claude Mamo 
> > wrote:
> >
> > > Hello all,
> > >
> > > I'm working on the next version of Kafka Web Console and I'm
> > > seeking feedback for the Topic Feed functionality:
> > > https://raw.githubusercontent.com/claudemamo/kafka-web-console/master/img/topic-feed.png
> > > Have you found this feature useful? If no, why? What would you improve
> to
> > > make it more useful to you?
> > >
> > > Thanks,
> > >
> > > Claude
> > >
> >
>
>
>
> --
> -- Guozhang
>


Re: Feedback for Kafka Web Console's Topic Feed

2014-05-28 Thread Claude Mamo
The size of topics and partitions will be included in the next release but
it will be separate from the topic feed.

Claude


On Wed, May 28, 2014 at 4:42 PM, Jun Rao  wrote:

> Is Topic Feed supposed to include all logs related to a topic? One of the
> things that could be useful is to also include the size of each
> topic/partition (we added such a jmx in trunk).
>
> Thanks,
>
> Jun
>
>
> On Wed, May 28, 2014 at 12:35 AM, Claude Mamo 
> wrote:
>
> > Hello all,
> >
> > I'm working on the next version of Kafka Web
> > Consoleand I'm
> > seeking feedback for the Topic
> > Feed functionality<
> >
> https://raw.githubusercontent.com/claudemamo/kafka-web-console/master/img/topic-feed.png
> > >.
> > Have you found this feature useful? If no, why? What would you improve to
> > make it more useful to you?
> >
> > Thanks,
> >
> > Claude
> >
>


Re: Question on output of kafka-producer-perf-test.sh

2014-05-28 Thread Maung Than

 The bytes-in-rate reported on the broker is the post-compression rate.

Based on that fact, I am trying to calculate the volume by multiplying duration
and rate, i.e., (end.time - start.time) x MB.sec,
but it does not give me the total.data.sent.in.MB. I put the duration in the
brackets below:

Any thoughts? 


2014-05-27 14:10:37:347,2014-05-27 14:11:15:546, 0,  420,  1000,  2002.72,  
 52.4285,  500,  130893.4789  ( 34 sec )


2014-05-27 13:33:31:493,2014-05-27 13:34:11:862, 2,  420,  1000,  2002.72,  
 49.6102,  500,  123857.4153  ( 40 sec ) 

Thanks,
Maung
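
Working the arithmetic from the timestamps above, duration x rate does in fact
reproduce the reported total; the bracketed durations just need to be read to
the millisecond:

    14:10:37.347 -> 14:11:15.546 = 38.199 s;  38.199 s x 52.4285 MB/s ~ 2002.7 MB
    13:33:31.493 -> 13:34:11.862 = 40.369 s;  40.369 s x 49.6102 MB/s ~ 2002.7 MB

Both match the reported (pre-compression) total.data.sent.in.MB of 2002.72.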


On May 28, 2014, at 7:37 AM, Jun Rao  wrote:

> The bytes-in-rate reported on the broker is the post-compression rate.
> 
> Thanks,
> 
> Jun
> 
> 
> On Tue, May 27, 2014 at 9:16 PM, Maung Than  wrote:
> 
>> Any idea when and which release that could be included in?
>> 
>> We would like to have it sooner and can we do something about it?
>> 
>> Thanks,
>> Maung
>> 
>> On May 27, 2014, at 5:10 PM, Guozhang Wang  wrote:
>> 
>>> Maung,
>>> 
>>> Yes, the throughput computed here is based on pre-compression bytes. In
>> the
>>> old producer we do not have a metric exposing the compression ratio, in
>> the
>>> new producer available in current trunk we can easily add that metric. I
>>> have just created a ticket for this a moment ago.
>>> 
>>> https://issues.apache.org/jira/browse/KAFKA-1472
>>> 
>>> Guozhang
>>> 
>>> 
>>> 
>>> On Tue, May 27, 2014 at 2:41 PM, Maung Than 
>> wrote:
>>> 
 
 Hi All,
 
 We are seeing total data sent below is the same with or without
 compression.
 Is it always raw data?
 If so is there a way we can get the compressed volume?
 
 start.time, end.time, compression, message.size, batch.size,
 total.data.sent.in.MB, MB.sec, total.data.sent.in.nMsg, nMsg.sec
 [2014-05-27 14:10:37,471] WARN Property reconnect.interval is not valid
 (kafka.utils.VerifiableProperties)
 SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
 SLF4J: Defaulting to no-operation (NOP) logger implementation
 SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for
>> further
 details.
 [2014-05-27 14:10:37,547] WARN Property reconnect.interval is not valid
 (kafka.utils.VerifiableProperties)
 [2014-05-27 14:10:37,549] WARN Property reconnect.interval is not valid
 (kafka.utils.VerifiableProperties)
 [2014-05-27 14:10:37,552] WARN Property reconnect.interval is not valid
 (kafka.utils.VerifiableProperties)
 [2014-05-27 14:10:37,560] WARN Property reconnect.interval is not valid
 (kafka.utils.VerifiableProperties)
 2014-05-27 14:10:37:347, 2014-05-27 14:11:15:546, 0, 420, 1000, 2002.72,
 52.4285, 500, 130893.4789
 
 
 -bash-4.1$
 /Users/worun/kafkabuild/kafka-0.8.1-src/bin/kafka-producer-perf-test.sh
 --broker-list vp21q12ic-hpaj020921:9092 --messages 500 --topic
>> imessage
 --threads 5 --message-size 420 --batch-size 1000 --compression-codec 2
 start.time, end.time, compression, message.size, batch.size,
 total.data.sent.in.MB, MB.sec, total.data.sent.in.nMsg, nMsg.sec
 [2014-05-27 13:33:31,616] WARN Property reconnect.interval is not valid
 (kafka.utils.VerifiableProperties)
 SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
 SLF4J: Defaulting to no-operation (NOP) logger implementation
 SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for
>> further
 details.
 [2014-05-27 13:33:31,692] WARN Property reconnect.interval is not valid
 (kafka.utils.VerifiableProperties)
 [2014-05-27 13:33:31,694] WARN Property reconnect.interval is not valid
 (kafka.utils.VerifiableProperties)
 [2014-05-27 13:33:31,697] WARN Property reconnect.interval is not valid
 (kafka.utils.VerifiableProperties)
 [2014-05-27 13:33:31,705] WARN Property reconnect.interval is not valid
 (kafka.utils.VerifiableProperties)
 2014-05-27 13:33:31:493, 2014-05-27 13:34:11:862, 2, 420, 1000, 2002.72,
 49.6102, 500, 123857.4153
 
 Thanks,
 Maung
 
>>> 
>>> 
>>> 
>>> --
>>> -- Guozhang
>> 
>> 



Re: how to control assign policy for consumers

2014-05-28 Thread Guozhang Wang
With high-level consumers you cannot control the assignment of partitions
to consumers, and if partitions are located across all brokers, each
consumer is likely to connect to each of the brokers.

One (slightly hacky) thing you can do is to manually assign partitions to
brokers such that partition i of topic t lives on broker i % 3; then a
consumer fetching partition i of topic t will only connect to broker i % 3.
By default the assignment is not aligned, hence you will likely have a TCP
connection between each pair of consumer and broker.

Guozhang
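
A sketch of what that alignment could look like with the partition
reassignment tool shipped in 0.8.1 (topic name, broker ids, and ZooKeeper
address are placeholders; --generate can produce a template to start from):

    {"version": 1,
     "partitions": [{"topic": "t", "partition": 0, "replicas": [0]},
                    {"topic": "t", "partition": 1, "replicas": [1]},
                    {"topic": "t", "partition": 2, "replicas": [2]},
                    {"topic": "t", "partition": 3, "replicas": [0]},
                    {"topic": "t", "partition": 4, "replicas": [1]},
                    {"topic": "t", "partition": 5, "replicas": [2]}]}

    bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \
      --reassignment-json-file align.json --execute

This puts partition i of the topic on broker i % 3; repeat per topic.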


On Wed, May 28, 2014 at 8:15 AM, Рябков Алексей Николаевич <
a.ryab...@ntc-vulkan.ru> wrote:

> How can I tell a consumer to connect to only one broker? For example:
>  - I have 3 topics and 3 brokers, and each topic has 6 partitions.
>  - I then start 6 consumers listening to these 3 topics. In such a case we
> can see anywhere from 6 (each consumer connects to only one broker) to 18
> (each consumer connects to every broker) TCP connections. How can I minimize
> network connections?
>
> With best regards, Aleksey Ryabkov
>
>


-- 
-- Guozhang


Re: mBean to monitor message per partitions in topic

2014-05-28 Thread Guozhang Wang
You can take a look at the consumer offset checker tool, which reads the
consumed offsets in ZK and the data size on the broker to compute the lag. You
can re-use that logic to implement the partitioner.

Guozhang
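
A sketch of the core of that lag computation, re-using the 0.8 SimpleConsumer
for the broker side and reading the committed offset from ZooKeeper (group,
topic, and class names are hypothetical; the ZkClient must be constructed with
a string serializer, as Kafka's own ZKStringSerializer is):

    import java.util.Collections;
    import kafka.api.PartitionOffsetRequestInfo;
    import kafka.common.TopicAndPartition;
    import kafka.javaapi.OffsetResponse;
    import kafka.javaapi.consumer.SimpleConsumer;
    import org.I0Itec.zkclient.ZkClient;

    public class PartitionLag {
        // Lag = broker's latest offset - consumer's committed offset.
        public static long lag(SimpleConsumer consumer, ZkClient zk,
                               String group, String topic, int partition) {
            TopicAndPartition tp = new TopicAndPartition(topic, partition);
            kafka.javaapi.OffsetRequest req = new kafka.javaapi.OffsetRequest(
                Collections.singletonMap(tp, new PartitionOffsetRequestInfo(
                    kafka.api.OffsetRequest.LatestTime(), 1)),
                kafka.api.OffsetRequest.CurrentVersion(), "lag-check");
            OffsetResponse resp = consumer.getOffsetsBefore(req);
            long logEnd = resp.offsets(topic, partition)[0];

            // Committed offset as stored by the high-level consumer in ZK.
            String path = "/consumers/" + group + "/offsets/" + topic + "/" + partition;
            String data = zk.readData(path, true); // null if the path is missing
            long consumed = (data == null) ? 0L : Long.parseLong(data);
            return logEnd - consumed;
        }
    }

Note that doing this on every send would be far too slow for a partitioner;
cache the lags and refresh them in the background instead.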


On Wed, May 28, 2014 at 8:13 AM, Рябков Алексей Николаевич <
a.ryab...@ntc-vulkan.ru> wrote:

> Hello!
>
> How can I get information about unfetched messages per partition in a topic?
> I wish to use such information to create my custom
> partitioner.class to balance messages between partitions.
>
> With best regards, Aleksey Ryabkov
>
>


-- 
-- Guozhang


Re: Feedback for Kafka Web Console's Topic Feed

2014-05-28 Thread Guozhang Wang
Also, does this include the request handler logs and the controller logs? I
think it would also be useful to include these logs and arrange them in
time-line order.

Guozhang


On Wed, May 28, 2014 at 7:42 AM, Jun Rao  wrote:

> Is Topic Feed supposed to include all logs related to a topic? One of the
> things that could be useful is to also include the size of each
> topic/partition (we added such a jmx in trunk).
>
> Thanks,
>
> Jun
>
>
> On Wed, May 28, 2014 at 12:35 AM, Claude Mamo 
> wrote:
>
> > Hello all,
> >
> > I'm working on the next version of Kafka Web Console and I'm
> > seeking feedback for the Topic Feed functionality:
> > https://raw.githubusercontent.com/claudemamo/kafka-web-console/master/img/topic-feed.png
> > Have you found this feature useful? If no, why? What would you improve to
> > make it more useful to you?
> >
> > Thanks,
> >
> > Claude
> >
>



-- 
-- Guozhang


Re: running on scala 2.11

2014-05-28 Thread Guozhang Wang
Hello Laszlo,

Have you built Kafka with Scala 2.11? You may read the README file to see how
to compile Kafka against different Scala versions.

Guozhang
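
A sketch of the build step, assuming the gradle build on trunk with the
KAFKA-1454 patch (the exact scalaVersion string to pass is spelled out in the
README):

    ./gradlew -PscalaVersion=2.11.0 jar

A NoClassDefFoundError on a scala/collection class is typical of a version
mismatch between the Kafka jar and the scala-library jar on the runtime
classpath, so make sure both were built for, and run against, the same Scala
version.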


On Wed, May 28, 2014 at 5:45 AM, Laszlo Fogas 
wrote:

> Hello folks,
>
> anybody running kafka with scala 2.11.0?
>
> KAFKA-1454 says it's possible.. i'm having problems though when running the
> basic producer example from the wiki
>
> The message is NoClassDefFoundError:
> scala/collection/GenTraversableOnce$class
>
>
> Thanks
>
> Laszlo
>



-- 
-- Guozhang


how to control assign policy for consumers

2014-05-28 Thread Рябков Алексей Николаевич
How can I tell a consumer to connect to only one broker? For example:
 - I have 3 topics and 3 brokers, and each topic has 6 partitions.
 - I then start 6 consumers listening to these 3 topics. In such a case we can
see anywhere from 6 (each consumer connects to only one broker) to 18 (each
consumer connects to every broker) TCP connections. How can I minimize network
connections?

With best regards, Aleksey Ryabkov



mBean to monitor message per partitions in topic

2014-05-28 Thread Рябков Алексей Николаевич
Hello!

How can I get information about unfetched messages per partition in a topic?
   I wish to use such information to create my custom partitioner.class to
balance messages between partitions.

With best regards, Aleksey Ryabkov



Re: Exception in thread "main" java.lang.UnsupportedOperationException: empty.head

2014-05-28 Thread Jun Rao
Any error from the controller/state-change log?

Thanks,

Jun


On Wed, May 28, 2014 at 1:50 AM, Hemath Kumar wrote:

> Even if I specify the topic, it shows the same error.
>
> /opt/kafka_2.10-0.8.1.1/bin/kafka-run-class.sh
> kafka.tools.ConsumerOffsetChecker -zkconnect localhost:2181 --group
> group_name --topic testtopic
>
>
> On Wed, May 28, 2014 at 10:42 AM, Guozhang Wang 
> wrote:
>
> > Hemath,
> >
> > I will need more info to check the reasons kafka stopped working. For
> > consumer offset checker tool, you need to specify the topic names,
> > otherwise you will see "empty.head".
> >
> > Guozhang
> >
> >
> > On Tue, May 27, 2014 at 9:53 PM, Hemath Kumar 
> > wrote:
> >
> > > This is the command i have used.
> > >
> > > ./kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zkconnect
> > > localhost:2181 --group  consumer_group
> > >
> > >
> > > It's not just an error with this command. When this situation occurred,
> > messages
> > > could not be produced or consumed any more.
> > >
> > >
> > > On Tue, May 27, 2014 at 9:22 PM, Guozhang Wang 
> > wrote:
> > >
> > > > Hi,
> > > >
> > > > Could you print the full command line you used consumer offset
> checker
> > > > tool?
> > > >
> > > > Guozhang
> > > >
> > > >
> > > > On Tue, May 27, 2014 at 12:46 AM, Hemath Kumar <
> hksrckmur...@gmail.com
> > >
> > > > wrote:
> > > >
> > > > > Every time I reboot my machine Kafka stops working. When I
> > check
> > > > the
> > > > > consumer offset tool I am getting this error.  I am new to this; I
> > could
> > > > not
> > > > > figure out what the issue could be.
> > > > >
> > > > > I am saving the logs in /data/zk and /data/kafka. And it's running
> on
> > > > a single
> > > > > node. Would appreciate it if anyone could help me in this regard.
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > Exception in thread "main" java.lang.UnsupportedOperationException:
> > > > > empty.head
> > > > > at scala.collection.immutable.Vector.head(Vector.scala:192)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> kafka.tools.ConsumerOffsetChecker$.kafka$tools$ConsumerOffsetChecker$$processPartition(ConsumerOffsetChecker.scala:72)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply$mcVI$sp(ConsumerOffsetChecker.scala:89)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply(ConsumerOffsetChecker.scala:89)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply(ConsumerOffsetChecker.scala:89)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> > > > > at
> > > scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> kafka.tools.ConsumerOffsetChecker$.kafka$tools$ConsumerOffsetChecker$$processTopic(ConsumerOffsetChecker.scala:88)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> kafka.tools.ConsumerOffsetChecker$$anonfun$main$3.apply(ConsumerOffsetChecker.scala:153)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> kafka.tools.ConsumerOffsetChecker$$anonfun$main$3.apply(ConsumerOffsetChecker.scala:153)
> > > > > at scala.collection.immutable.List.foreach(List.scala:318)
> > > > > at
> > > > >
> > >
> kafka.tools.ConsumerOffsetChecker$.main(ConsumerOffsetChecker.scala:152)
> > > > > at
> > > > kafka.tools.ConsumerOffsetChecker.main(ConsumerOffsetChecker.scala)
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>


Re: Mbean "kafka.server":type="BrokerTopicMetrics",name="AllTopicsMessagesInPerSec" returning 0 always

2014-05-28 Thread Jun Rao
Only leaders report the message-in-rate metrics. For balancing the leaders,
take a look at
https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Whypartitionleadersmigratethemselvessometimes
?

Thanks,

Jun
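
In case it is useful, the tool that FAQ entry points at can be run directly (a
sketch; the ZooKeeper address is a placeholder):

    bin/kafka-preferred-replica-election.sh --zookeeper localhost:2181

This moves leadership of each partition back to its preferred (first-listed)
replica, after which the broker should start reporting message-in rates for
the partitions it leads.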


On Wed, May 28, 2014 at 1:05 AM, Arjun  wrote:

> Hi,
>
> I just checked the list-topic tool. There is data in the Kafka logs directory.
> My setup, as said, has 3 brokers with replication factor 2.
> One thing I see in its output is that no partition has this
> node as its leader. Is this the reason the bean does not show anything?
>
> Thanks
> Arjun Narasimha kota
>
>
>
> On Monday 26 May 2014 06:09 PM, Devanshu Srivastava wrote:
>
>> Hi Arjun ,
>>
>> What is the configured number of logical partitions per topic per server?
>> With the partition count as 2, only two partitions would be created,
>> distributed over 2 brokers with one partition each; you can confirm the
>> same by checking the log file directory for each broker (default:
>> /tmp/kafka-logs). Probably the 3rd broker does not have any topic-partition
>> for data to be written/read.
>>
>> You may also use the tool: bin/kafka-list-topic.sh --zookeeper
>> localhost:2181 --topic
>> {topic-name}
>>
>>
>> On Mon, May 26, 2014 at 4:02 PM, Arjun  wrote:
>>
>>  Hi,
>>>
>>> We have a Kafka cluster which contains 3 Kafka brokers. Data is pushed
>>> into the Kafka queue continuously.
>>> To monitor the Kafka brokers, we use the JMX metrics given by Kafka.
>>> One thing I observed is that there is one broker which is returning "0" all
>>> the time for the bean
>>> "kafka.server":type="BrokerTopicMetrics",name="AllTopicsMessagesInPerSec".
>>> Is there something I should worry about? I am able to get data from
>>> Kafka and push data into Kafka without a glitch.
>>>
>>>
>>> Thanks
>>> Arjun Narasimha Kota
>>>
>>>
>>>
>


Re: Feedback for Kafka Web Console's Topic Feed

2014-05-28 Thread Jun Rao
Is Topic Feed supposed to include all logs related to a topic? One of the
things that could be useful is to also include the size of each
topic/partition (we added such a jmx in trunk).

Thanks,

Jun


On Wed, May 28, 2014 at 12:35 AM, Claude Mamo  wrote:

> Hello all,
>
> I'm working on the next version of Kafka Web Console and I'm
> seeking feedback for the Topic Feed functionality:
> https://raw.githubusercontent.com/claudemamo/kafka-web-console/master/img/topic-feed.png
> Have you found this feature useful? If no, why? What would you improve to
> make it more useful to you?
>
> Thanks,
>
> Claude
>


Re: Question on output of kafka-producer-perf-test.sh

2014-05-28 Thread Jun Rao
The bytes-in-rate reported on the broker is the post-compression rate.

Thanks,

Jun


On Tue, May 27, 2014 at 9:16 PM, Maung Than  wrote:

> Any idea when and which release that could be included in?
>
> We would like to have it sooner and can we do something about it?
>
> Thanks,
> Maung
>
> On May 27, 2014, at 5:10 PM, Guozhang Wang  wrote:
>
> > Maung,
> >
> > Yes, the throughput computed here is based on pre-compression bytes. In
> the
> > old producer we do not have a metric exposing the compression ratio, in
> the
> > new producer available in current trunk we can easily add that metric. I
> > have just created a ticket for this a moment ago.
> >
> > https://issues.apache.org/jira/browse/KAFKA-1472
> >
> > Guozhang
> >
> >
> >
> > On Tue, May 27, 2014 at 2:41 PM, Maung Than 
> wrote:
> >
> >>
> >> Hi All,
> >>
> >> We are seeing total data sent below is the same with or without
> >> compression.
> >> Is it always raw data?
> >> If so is there a way we can get the compressed volume?
> >>
> >> start.time, end.time, compression, message.size, batch.size,
> >> total.data.sent.in.MB, MB.sec, total.data.sent.in.nMsg, nMsg.sec
> >> [2014-05-27 14:10:37,471] WARN Property reconnect.interval is not valid
> >> (kafka.utils.VerifiableProperties)
> >> SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
> >> SLF4J: Defaulting to no-operation (NOP) logger implementation
> >> SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for
> further
> >> details.
> >> [2014-05-27 14:10:37,547] WARN Property reconnect.interval is not valid
> >> (kafka.utils.VerifiableProperties)
> >> [2014-05-27 14:10:37,549] WARN Property reconnect.interval is not valid
> >> (kafka.utils.VerifiableProperties)
> >> [2014-05-27 14:10:37,552] WARN Property reconnect.interval is not valid
> >> (kafka.utils.VerifiableProperties)
> >> [2014-05-27 14:10:37,560] WARN Property reconnect.interval is not valid
> >> (kafka.utils.VerifiableProperties)
> >> 2014-05-27 14:10:37:347, 2014-05-27 14:11:15:546, 0, 420, 1000, 2002.72,
> >> 52.4285, 500, 130893.4789
> >>
> >>
> >> -bash-4.1$
> >> /Users/worun/kafkabuild/kafka-0.8.1-src/bin/kafka-producer-perf-test.sh
> >> --broker-list vp21q12ic-hpaj020921:9092 --messages 500 --topic
> imessage
> >> --threads 5 --message-size 420 --batch-size 1000 --compression-codec 2
> >> start.time, end.time, compression, message.size, batch.size,
> >> total.data.sent.in.MB, MB.sec, total.data.sent.in.nMsg, nMsg.sec
> >> [2014-05-27 13:33:31,616] WARN Property reconnect.interval is not valid
> >> (kafka.utils.VerifiableProperties)
> >> SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
> >> SLF4J: Defaulting to no-operation (NOP) logger implementation
> >> SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for
> further
> >> details.
> >> [2014-05-27 13:33:31,692] WARN Property reconnect.interval is not valid
> >> (kafka.utils.VerifiableProperties)
> >> [2014-05-27 13:33:31,694] WARN Property reconnect.interval is not valid
> >> (kafka.utils.VerifiableProperties)
> >> [2014-05-27 13:33:31,697] WARN Property reconnect.interval is not valid
> >> (kafka.utils.VerifiableProperties)
> >> [2014-05-27 13:33:31,705] WARN Property reconnect.interval is not valid
> >> (kafka.utils.VerifiableProperties)
> >> 2014-05-27 13:33:31:493, 2014-05-27 13:34:11:862, 2, 420, 1000, 2002.72,
> >> 49.6102, 500, 123857.4153
> >>
> >> Thanks,
> >> Maung
> >>
> >
> >
> >
> > --
> > -- Guozhang
>
>


Kafka 1+ year Opportunity

2014-05-28 Thread Frino, Ashley
Hello,

I hope all is well. I am reaching out to you regarding a Kafka opportunity we
currently have open. I wanted to reach out to introduce myself and to see if
you or anyone you may know would be interested in a 1-year+ contract
opportunity. My client is going to be implementing Kafka into their Hadoop and
Netezza environment and are looking for someone who can be the SME for the 
implementation of Kafka. Our client corporation is one of the nation's leading 
media and entertainment companies.  Its assets include cable television 
operations that provide industry leading services to more than 3 million New 
York area households.  I would be very interested in speaking with you in more 
detail about your current work situation. If you are not interested but might 
know someone who is please feel free to forward my contact information.

I look forward to hearing back from you.

Regards, Ashley



Ashley Frino | Sr. Applications Recruiter
T 631.760.3116 | F 631.760.3190 | afr...@teksystems.com
290 Broadhollow Road, Suite 310 E, Melville, NY 11747






TEKsystems is recognized as a top workplace. Learn 
why.






The greatest compliment to our business is a referral. If you know of someone 
within the IT and/or Telecom industry who is looking for work, or who is 
hiring, TEKsystems can be of assistance.  Please feel free to pass on our 
contact information.  Thank you!



This electronic mail (including any attachments) may contain information that 
is privileged, confidential, and/or otherwise protected from disclosure to 
anyone other than its intended recipient(s). Any dissemination or use of this 
electronic mail or its contents (including any attachments) by persons other 
than the intended recipient(s) is strictly prohibited. If you have received 
this message in error, please notify us immediately by reply e-mail so that we 
may correct our internal records. Please then delete the original message 
(including any attachments) in its entirety. Thank you.


running on scala 2.11

2014-05-28 Thread Laszlo Fogas
Hello folks,

anybody running kafka with scala 2.11.0?

KAFKA-1454 says it's possible.. i'm having problems though when running the
basic producer example from the wiki

The message is NoClassDefFoundError:
scala/collection/GenTraversableOnce$class


Thanks

Laszlo


Re: Exception in thread "main" java.lang.UnsupportedOperationException: empty.head

2014-05-28 Thread Hemath Kumar
Even if I specify the topic, it shows the same error.

/opt/kafka_2.10-0.8.1.1/bin/kafka-run-class.sh
kafka.tools.ConsumerOffsetChecker -zkconnect localhost:2181 --group
group_name --topic testtopic


On Wed, May 28, 2014 at 10:42 AM, Guozhang Wang  wrote:

> Hemath,
>
> I will need more info to check the reasons kafka stopped working. For
> consumer offset checker tool, you need to specify the topic names,
> otherwise you will see "empty.head".
>
> Guozhang
>
>
> On Tue, May 27, 2014 at 9:53 PM, Hemath Kumar 
> wrote:
>
> > This is the command i have used.
> >
> > ./kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zkconnect
> > localhost:2181 --group  consumer_group
> >
> >
> > It's not just an error with this command. When this situation occurred,
> messages
> > could not be produced or consumed any more.
> >
> >
> > On Tue, May 27, 2014 at 9:22 PM, Guozhang Wang 
> wrote:
> >
> > > Hi,
> > >
> > > Could you print the full command line you used consumer offset checker
> > > tool?
> > >
> > > Guozhang
> > >
> > >
> > > On Tue, May 27, 2014 at 12:46 AM, Hemath Kumar  >
> > > wrote:
> > >
> > > > Every time I reboot my machine Kafka stops working. When I
> check
> > > the
> > > > consumer offset tool I am getting this error.  I am new to this; I
> could
> > > not
> > > > figure out what the issue could be.
> > > >
> > > > I am saving the logs in /data/zk and /data/kafka. And it's running on
> > > a single
> > > > node. Would appreciate it if anyone could help me in this regard.
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > Exception in thread "main" java.lang.UnsupportedOperationException:
> > > > empty.head
> > > > at scala.collection.immutable.Vector.head(Vector.scala:192)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.tools.ConsumerOffsetChecker$.kafka$tools$ConsumerOffsetChecker$$processPartition(ConsumerOffsetChecker.scala:72)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply$mcVI$sp(ConsumerOffsetChecker.scala:89)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply(ConsumerOffsetChecker.scala:89)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply(ConsumerOffsetChecker.scala:89)
> > > > at
> > > >
> > > >
> > >
> >
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> > > > at
> > scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.tools.ConsumerOffsetChecker$.kafka$tools$ConsumerOffsetChecker$$processTopic(ConsumerOffsetChecker.scala:88)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.tools.ConsumerOffsetChecker$$anonfun$main$3.apply(ConsumerOffsetChecker.scala:153)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.tools.ConsumerOffsetChecker$$anonfun$main$3.apply(ConsumerOffsetChecker.scala:153)
> > > > at scala.collection.immutable.List.foreach(List.scala:318)
> > > > at
> > > >
> > kafka.tools.ConsumerOffsetChecker$.main(ConsumerOffsetChecker.scala:152)
> > > > at
> > > kafka.tools.ConsumerOffsetChecker.main(ConsumerOffsetChecker.scala)
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
>
> --
> -- Guozhang
>


Re: Mbean "kafka.server":type="BrokerTopicMetrics",name="AllTopicsMessagesInPerSec" returning 0 always

2014-05-28 Thread Arjun

Hi,

I just checked the list-topic tool. There is data in the Kafka logs directory.
My setup, as said, has 3 brokers with replication factor 2.
One thing I see in its output is that no partition has
this node as its leader. Is this the reason the bean does not show anything?


Thanks
Arjun Narasimha kota


On Monday 26 May 2014 06:09 PM, Devanshu Srivastava wrote:

Hi Arjun ,

What is the configured number of logical partitions per topic per server?
With the partition count as 2, only two partitions would be created,
distributed over 2 brokers with one partition each; you can confirm the
same by checking the log file directory for each broker (default:
/tmp/kafka-logs). Probably the 3rd broker does not have any topic-partition
for data to be written/read.

You may also use the tool: bin/kafka-list-topic.sh --zookeeper
localhost:2181 --topic
{topic-name}


On Mon, May 26, 2014 at 4:02 PM, Arjun  wrote:


Hi,

We have a Kafka cluster which contains 3 Kafka brokers. Data is pushed
into the Kafka queue continuously.
To monitor the Kafka brokers, we use the JMX metrics given by Kafka.
One thing I observed is that there is one broker which is returning "0" all
the time for the bean
"kafka.server":type="BrokerTopicMetrics",name="AllTopicsMessagesInPerSec".
Is there something I should worry about? I am able to get data from
Kafka and push data into Kafka without a glitch.


Thanks
Arjun Narasimha Kota






Feedback for Kafka Web Console's Topic Feed

2014-05-28 Thread Claude Mamo
Hello all,

I'm working on the next version of Kafka Web Console and I'm
seeking feedback for the Topic Feed functionality:
https://raw.githubusercontent.com/claudemamo/kafka-web-console/master/img/topic-feed.png
Have you found this feature useful? If no, why? What would you improve to
make it more useful to you?

Thanks,

Claude