Re: LeaderNotAvailableException

2014-08-13 Thread Ryan Williams
Ok, will do. Still on 0.8.1 on the same instance; after being reset it has
been running for about 48 hours now without a recurrence yet.


On Wed, Aug 13, 2014 at 10:20 PM, Neha Narkhede 
wrote:

> Due to KAFKA-1393, the server probably never ended up completely creating
> the replicas. Let us know how 0.8.1.1 goes.
>
> Thanks,
> Neha
>
>
> On Tue, Aug 12, 2014 at 10:12 AM, Ryan Williams 
> wrote:
>
> > Using version 0.8.1.
> >
> > Looking to update to 0.8.1.1 now probably.
> >
> >
> > On Tue, Aug 12, 2014 at 9:25 AM, Guozhang Wang 
> wrote:
> >
> > > The "0" there in the kafka-topics output is the broker id.
> > >
> > > From the broker log I think you are hitting KAFKA-1393. Which Kafka
> > > version are you using?
> > >
> > > Guozhang
> > >
> > >
> > > On Mon, Aug 11, 2014 at 10:37 PM, Ryan Williams 
> > > wrote:
> > >
> > > > Thanks for the heads up on attachments, here's a gist:
> > > >
> > > >
> > > >
> > >
> >
> https://gist.githubusercontent.com/ryanwi/84deb8774a6922ff3704/raw/75c33ad71d0d41301533cbc645fa9846736d5eb0/gistfile1.txt
> > > >
> > > > This seems to mostly happen in my development environment, when
> > running a
> > > > single broker. I don't see any broker failure in the controller log.
> > > > Anything else to look for with the topics reporting 0 replicas?
> > > >
> > > >
> > > >
> > > >
> > > > On Mon, Aug 11, 2014 at 9:31 PM, Guozhang Wang 
> > > wrote:
> > > >
> > > > > Ryan,
> > > > >
> > > > > Apache mailing list does not allow attachments exceeding a certain
> > size
> > > > > limit, so the server logs are blocked.
> > > > >
> > > > > From the controller log it seems this only broker has failed and
> > hence
> > > no
> > > > > partitions will be available. This could be a soft failure (e.g.
> long
> > > > GC),
> > > > > or the ZK server side issues. You may want to take a look at your
> > > > > controller log to see if there are any entries like "broker failure"
> > > > before
> > > > > the offline leader selection process.
> > > > >
> > > > > Guozhang
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > On Mon, Aug 11, 2014 at 5:08 PM, Ryan Williams <
> rwilli...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > The broker appears to be running
> > > > > >
> > > > > > $ telnet kafka-server 9092
> > > > > > Trying...
> > > > > > Connected to kafka-server
> > > > > > Escape character is '^]'.
> > > > > >
> > > > > > I've attached today's server.log.  There was a manual restart of
> > > kafka,
> > > > > > which you'll notice, but that didn't fix the issue.
> > > > > >
> > > > > > Thanks for looking!
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Mon, Aug 11, 2014 at 4:30 PM, Guozhang Wang <
> wangg...@gmail.com
> > >
> > > > > wrote:
> > > > > >
> > > > > >> Hi Ryan,
> > > > > >>
> > > > > >> Could you check if all of your brokers are still live and
> running?
> > > > Also
> > > > > >> could you check the server log in addition to the producer /
> > > > > state-change
> > > > > >> /
> > > > > >> controller logs?
> > > > > >>
> > > > > >> Guozhang
> > > > > >>
> > > > > >>
> > > > > >> On Mon, Aug 11, 2014 at 12:45 PM, Ryan Williams <
> > > rwilli...@gmail.com>
> > > > > >> wrote:
> > > > > >>
> > > > > >> > I have a single broker test Kafka instance that was running
> fine
> > > on
> > > > > >> Friday
> > > > > >> > (basically out of the box configuration with 2 partitions),
> now
> > I
> > > > come
> > > > > >> back
> > > > > >> > on Monday and producers are unable to send messages.
> > > > > >> >
> > > > > >> > What else can I look at to debug, and prevent?
> > > > > >> >
> > > > > >> > I know how to recover by removing data directories for kafka
> and
> > > > > >> zookeeper
> > > > > >> > to start fresh.  But, this isn't the first time this has
> > happened,
> > > > so
> > > > > I
> > > > > >> > would like to understand it better to feel more comfortable
> with
> > > > > kafka.
> > > > > >> >
> > > > > >> >
> > > > > >> > ===
> > > > > >> > Producer error (from console produce)
> > > > > >> > ===
> > > > > >> > [2014-08-11 19:32:49,781] WARN Error while fetching metadata
> > > > > >> > [{TopicMetadata for topic mytopic ->
> > > > > >> > No partition metadata for topic mytopic due to
> > > > > >> > kafka.common.LeaderNotAvailableException}] for topic
> [mytopic]:
> > > > class
> > > > > >> > kafka.common.LeaderNotAvailableException
> > > > > >> > (kafka.producer.BrokerPartitionInfo)
> > > > > >> > [2014-08-11 19:32:49,782] ERROR Failed to collate messages by
> > > topic,
> > > > > >> > partition due to: Failed to fetch topic metadata for topic:
> > > mytopic
> > > > > >> > (kafka.producer.async.DefaultEventHandler)
> > > > > >> >
> > > > > >> > ===
> > > > > >> > state-change.log
> > > > > >> > ===
> > > > > >> > [2014-08-11 19:12:45,312] TRACE Controller 0 epoch 3 started
> > > leader
> > > > > >> > elect

Re: LeaderNotAvailableException

2014-08-13 Thread Neha Narkhede
Due to KAFKA-1393, the server probably never ended up completely creating
the replicas. Let us know how 0.8.1.1 goes.

Thanks,
Neha


On Tue, Aug 12, 2014 at 10:12 AM, Ryan Williams  wrote:

> Using version 0.8.1.
>
> Looking to update to 0.8.1.1 now probably.
>
>
> On Tue, Aug 12, 2014 at 9:25 AM, Guozhang Wang  wrote:
>
> > The "0" there in the kafka-topics output is the broker id.
> >
> > From the broker log I think you are hitting KAFKA-1393. Which Kafka
> > version are you using?
> >
> > Guozhang
> >
> >
> > On Mon, Aug 11, 2014 at 10:37 PM, Ryan Williams 
> > wrote:
> >
> > > Thanks for the heads up on attachments, here's a gist:
> > >
> > >
> > >
> >
> https://gist.githubusercontent.com/ryanwi/84deb8774a6922ff3704/raw/75c33ad71d0d41301533cbc645fa9846736d5eb0/gistfile1.txt
> > >
> > > This seems to mostly happen in my development environment, when
> running a
> > > single broker. I don't see any broker failure in the controller log.
> > > Anything else to look for with the topics reporting 0 replicas?
> > >
> > >
> > >
> > >
> > > On Mon, Aug 11, 2014 at 9:31 PM, Guozhang Wang 
> > wrote:
> > >
> > > > Ryan,
> > > >
> > > > Apache mailing list does not allow attachments exceeding a certain
> size
> > > > limit, so the server logs are blocked.
> > > >
> > > > From the controller log it seems this only broker has failed and
> hence
> > no
> > > > partitions will be available. This could be a soft failure (e.g. long
> > > GC),
> > > > or the ZK server side issues. You may want to take a look at your
> > > > controller log to see if there are any entries like "broker failure"
> > > before
> > > > the offline leader selection process.
> > > >
> > > > Guozhang
> > > >
> > > >
> > > >
> > > >
> > > > On Mon, Aug 11, 2014 at 5:08 PM, Ryan Williams 
> > > > wrote:
> > > >
> > > > > The broker appears to be running
> > > > >
> > > > > $ telnet kafka-server 9092
> > > > > Trying...
> > > > > Connected to kafka-server
> > > > > Escape character is '^]'.
> > > > >
> > > > > I've attached today's server.log.  There was a manual restart of
> > kafka,
> > > > > which you'll notice, but that didn't fix the issue.
> > > > >
> > > > > Thanks for looking!
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > On Mon, Aug 11, 2014 at 4:30 PM, Guozhang Wang  >
> > > > wrote:
> > > > >
> > > > >> Hi Ryan,
> > > > >>
> > > > >> Could you check if all of your brokers are still live and running?
> > > Also
> > > > >> could you check the server log in addition to the producer /
> > > > state-change
> > > > >> /
> > > > >> controller logs?
> > > > >>
> > > > >> Guozhang
> > > > >>
> > > > >>
> > > > >> On Mon, Aug 11, 2014 at 12:45 PM, Ryan Williams <
> > rwilli...@gmail.com>
> > > > >> wrote:
> > > > >>
> > > > >> > I have a single broker test Kafka instance that was running fine
> > on
> > > > >> Friday
> > > > >> > (basically out of the box configuration with 2 partitions), now
> I
> > > come
> > > > >> back
> > > > >> > on Monday and producers are unable to send messages.
> > > > >> >
> > > > >> > What else can I look at to debug, and prevent?
> > > > >> >
> > > > >> > I know how to recover by removing data directories for kafka and
> > > > >> zookeeper
> > > > >> > to start fresh.  But, this isn't the first time this has
> happened,
> > > so
> > > > I
> > > > >> > would like to understand it better to feel more comfortable with
> > > > kafka.
> > > > >> >
> > > > >> >
> > > > >> > ===
> > > > >> > Producer error (from console produce)
> > > > >> > ===
> > > > >> > [2014-08-11 19:32:49,781] WARN Error while fetching metadata
> > > > >> > [{TopicMetadata for topic mytopic ->
> > > > >> > No partition metadata for topic mytopic due to
> > > > >> > kafka.common.LeaderNotAvailableException}] for topic [mytopic]:
> > > class
> > > > >> > kafka.common.LeaderNotAvailableException
> > > > >> > (kafka.producer.BrokerPartitionInfo)
> > > > >> > [2014-08-11 19:32:49,782] ERROR Failed to collate messages by
> > topic,
> > > > >> > partition due to: Failed to fetch topic metadata for topic:
> > mytopic
> > > > >> > (kafka.producer.async.DefaultEventHandler)
> > > > >> >
> > > > >> > ===
> > > > >> > state-change.log
> > > > >> > ===
> > > > >> > [2014-08-11 19:12:45,312] TRACE Controller 0 epoch 3 started
> > leader
> > > > >> > election for partition [mytopic,0] (state.change.logger)
> > > > >> > [2014-08-11 19:12:45,321] ERROR Controller 0 epoch 3 initiated
> > state
> > > > >> change
> > > > >> > for partition [mytopic,0] from OfflinePartition to
> OnlinePartition
> > > > >> failed
> > > > >> > (state.change.logger)
> > > > >> > kafka.common.NoReplicaOnlineException: No replica for partition
> > > > >> [mytopic,0]
> > > > >> > is alive. Live brokers are: [Set()], Assigned replicas are:
> > > [List(0)]
> > > > >> > at
> > > > >> >
> > > > >> >
> > > > >>
> > > >
> > >
> >
> kafka.contr

RE: Blocking Recursive parsing from kafka.consumer.TopicCount$.constructTopicCount

2014-08-13 Thread Jagbir Hooda
Hi Jun,
The parser is being used by
kafka/core/src/main/scala/kafka/consumer/TopicCount.scala:56
As per your suggestion I've filed the JIRA 
https://issues.apache.org/jira/browse/KAFKA-1595
Thanks for looking into it.
jsh

> Date: Wed, 13 Aug 2014 08:22:22 -0700
> Subject: Re: Blocking Recursive parsing from 
> kafka.consumer.TopicCount$.constructTopicCount
> From: jun...@gmail.com
> To: users@kafka.apache.org
> 
> Are you using Scala JSON in your consumer application?
> 
> Yes, we probably need to switch off Scala JSON since it's being deprecated.
> Could you file a jira and put the link there?
> 
> Thanks,
> 
> Jun
> 
> 
> On Tue, Aug 12, 2014 at 11:14 PM, Jagbir Hooda  wrote:
> 
> > > Date: Tue, 12 Aug 2014 16:35:35 -0700
> > > Subject: Re: Blocking Recursive parsing from
> > kafka.consumer.TopicCount$.constructTopicCount
> > > From: wangg...@gmail.com
> > > To: users@kafka.apache.org
> > >
> > > Hi Jagbir,
> > >
> > > The thread dump you uploaded is not readable, could you re-parse it and
> > > upload again?
> > >
> > > Guozhang
> > Hi Guozhang,
> > I'm sorry that the email got garbled up. Below is another attempt.
> > The first dump is for a recursive blocking thread holding the lock for
> > 0xd3a7e1d0 and the subsequent dump is for a waiting thread.
> > (Please grep for 0xd3a7e1d0 to see the locked object.)
> > -8<-
> > "Sa863f22b1e5hjh6788991800900b34545c_profile-a-prod1-s-140789080845312-c397945e8_watcher_executor"
> > prio=10 tid=0x7f24dc285800 nid=0xda9 runnable [0x7f249e40b000]
> >    java.lang.Thread.State: RUNNABLE
> > at scala.util.parsing.combinator.Parsers$$anonfun$rep1$1.p$7(Parsers.scala:722)
> > at scala.util.parsing.combinator.Parsers$$anonfun$rep1$1.continue$1(Parsers.scala:726)
> > at scala.util.parsing.combinator.Parsers$$anonfun$rep1$1.apply(Parsers.scala:737)
> > at scala.util.parsing.combinator.Parsers$$anonfun$rep1$1.apply(Parsers.scala:721)
> > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> > at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> > at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> > at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> > at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> > at scala.util.parsing.combinator.Parsers$Success.flatMapWithNext(Parsers.scala:142)
> > at scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
> > at scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
> > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> > at scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
> > at scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
> > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> > at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> > at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> > at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> > at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> > at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> > at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> > at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> > at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> > at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> > at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> > at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> > at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> > at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> > at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> > at scala.util.parsing.combinator.Parsers$Parser

Re: Strange topic-corruption issue?

2014-08-13 Thread Steve Miller
   Sure.  I ran:

/opt/kafka/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files 
.log --deep-iteration

and got (in addition to the same non-secutive offsets error):

[ ... ]

offset: 1320 position: 344293 isvalid: true payloadsize: 208 magic: 0 
compresscodec: NoCompressionCodec crc: 1038804751
offset: 1321 position: 344527 isvalid: true payloadsize: 194 magic: 0 
compresscodec: NoCompressionCodec crc: 1211626571
offset: 1322 position: 344747 isvalid: true payloadsize: 195 magic: 0 
compresscodec: NoCompressionCodec crc: 228214666
offset: 1323 position: 344968 isvalid: true payloadsize: 285 magic: 0 
compresscodec: NoCompressionCodec crc: 2412118642
offset: 1324 position: 345279 isvalid: true payloadsize: 267 magic: 0 
compresscodec: NoCompressionCodec crc: 814469229
offset: 1325 position: 345572 isvalid: true payloadsize: 267 magic: 0 
compresscodec: NoCompressionCodec crc: 874964779
offset: 1326 position: 345865 isvalid: true payloadsize: 143 magic: 0 
compresscodec: NoCompressionCodec crc: 144834
offset: 1327 position: 346034 isvalid: true payloadsize: 161 magic: 0 
compresscodec: NoCompressionCodec crc: 3486482767
offset: 1327 position: 346221 isvalid: true payloadsize: 194 magic: 0 
compresscodec: NoCompressionCodec crc: 3322604516
offset: 1328 position: 346441 isvalid: true payloadsize: 207 magic: 0 
compresscodec: NoCompressionCodec crc: 3181460980
offset: 1329 position: 346674 isvalid: true payloadsize: 164 magic: 0 
compresscodec: NoCompressionCodec crc: 77979807
offset: 1330 position: 346864 isvalid: true payloadsize: 208 magic: 0 
compresscodec: NoCompressionCodec crc: 3051442612
offset: 1331 position: 347098 isvalid: true payloadsize: 196 magic: 0 
compresscodec: NoCompressionCodec crc: 1906163219
offset: 1332 position: 347320 isvalid: true payloadsize: 196 magic: 0 
compresscodec: NoCompressionCodec crc: 3849763639
offset: 1333 position: 347542 isvalid: true payloadsize: 207 magic: 0 
compresscodec: NoCompressionCodec crc: 3724257965
offset: 1334 position: 347775 isvalid: true payloadsize: 194 magic: 0 
compresscodec: NoCompressionCodec crc: 510173020
offset: 1335 position: 347995 isvalid: true payloadsize: 357 magic: 0 
compresscodec: NoCompressionCodec crc: 2043065154
offset: 1336 position: 348378 isvalid: true payloadsize: 195 magic: 0 
compresscodec: NoCompressionCodec crc: 435251578
offset: 1337 position: 348599 isvalid: true payloadsize: 169 magic: 0 
compresscodec: NoCompressionCodec crc: 1172187172
offset: 1338 position: 348794 isvalid: true payloadsize: 312 magic: 0 
compresscodec: NoCompressionCodec crc: 1324582122
offset: 1339 position: 349132 isvalid: true payloadsize: 196 magic: 0 
compresscodec: NoCompressionCodec crc: 3649742340
offset: 1340 position: 349354 isvalid: true payloadsize: 288 magic: 0 
compresscodec: NoCompressionCodec crc: 581177172

(etc.)

I also ran:

 /opt/kafka/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files 
.index --deep-iteration

At first, I got the following:

Dumping .index
offset: 16 position: 4342
offset: 32 position: 8555
offset: 48 position: 12676
offset: 63 position: 16824
offset: 79 position: 21256
offset: 96 position: 25599
offset: 112 position: 29740
offset: 126 position: 33981
offset: 143 position: 38122
offset: 160 position: 42364
offset: 176 position: 46589
offset: 192 position: 50755
offset: 208 position: 54969
offset: 223 position: 59207
offset: 239 position: 63317
offset: 255 position: 67547
offset: 272 position: 71771
offset: 289 position: 76012
offset: 306 position: 80476
offset: 323 position: 84602
offset: 337 position: 88876
offset: 354 position: 93153
offset: 371 position: 97329
offset: 387 position: 101496
offset: 403 position: 105657
offset: 419 position: 109848
offset: 434 position: 113950
offset: 451 position: 118223
offset: 465 position: 122366
offset: 482 position: 126463
offset: 499 position: 130707
offset: 517 position: 135044
offset: 533 position: 139505
offset: 549 position: 143637
offset: 566 position: 147916
offset: 582 position: 152223
offset: 599 position: 156528
offset: 613 position: 160694
offset: 629 position: 164807
offset: 644 position: 169020
offset: 662 position: 173449
offset: 679 position: 177721
offset: 695 position: 182003
offset: 711 position: 186374
offset: 728 position: 190644
offset: 746 position: 195036
offset: 762 position: 199231
offset: 778 position: 203581
offset: 794 position: 208024
offset: 810 position: 212192
offset: 825 position: 216446
offset: 841 position: 220564
offset: 858 position: 224718
offset: 875 position: 228823
offset: 890 position: 232983
offset: 907 position: 237116
offset: 920 position: 241229
offset: 936 position: 245504
offset: 951 position: 249601
offset: 969 position: 253908
offset: 986 position: 258074
offset: 1002 position: 262228
offset: 1018 position: 266385
offset: 1035 position: 270699
offset: 1051 position: 274843
offset: 1067 position: 278954
offset: 1085 position: 283283
o

Re: Using the kafka dissector in wireshark/tshark 1.12

2014-08-13 Thread Neha Narkhede
Thanks for sharing this, Steve!


On Tue, Aug 12, 2014 at 11:03 AM, Steve Miller 
wrote:

>I'd seen references to there being a Kafka protocol dissector built
> into wireshark/tshark 1.12, but what I could find on that was a bit light
> on the specifics as to how to get it to do anything -- at least for someone
> (like me) who might use tcpdump a lot but who doesn't use tshark a lot.
>
>I got this working, so I figured I'd post a few pointers here on the
> off-chance that they save someone else a bit of time.
>
>Note that I'm using tshark, not wireshark; this might be easier and/or
> different in wireshark, but I don't feel like moving many gigabytes of data
> to a place where I can use wireshark. (-:
>
>If you're reading traffic live, you'll want to do something like this:
>
> tshark -V -i eth1 -o 'kafka.tcp.port:9092' -d tcp.port=9092,kafka
> -f 'dst port 9092' -Y (kafka options)
>
>For example, if you want to see output only for ProduceRequest and
> ProduceResponses, and only for the topic "mytopic", you can do:
>
> tshark -V -i eth1 -o 'kafka.tcp.port:9092' -d tcp.port=9092,kafka
> -f 'dst port 9092' -Y 'kafka.topic_name==mytopic && kafka.request_key==0'
>
>You can get a complete list of Kafka-related fields by doing:
>
> tshark -G fields | grep -i kafka
>
>There is a very significant downside to processing packets live: tshark
> uses dumpcap to generate the actual packets, and unless I'm missing some
> obscure tshark option (which is possible!) it won't toss old data.  So if
> you run this for a few hours, you'll end up with a ginormous file.
>
>By default (under Linux, at least) tshark is going to put that file in
> /tmp, so if your /tmp is small and/or a tmpfs that can make things a little
> exciting.  You can get around that by doing:
>
> (export TMPDIR=/big/damn/filesystem ; tshark bla bla bla)
>
> which I figure given typical Kafka data volumes is probably pretty
> important to know, and which doesn't seem to be documented in the tshark
> man pages.  It is at least not all that hard to search for.
>
>In theory, you can use the tshark "-b" option to specify a ring buffer
> of files, even for real-time processing, though:
>
> * adding -b anything (e.g., "-b files:1 -b filesize:1024") seems
> to want to force you to use -w (filename)
>
> * just adding -b and -w to the invocation above gets a warning
> about display filters not being supported when capturing and saving packets
>
> * changing -Y to -2 -R and/or adding -P doesn't seem to help
>
> (though again someone with more tshark experience might know the magic
> combination of arguments to get this to do what it's told).
>
>So instead, you can capture packets somewhere, e.g.:
>
> tcpdump -n -s 0 -w /var/tmp/kafka.tcpd -i eth1 'port 9092'
>
> and then decode them later:
>
> tshark -V -r /var/tmp/kafka.tcpd -o 'kafka.tcp.port:9092' -d
> tcp.port=9092,kafka -R 'kafka.topic_name==mytopic && kafka.request_key==0'
> -2
>
>Anyway, if you're seeing protocol-related weirdness, hopefully this
> will be at least of some help to you.
>
> -Steve
> (Yes, the email address is a joke.  Just not on you!  It does
> work.)
>


Re: Correct way to handle ConsumerTimeoutException

2014-08-13 Thread Neha Narkhede
I am using consumer.timeout.ms to force a consumer to jump out of the hasNext
call, which will throw ConsumerTimeoutException.

Yes, this is the downside of the blocking iterator approach. If you want to
pull data in batches and process messages, the iterator is not the best API
as it can block at any time longer than your app is comfortable with. To
fix this and a bunch of other problems with the consumer API, we are
working on a new consumer client, that will replace the existing one and
will support completely new APIs.

Take a look at the new APIs here.
Let us know if you have feedback.
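
(For reference, a minimal sketch of the shutdown-and-recreate pattern being
discussed, using the 0.8 high-level Java consumer. The class name
DrainingConsumer and the process() helper are made up for illustration; the
kafka.consumer / kafka.javaapi.consumer calls are the standard 0.8 API, and
consumer.timeout.ms is assumed to be set in the supplied properties.)

    import java.util.*;
    import kafka.consumer.*;
    import kafka.javaapi.consumer.ConsumerConnector;
    import kafka.message.MessageAndMetadata;

    public class DrainingConsumer {
        private final Properties consumerProperties;
        private ConsumerConnector consumer;

        public DrainingConsumer(Properties props) {
            this.consumerProperties = props;
            this.consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        }

        public void drainTopic(String topic) {
            KafkaStream<byte[], byte[]> stream = consumer
                .createMessageStreams(Collections.singletonMap(topic, 1))
                .get(topic).get(0);
            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            try {
                while (it.hasNext()) {                  // blocks for up to consumer.timeout.ms
                    MessageAndMetadata<byte[], byte[]> msg = it.next();
                    process(msg.message());
                }
            } catch (ConsumerTimeoutException e) {
                // Nothing arrived within consumer.timeout.ms: treat the topic as
                // drained, close this connector, and recreate it for the next pass.
                consumer.shutdown();
                consumer = Consumer.createJavaConsumerConnector(
                    new ConsumerConfig(consumerProperties));
            }
        }

        private void process(byte[] payload) { /* application logic */ }
    }

If the timeout is only a nuisance rather than an end-of-data signal, the
simpler route Guozhang describes below is to raise consumer.timeout.ms (the
default of -1 disables the timeout entirely) and skip the recreate step.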


On Tue, Aug 12, 2014 at 4:37 PM, Guozhang Wang  wrote:

> Hi Chen,
>
> The rationale of using the consumer timeout exception is to indicate when
> there is no more data to be consumed, and hence upon capturing the
> exception the consumer should be closed.
>
> If you want to restart the consumer in handling the timeout exception, then
> you should probably just increase the timeout value in the configs to
> avoid it throwing timeout exception.
>
> Guozhang
>
>
> On Tue, Aug 12, 2014 at 2:27 PM, Chen Wang 
> wrote:
>
> > Folks,
> > I am using consumer.timeout.ms to force a consumer to jump out of the hasNext
> call,
> > which will throw ConsumerTimeoutException. It seems that upon receiving
> > this exception, the consumer is no longer usable and I need to call
> > .shutdown, and recreate:
> >
> > try{
> > } catch (ConsumerTimeoutException ex) {
> >
> >  logger.info("consumer timeout, we consider the topic is drained");
> >
> >  this.consumer.shutdown();
> >
> > this.consumer = kafka.consumer.Consumer
> >
> >   .createJavaConsumerConnector(new ConsumerConfig(
> >
> >this.consumerProperties));
> >
> >  }
> >
> >
> > Is this the expected behavior? I call
> >
> > this.consumer = kafka.consumer.Consumer
> >
> >   .createJavaConsumerConnector(new ConsumerConfig(
> >
> >this.consumerProperties));
> >
> > in the thread initialization phase, and hope to reuse it upon
> > ConsumerTimeoutException
> >
> > Thanks,
> >
> > Chen
> >
>
>
>
> --
> -- Guozhang
>


Re: Most common kafka client comsumer implementations?

2014-08-13 Thread Neha Narkhede
Option 1 would take a throughput hit as you are trying to commit one message
at a time. Option 2 is pretty widely used at LinkedIn, and I'm pretty sure at
several other places as well. Option 3 is essentially what the high level
consumer does under the covers already: it prefetches data in batches from
the server to provide high throughput.
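
(To make this concrete, a minimal sketch of the batch-and-commit pattern
Anand describes below, using the 0.8 high-level Java consumer.
BatchingConsumer and process() are made-up names; it assumes
auto.commit.enable=false and a consumer.timeout.ms setting so that a quiet
topic does not block the loop forever. On failure it can reprocess up to one
batch per partition, which is the at-least-once trade-off discussed in this
thread.)

    import java.util.*;
    import kafka.consumer.*;
    import kafka.javaapi.consumer.ConsumerConnector;

    public class BatchingConsumer {
        public static void run(ConsumerConnector consumer, String topic, int batchSize) {
            KafkaStream<byte[], byte[]> stream = consumer
                .createMessageStreams(Collections.singletonMap(topic, 1))
                .get(topic).get(0);
            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            List<byte[]> batch = new ArrayList<byte[]>(batchSize);
            while (true) {
                batch.clear();
                try {
                    while (batch.size() < batchSize && it.hasNext()) {
                        batch.add(it.next().message());
                    }
                } catch (ConsumerTimeoutException e) {
                    // Quiet topic: fall through and commit whatever was collected.
                }
                if (!batch.isEmpty()) {
                    process(batch);            // application logic
                    consumer.commitOffsets();  // commit once per batch, not per message
                }
            }
        }

        private static void process(List<byte[]> batch) { /* ... */ }
    }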


On Wed, Aug 13, 2014 at 2:20 AM, Anand Nalya  wrote:

> Hi Jim,
>
> In one of the applications, we implemented option #1:
>
> messageList = getNext(1000)
> process(messageList)
> commit()
>
> In case of failure, this resulted in duplicate processing for at most 1000
> records per partition.
>
> Regards,
> Anand
>
>
> On 1 August 2014 20:35, Jim  wrote:
>
> > Thanks Guozhang,
> >
> > I was looking for actual real world workflows. I realize you can commit
> > after each message but if you’re using ZK for offsets for instance you’ll
> > put too much write load on the nodes and crush your throughput. So I was
> > interested in batching strategies people have used that balance high/full
> > throughput and fully committed events.
> >
> >
> > On Thu, Jul 31, 2014 at 8:16 AM, Guozhang Wang 
> wrote:
> >
> > > Hi Jim,
> > >
> > > Whether to use high level or simple consumer depends on your use case.
> If
> > > you need to manually manage partition assignments among your consumers,
> > or
> > > you need to commit your offsets elsewhere than ZK, or you do not want
> > auto
> > > rebalancing of consumers upon failures etc, you will use simple
> > consumers;
> > > otherwise you use high level consumer.
> > >
> > > From your description of pulling a batch of messages it seems you are
> > > currently using the simple consumer. Suppose you are using the high
> level
> > > consumer, to achieve at-least-once basically you can do something like:
> > >
> > > message = consumer.iter.next()
> > > process(message)
> > > consumer.commit()
> > >
> > > which is effectively the same as option 2 for using a simple consumer.
> Of
> > > course, doing so has a heavy overhead of one-commit-per-message, you
> can
> > > also do option 1, at the cost of duplicates, which is tolerable for
> > > at-least-once.
> > >
> > > Guozhang
> > >
> > >
> > > On Wed, Jul 30, 2014 at 8:25 PM, Jim  wrote:
> > >
> > > > Curious on a couple questions...
> > > >
> > > > Are most people(are you?) using the simple consumer vs the high level
> > > > consumer in production?
> > > >
> > > >
> > > > What is the common processing paradigm for maintaining a full
> pipeline
> > > for
> > > > kafka consumers for at-least-once messaging? E.g. you pull a batch of
> > > 1000
> > > > messages and:
> > > >
> > > > option 1.
> > > > you wait for the slowest worker to finish working on that message,
> when
> > > you
> > > > get back 1000 acks internally you commit your offset and pull another
> > > batch
> > > >
> > > > option 2.
> > > > you feed your workers n msgs at a time in sequence and move your
> offset
> > > up
> > > > as you work through your batch
> > > >
> > > > option 3.
> > > > you maintain a full stream of 1000 messages ideally and as you get
> acks
> > > > back from your workers you see if you can move your offset up in the
> > > stream
> > > > to pull n more messages to fill up your pipeline so you're not
> blocked
> > by
> > > > the slowest consumer (probability wise)
> > > >
> > > >
> > > > any good docs or articles on the subject would be great, thanks!
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>


Re: Consuming messages from Kafka and pushing on to a JMS queue

2014-08-13 Thread Neha Narkhede
The power of the consumption APIs and the general performance offered by
Kafka are only useful if you can send the data to Kafka and use the consumer
APIs. Apache Storm will not solve the problem if you are trying to avoid
using the Kafka consumer APIs. I would rethink the architecture you
currently have and see if the JMS queues can be replaced by Kafka
consumers. This would require a plugin that could pull data out of the
source JMS queue, if there isn't a way to replace that as well.
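
(A bare-bones sketch of such an intermediary consumer, for illustration only.
KafkaToJmsBridge is a made-up name; the javax.jms calls are the standard JMS
1.1 API, the ConnectionFactory would come from whatever JMS provider backs
the downstream queues, and error handling, reconnection, and offset
management are all omitted.)

    import java.util.Collections;
    import javax.jms.*;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;

    public class KafkaToJmsBridge {
        public static void forward(ConsumerConnector consumer, String topic,
                                   ConnectionFactory factory, String queueName)
                throws JMSException {
            KafkaStream<byte[], byte[]> stream = consumer
                .createMessageStreams(Collections.singletonMap(topic, 1))
                .get(topic).get(0);

            Connection connection = factory.createConnection();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer producer = session.createProducer(session.createQueue(queueName));
            connection.start();

            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            while (it.hasNext()) {                      // blocks until a Kafka message arrives
                BytesMessage jmsMessage = session.createBytesMessage();
                jmsMessage.writeBytes(it.next().message());
                producer.send(jmsMessage);              // "push" to the JMS end point
            }
        }
    }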


On Wed, Aug 13, 2014 at 6:06 AM, Andrew Longwill <
andrew.longw...@openbet.com> wrote:

> Hi,
>
> We have an application that currently uses Camel to forward a JMS message
> from HornetQ to multiple consumers (JMS queues).
>
> The one challenge we have is handling failure of one of the consumers.
> Kafka seems to provide a solution here by allowing different consumers to
> keep track of their own offset.
>
> However we'd prefer to ultimately "push" the messages to each end point and
> not use Kafka's "pull" model as it requires implementing Kafka's
> Consumer API. One approach could be to write intermediary consumers which
> forward each message onto the appropriate JMS queue. Is this a good
> approach or is there a better way to do this (e.g. use Apache Storm)?
>
> Thanks
>
> Andrew
>


Re: Blocking Recursive parsing from kafka.consumer.TopicCount$.constructTopicCount

2014-08-13 Thread Jun Rao
Are you using Scala JSON in your consumer application?

Yes, we probably need to switch off Scala JSON since it's being deprecated.
Could you file a jira and put the link there?

Thanks,

Jun


On Tue, Aug 12, 2014 at 11:14 PM, Jagbir Hooda  wrote:

> > Date: Tue, 12 Aug 2014 16:35:35 -0700
> > Subject: Re: Blocking Recursive parsing from
> kafka.consumer.TopicCount$.constructTopicCount
> > From: wangg...@gmail.com
> > To: users@kafka.apache.org
> >
> > Hi Jagbir,
> >
> > The thread dump you uploaded is not readable, could you re-parse it and
> > upload again?
> >
> > Guozhang
> Hi Guozhang,
> I'm sorry that the email got garbled up. Below is another attempt.
> The first dump is for a recursive blocking thread holding the lock for
> 0xd3a7e1d0 and the subsequent dump is for a waiting thread.
> (Please grep for 0xd3a7e1d0 to see the locked object.)
> -8<-
> "Sa863f22b1e5hjh6788991800900b34545c_profile-a-prod1-s-140789080845312-c397945e8_watcher_executor"
> prio=10 tid=0x7f24dc285800 nid=0xda9 runnable [0x7f249e40b000]
>    java.lang.Thread.State: RUNNABLE
> at scala.util.parsing.combinator.Parsers$$anonfun$rep1$1.p$7(Parsers.scala:722)
> at scala.util.parsing.combinator.Parsers$$anonfun$rep1$1.continue$1(Parsers.scala:726)
> at scala.util.parsing.combinator.Parsers$$anonfun$rep1$1.apply(Parsers.scala:737)
> at scala.util.parsing.combinator.Parsers$$anonfun$rep1$1.apply(Parsers.scala:721)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at scala.util.parsing.combinator.Parsers$Success.flatMapWithNext(Parsers.scala:142)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:25

Re: Strange topic-corruption issue?

2014-08-13 Thread Jun Rao
Interesting, could you run DumpLogSegments with and w/o deep-iteration and
send the output around offset 1327?

Thanks,

Jun


On Tue, Aug 12, 2014 at 5:42 PM, Steve Miller 
wrote:

> [ "Aha!", you say, "now I know why this guy's been doing so much tshark
> stuff!" (-: ]
>
>Hi.  I'm running into a strange situation, in which more or less all of
> the topics on our Kafka server behave exactly as expected... but the data
> produced by one family of applications is producing fairly frequent topic
> corruption.
>
>When this happens, on the client side, the results are all over the
> place: sometimes you get a ConsumerFetchSizeTooSmall exception, or an
> exception for an unknown error type, or an invalid-offset error, it's all
> over the map.
>
>On the server side, I think something like this is the first sign of
> badness:
>
> [2014-08-11 21:03:28,121] ERROR [KafkaApi-1] Error processing
> ProducerRequest with correlation id 6750 from client test-producer on
> partition [mytopic,9] (kafka.server.KafkaApis)
> java.lang.ArrayIndexOutOfBoundsException
> [2014-08-11 21:03:28,121] INFO [KafkaApi-1] Send the close connection
> response due to error handling produce request [clientId = test-producer,
> correlationId = 6750, topicAndPartition = [mytopic,9]] with Ack=0
> (kafka.server.KafkaApis)
>
> shortly thereafter, you begin to see oddness facing the clients:
>
> [2014-08-11 21:17:58,132] ERROR [KafkaApi-1] Error when processing fetch
> request for partition [mytopic,9] offset 1327 from consumer with
> correlation id 87204 (kafka.server.KafkaApis)
> java.lang.IllegalStateException: Invalid message size: 0
> at kafka.log.FileMessageSet.searchFor(FileMessageSet.scala:127)
> at kafka.log.LogSegment.translateOffset(LogSegment.scala:100)
> at kafka.log.LogSegment.read(LogSegment.scala:137)
> at kafka.log.Log.read(Log.scala:386)
> at
> kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSet(KafkaApis.scala:530)
> at
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:476)
> at
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:471)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at
> scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
> at
> scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
> at
> scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> at scala.collection.AbstractTraversable.map(Traversable.scala:105)
> at
> kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSets(KafkaApis.scala:471)
> at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:437)
> at kafka.server.KafkaApis.handle(KafkaApis.scala:186)
> at
> kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
> at java.lang.Thread.run(Unknown Source)
>
> If I go run the DumpLogSegments tool on the particular topic and partition
> that's generating the errors, I can see there's corruption in the log:
>
> Non-secutive offsets in
> :/data/d3/kafka/log/mytopic-9/.log
>   1327 is followed by 1327
>
> The only thing producing data to corrupted topics was also the only thing
> where snappy compression was turned on in the Java API being used by the
> producer (it's a Storm topology; we've had the same issue with one in Scala
> and with one that produces very similar data, but that was written in
> Java).  We turned that off, published to a different topic name (so it was
> created fresh), and had a couple of happy days where all was well.  Then we
> decided that all was well so we tried to go back to the original topic --
> after we'd verified that all data had aged out of the logs for that topic.
>  And we started seeing errors again.  So we switched to a different topic
> again, let it be created, and also started seeing errors on that topic.
>
> We have other producers, written in C and Java and python, which are
> working flawlessly, even though the size of the data they produce and the
> rate at which they produce it is much larger than what we're seeing with
> this one problematic producer.  We also have producers written in other
> languages that produce at very low rates, so it's (probably) not the sort
> of thing where the issue is masked by more frequent data production.
>
> But in any case it looks like there's something the client can send that
> will corrupt the topic, which seems like something that shouldn't be able
> to happen.  I know there's at least some error checking for bad protocol
> requests, as I hacked a python client to produce some corrupt messages and
> saw an error response from the server.
>
> I'm happy to supply more data but I'm not sure what would

Consuming messages from Kafka and pushing on to a JMS queue

2014-08-13 Thread Andrew Longwill
Hi,

We have an application that currently uses Camel to forward a JMS message
from HornetQ to multiple consumers (JMS queues).

The one challenge we have is handling failure of one of the consumers.
Kafka seems to provide a solution here by allowing different consumers to
keep track of their own offset.

However we'd prefer to ultimately "push" the messages to each end point and
not use Kafka's "pull" model as it requires implementing Kafka's
Consumer API. One approach could be to write intermediary consumers which
forward each message onto the appropriate JMS queue. Is this a good
approach or is there a better way to do this (e.g. use Apache Storm)?

Thanks

Andrew


Re: Most common kafka client comsumer implementations?

2014-08-13 Thread Anand Nalya
Hi Jim,

In one of the applications, we implemented option #1:

messageList = getNext(1000)
process(messageList)
commit()

In case of failure, this resulted in duplicate processing for at most 1000
records per partition.

Regards,
Anand


On 1 August 2014 20:35, Jim  wrote:

> Thanks Guozhang,
>
> I was looking for actual real world workflows. I realize you can commit
> after each message but if you’re using ZK for offsets for instance you’ll
> put too much write load on the nodes and crush your throughput. So I was
> interested in batching strategies people have used that balance high/full
> throughput and fully committed events.
>
>
> On Thu, Jul 31, 2014 at 8:16 AM, Guozhang Wang  wrote:
>
> > Hi Jim,
> >
> > Whether to use high level or simple consumer depends on your use case. If
> > you need to manually manage partition assignments among your consumers,
> or
> > you need to commit your offsets elsewhere than ZK, or you do not want
> auto
> > rebalancing of consumers upon failures etc, you will use simple
> consumers;
> > otherwise you use high level consumer.
> >
> > From your description of pulling a batch of messages it seems you are
> > currently using the simple consumer. Suppose you are using the high level
> > consumer, to achieve at-least-once basically you can do something like:
> >
> > message = consumer.iter.next()
> > process(message)
> > consumer.commit()
> >
> > which is effectively the same as option 2 for using a simple consumer. Of
> > course, doing so has a heavy overhead of one-commit-per-message, you can
> > also do option 1, at the cost of duplicates, which is tolerable for
> > at-least-once.
> >
> > Guozhang
> >
> >
> > On Wed, Jul 30, 2014 at 8:25 PM, Jim  wrote:
> >
> > > Curious on a couple questions...
> > >
> > > Are most people(are you?) using the simple consumer vs the high level
> > > consumer in production?
> > >
> > >
> > > What is the common processing paradigm for maintaining a full pipeline
> > for
> > > kafka consumers for at-least-once messaging? E.g. you pull a batch of
> > 1000
> > > messages and:
> > >
> > > option 1.
> > > you wait for the slowest worker to finish working on that message, when
> > you
> > > get back 1000 acks internally you commit your offset and pull another
> > batch
> > >
> > > option 2.
> > > you feed your workers n msgs at a time in sequence and move your offset
> > up
> > > as you work through your batch
> > >
> > > option 3.
> > > you maintain a full stream of 1000 messages ideally and as you get acks
> > > back from your workers you see if you can move your offset up in the
> > stream
> > > to pull n more messages to fill up your pipeline so you're not blocked
> by
> > > the slowest consumer (probability wise)
> > >
> > >
> > > any good docs or articles on the subject would be great, thanks!
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>