Re: Does adding ConsumerTimeoutException make the code more robust?

2014-11-29 Thread Rahul Amaram

Yes, I have configured consumer timeout config.

Let me put my query another way: is it possible that it.hasNext() could stay 
blocked even when messages are available? In that case, would using the 
consumer timeout help?


Thanks,
Rahul.

On Saturday 29 November 2014 09:56 PM, Jun Rao wrote:

By default, it.hasNext() blocks when there is no more message to consume.
So catching ConsumerTimeoutException doesn't make any difference. You only
need to handle ConsumerTimeoutException if you have customized the consumer
timeout config.

Thanks,

Jun

On Thu, Nov 27, 2014 at 7:48 AM, Rahul Amaram 
wrote:


Hi,

I am just wondering if the below snippet

ConsumerIterator



Re: how many brokers to set in kafka

2014-11-29 Thread Sa Li
thanks a lot
> On Nov 29, 2014, at 8:29 AM, Jun Rao  wrote:
> 
> Typically, you will just have one broker per server. If you do want to set
> up multiple brokers on the same server, ideally you need to give each
> broker dedicated storage.
> 
> Thanks,
> 
> Jun
> 
> On Thu, Nov 27, 2014 at 11:09 AM, Sa Li  wrote:
> 
>> Hi, all
>> 
>> We have 3 production servers to set up as a Kafka cluster; I wonder how
>> many brokers to configure on each server.
>> 
>> 
>> thanks
>> 
>> 
>> --
>> 
>> Alec Li
>> 



Re: kafka 0.8.1 socket leak? (file descriptor outage)

2014-11-29 Thread Jun Rao
These logs don't indicate new socket connections. You need to look for
debug logs like "Accepted connection from". Are those connections from a
producer or a consumer? Is it possible that a client didn't call close()
properly?
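
If it helps, a rough sketch of how those debug logs could be surfaced on the
broker (this assumes the stock config/log4j.properties layout; restart the
broker after the change):

# config/log4j.properties
log4j.logger.kafka.network=DEBUG

You can also check where the connections come from on the broker host, e.g.
with "netstat -antp | grep <broker port>" (port is a placeholder), to tell
whether they belong to a producer or a consumer.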

Thanks,

Jun

On Thu, Nov 27, 2014 at 7:17 PM, Daniel Liu  wrote:

> Hi,
>
> We are using Kafka 0.8.1 and it has been working well for a long time, but
> an issue came up two days ago:
>
> We observed that Kafka stopped accepting new messages and got the following
> exception, complaining about too many open files.
>
> We counted the sockets used by the Kafka process via "ls -ltr
> /proc/6770/fd | grep "socket" | wc -l" and noticed that the fds had reached
> the limit (4096).
>
> Then we tried to recover Kafka by restarting the Kafka server. We continued
> monitoring the sockets it used and found that 7 new sockets were created
> every 30 seconds.
>
> The trace below confirms that the socket increase happens every 30 seconds:
>
>
> [2014-11-27 21:48:05,268] TRACE Finished writing, registering for read on
> connection /10.158.141.31:45648 (kafka.network.Processor)
> [2014-11-27 21:48:05,269] TRACE Finished writing, registering for read on
> connection /10.158.141.31:45647 (kafka.network.Processor)
> [2014-11-27 21:48:05,271] TRACE Finished writing, registering for read on
> connection /10.158.141.31:45650 (kafka.network.Processor)
> [2014-11-27 21:48:05,272] TRACE Finished writing, registering for read on
> connection /10.158.141.31:45649 (kafka.network.Processor)
> [2014-11-27 21:48:05,279] TRACE Finished writing, registering for read on
> connection /10.158.141.31:45652 (kafka.network.Processor)
> [2014-11-27 21:48:05,281] TRACE Finished writing, registering for read on
> connection /10.158.141.31:45651 (kafka.network.Processor)
> [2014-11-27 21:48:05,281] TRACE Finished writing, registering for read on
> connection /10.158.141.31:45653 (kafka.network.Processor)
> [2014-11-27 21:48:35,305] TRACE Finished writing, registering for read on
> connection /10.158.141.31:45654 (kafka.network.Processor)
> [2014-11-27 21:48:35,305] TRACE Finished writing, registering for read on
> connection /10.158.141.31:45656 (kafka.network.Processor)
> [2014-11-27 21:48:35,307] TRACE Finished writing, registering for read on
> connection /10.158.141.31:45655 (kafka.network.Processor)
> [2014-11-27 21:48:35,310] TRACE Finished writing, registering for read on
> connection /10.158.141.31:45657 (kafka.network.Processor)
> [2014-11-27 21:48:35,311] TRACE Finished writing, registering for read on
> connection /10.158.141.31:45659 (kafka.network.Processor)
> [2014-11-27 21:48:35,314] TRACE Finished writing, registering for read on
> connection /10.158.141.31:45660 (kafka.network.Processor)
> [2014-11-27 21:48:35,314] TRACE Finished writing, registering for read on
> connection /10.158.141.31:45658 (kafka.network.Processor)
> [2014-11-27 21:49:05,309] TRACE Finished writing, registering for read on
> connection /10.158.141.31:45662 (kafka.network.Processor)
> [2014-11-27 21:49:05,310] TRACE Finished writing, registering for read on
> connection /10.158.141.31:45661 (kafka.network.Processor)
> [2014-11-27 21:49:05,321] TRACE Finished writing, registering for read on
> connection /10.158.141.31:45663 (kafka.network.Processor)
> [2014-11-27 21:49:05,323] TRACE Finished writing, registering for read on
> connection /10.158.141.31:45664 (kafka.network.Processor)
> [2014-11-27 21:49:05,324] TRACE Finished writing, registering for read on
> connection /10.158.141.31:45665 (kafka.network.Processor)
> [2014-11-27 21:49:05,326] TRACE Finished writing, registering for read on
> connection /10.158.141.31:45666 (kafka.network.Processor)
> [2014-11-27 21:49:05,326] TRACE Finished writing, registering for read on
> connection /10.158.141.31:45667 (kafka.network.Processor)
>
> Is this a known issue?
>
> Attached the full log.
>
> --
> Regards,
> Daniel Liu
>


Re: kafka web console running error

2014-11-29 Thread Jun Rao
You will need to contact the owner of that project directly.

Thanks,

Jun

On Thu, Nov 27, 2014 at 11:40 AM, Sa Li  wrote:

> I am using https://github.com/claudemamo/kafka-web-console, and would you
> mind telling me where to make such a modification?
>
> thanks
>
> Alec
>
> On Mon, Nov 24, 2014 at 11:16 PM, Yang Fang 
> wrote:
>
> > Do you see the error msg "Too many open files"? It suggests you should
> > raise the nofile limit.
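> >
> > For example (just a rough sketch; the user name and limit value are
> > placeholders and the exact file depends on your distro), the per-process
> > open-file limit can be raised in /etc/security/limits.conf:
> >
> > kafka  soft  nofile  65536
> > kafka  hard  nofile  65536
> >
> > or temporarily for the current shell with "ulimit -n 65536" before
> > starting the console.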
> >
> > On Tue, Nov 25, 2014 at 1:26 PM, Jun Rao  wrote:
> >
> > > Which web console are you using?
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Fri, Nov 21, 2014 at 8:34 AM, Sa Li  wrote:
> > >
> > > > Hi, all
> > > >
> > > > I am trying to get the kafka web console to work, but it seems to work
> > > > for only a few hours and then fail; below are the error messages on the
> > > > screen. I am assuming something is wrong with the DB. I swapped H2 for
> > > > MySQL, but that didn't help. Has anyone had a similar problem?
> > > >
> > > >
> > > > -
> > > > .
> > > > .
> > > >
> > > >
> > > > at sun.misc.Resource.getByteBuffer(Resource.java:160) ~[na:1.7.0_65]
> > > > at java.net.URLClassLoader.defineClass(URLClassLoader.java:436) ~[na:1.7.0_65]
> > > >
> > > > at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
> > > > at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
> > > > at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
> > > > at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
> > > > at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
> > > > at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
> > > > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:42)
> > > > at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
> > > > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> > > > at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> > > > at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> > > > at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> > > > [ERROR] Failed to construct terminal; falling back to unsupported
> > > > java.io.IOException: Cannot run program "sh": error=24, Too many open files
> > > > at java.lang.ProcessBuilder.start(ProcessBuilder.java:1047)
> > > > at java.lang.Runtime.exec(Runtime.java:617)
> > > > at java.lang.Runtime.exec(Runtime.java:485)
> > > > at jline.internal.TerminalLineSettings.exec(TerminalLineSettings.java:183)
> > > > at jline.internal.TerminalLineSettings.exec(TerminalLineSettings.java:173)
> > > > at jline.internal.TerminalLineSettings.stty(TerminalLineSettings.java:168)
> > > > at jline.internal.TerminalLineSettings.get(TerminalLineSettings.java:72)
> > > > at jline.internal.TerminalLineSettings.<init>(TerminalLineSettings.java:52)
> > > > at jline.UnixTerminal.<init>(UnixTerminal.java:31)
> > > > at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> > > > at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
> > > > at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> > > > at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
> > > > at java.lang.Class.newInstance(Class.java:379)
> > > > [error] a.a.ActorSystemImpl - Uncaught error from thread [play-akka.actor.default-dispatcher-944] shutting down JVM since 'akka.jvm-exit-on-fatal-error'
> > > > java.lang.NoClassDefFoundError: common/Util$$anonfun$getPartitionsLogSize$3$$anonfun$apply$19$$anonfun$apply$1$$anonfun$applyOrElse$1
> > > > at common.Util$$anonfun$getPartitionsLogSize$3$$anonfun$apply$19$$anonfun$apply$1.applyOrElse(Util.scala:75) ~[na:na]
> > > > at common.Util$$anonfun$getPartitionsLogSize$3$$anonfun$apply$19$$anonfun$apply$1.applyOrElse(Util.scala:74) ~[na:na]
> > > > at scala.runtime.AbstractPartialFunction$mcJL$sp.apply$mcJL$sp(AbstractPartialFunction.scala:33) ~[scala-library.jar:na]
> > > > at
> > > >
> > > 

Re: how many brokers to set in kafka

2014-11-29 Thread Jun Rao
Typically, you will just have one broker per server. If you do want to set
up multiple brokers on the same server, ideally you need to give each
broker dedicated storage.
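
For example, if you did run two brokers on one server, the two
server.properties files might look roughly like this (the broker ids, ports,
and paths below are placeholders; point each log.dirs at its own disk, and
both brokers share the same zookeeper.connect):

# config/server-1.properties
broker.id=1
port=9092
log.dirs=/data/disk1/kafka-logs

# config/server-2.properties
broker.id=2
port=9093
log.dirs=/data/disk2/kafka-logs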

Thanks,

Jun

On Thu, Nov 27, 2014 at 11:09 AM, Sa Li  wrote:

> Hi, all
>
> We have 3 production servers to set up as a Kafka cluster; I wonder how
> many brokers to configure on each server.
>
>
> thanks
>
>
> --
>
> Alec Li
>


Re: Does adding ConsumerTimeoutException make the code more robust?

2014-11-29 Thread Jun Rao
By default, it.hasNext() blocks when there is no more message to consume.
So catching ConsumerTimeoutException doesn't make any difference. You only
need to handle ConsumerTimeoutException if you have customized the consumer
timeout config.
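
For reference, a minimal sketch of that customized-timeout pattern (this is
only an illustration against the 0.8 high-level consumer API; the ZooKeeper
address, group id, topic name, and timeout value below are placeholders):

import java.util.Collections;
import java.util.List;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.ConsumerTimeoutException;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class TimeoutConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");
        props.put("group.id", "my-group");
        // Customized timeout: hasNext() now throws ConsumerTimeoutException
        // if no message arrives within 5 seconds (the default, -1, blocks forever).
        props.put("consumer.timeout.ms", "5000");

        ConsumerConnector connector =
            Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        List<KafkaStream<byte[], byte[]>> streams =
            connector.createMessageStreams(Collections.singletonMap("my-topic", 1))
                     .get("my-topic");
        ConsumerIterator<byte[], byte[]> it = streams.get(0).iterator();

        while (true) {
            try {
                while (it.hasNext()) {
                    MessageAndMetadata<byte[], byte[]> msg = it.next();
                    // process msg.message() here
                }
            } catch (ConsumerTimeoutException e) {
                // No message arrived within consumer.timeout.ms; loop and keep waiting.
            }
        }
    }
}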

Thanks,

Jun

On Thu, Nov 27, 2014 at 7:48 AM, Rahul Amaram 
wrote:

>
> Hi,
>
> I am just wondering if the below snippet
>
> ConsumerIterator it = ...;
> while (true) {
>     try {
>         while (it.hasNext()) {
>             ...
>         }
>     } catch (ConsumerTimeoutException e) {
>         // do nothing
>     }
> }
>
> would be more robust than
>
> while (it.hasNext()) {
>     ...
> }
>
> i.e., does setting a consumer timeout, catching the exception, and then just
> waiting again for the next message make it more robust?
>
> Regards,
> Rahul.
>


Re: Kafka 0.8.2 log cleaner

2014-11-29 Thread Jun Rao
Yes, log cleaner is in 0.8.2. You just need to set the retention policy of
a topic to "compact".
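
For example, something along these lines (a sketch; the topic name and
ZooKeeper address are placeholders, and on the broker side the log cleaner
thread also needs to be enabled via log.cleaner.enable if it is not already):

bin/kafka-topics.sh --zookeeper localhost:2181 --alter \
  --topic my-topic --config cleanup.policy=compact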

Thanks,

Jun

On Thu, Nov 27, 2014 at 5:20 AM, Khandygo, Evgeny (EXT) <
evgeny.khandygo@siemens.com> wrote:

> I’m wondering if you could tell me whether the log cleaner is implemented in
> 0.8.2, because it seems like it isn’t.
>
> Thanks
> John
>
>


Re: isr never update

2014-11-29 Thread Jun Rao
Could you check the state-change log of the follower replica and see if it
received the corresponding LeaderAndIsr request? If so, could you check the
max lag jmx (http://kafka.apache.org/documentation.html) in the follower
replica to see what the lag is?
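
For the first check, something like the following on the follower broker may
help (a sketch; the log path is a placeholder and assumes the stock log4j
setup that writes a state-change.log, and the partition is the one from your
example below):

grep "LeaderAndIsr" /path/to/kafka/logs/state-change.log | grep "org.nginx,32"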

Thanks,

Jun

On Thu, Nov 27, 2014 at 4:03 AM, Shangan Chen 
wrote:

> my kafka version is kafka_2.10-0.8.1.1.jar
>
> *state-change log:*
>
> [2014-11-25 02:30:19,290] TRACE Controller 29 epoch 7 sending
> UpdateMetadata request
> (Leader:29,ISR:29,24,LeaderEpoch:10,ControllerEpoch:4) with correlationId
> 1803 to broker 20 for partition [org.nginx,32] (state.change.logger)
>
> *controller log:*
>
> [2014-11-22 09:17:02,327]  [org.nginx,32] ->
> (Leader:29,ISR:29,24,LeaderEpoch:10,ControllerEpoch:4)
>
> *partition state in zookeeper:*
>
> [zk: localhost:2181(CONNECTED) 4] get
> /kafka08/brokers/topics/org.nginx/partitions/32/state
> {"controller_epoch":6,"leader":29,"version":1,"leader_epoch":11,"isr":[29]}
> cZxid = 0x5641824ee
> ctime = Fri Oct 10 12:53:47 CST 2014
> mZxid = 0x5a4c870b8
> mtime = Sat Nov 22 06:20:27 CST 2014
> pZxid = 0x5641824ee
> cversion = 0
> dataVersion = 19
> aclVersion = 0
> ephemeralOwner = 0x0
> dataLength = 75
> numChildren = 0
>
>
> Based on the above information, the controller and state-change logs have the
> right information, but the partition state in ZooKeeper was not updated and
> no update was ever attempted.
>
>
>
>
> On Tue, Nov 25, 2014 at 1:28 PM, Jun Rao  wrote:
>
> > Which version of Kafka are you using? Any error in the controller and the
> > state-change log?
> >
> > Thanks,
> >
> > Jun
> >
> > On Fri, Nov 21, 2014 at 5:59 PM, Shangan Chen 
> > wrote:
> >
> > > In the initial state all replicas are in the ISR list, but sometimes when
> > > I check the topic state, a replica never rejoins the ISR even though it is
> > > actually synchronized. I saw in the log that the leader printed an
> > > expand-ISR request, but it did not take effect. I found an interesting
> > > thing: the shrink and expand requests happened just after the controller
> > > switch. I don't know whether that is related, and the controller log has
> > > been overwritten, so I cannot verify it. Is there anything I can do to
> > > trigger the ISR update? Currently I alter the ZooKeeper partition state by
> > > hand, and it works, but that is a lot of manual work as I have quite a lot
> > > of topics in my cluster. Some useful information is as follows.
> > >
> > > *my replica lag config for default:*
> > >
> > > replica.lag.time.max.ms=1
> > > replica.lag.max.messages=4000
> > >
> > > *controller info:*
> > >
> > > [zk: localhost:2181(CONNECTED) 4] get /kafka08/controller
> > > {"version":1,"brokerid":29,"timestamp":"1416608404008"}
> > > cZxid = 0x5a4c85923
> > > ctime = Sat Nov 22 06:20:04 CST 2014
> > > mZxid = 0x5a4c85923
> > > mtime = Sat Nov 22 06:20:04 CST 2014
> > > pZxid = 0x5a4c85923
> > > cversion = 0
> > > dataVersion = 0
> > > aclVersion = 0
> > > ephemeralOwner = 0x5477ba622cb6c7d
> > > dataLength = 55
> > > numChildren = 0
> > >
> > >
> > > *topic info:*
> > >
> > > Topic:org.nginx   PartitionCount:48   ReplicationFactor:2   Configs:
> > >     Topic: org.nginx   Partition: 0    Leader: 17   Replicas: 17,32   Isr: 17,32
> > >     Topic: org.nginx   Partition: 1    Leader: 18   Replicas: 18,33   Isr: 18,33
> > >     Topic: org.nginx   Partition: 2    Leader: 19   Replicas: 19,34   Isr: 34,19
> > >     Topic: org.nginx   Partition: 3    Leader: 20   Replicas: 20,35   Isr: 35,20
> > >     Topic: org.nginx   Partition: 4    Leader: 21   Replicas: 21,36   Isr: 21,36
> > >     Topic: org.nginx   Partition: 5    Leader: 22   Replicas: 22,17   Isr: 17,22
> > >     Topic: org.nginx   Partition: 6    Leader: 23   Replicas: 23,18   Isr: 18,23
> > >     Topic: org.nginx   Partition: 7    Leader: 24   Replicas: 24,19   Isr: 24,19
> > >     Topic: org.nginx   Partition: 8    Leader: 25   Replicas: 25,20   Isr: 25,20
> > >     Topic: org.nginx   Partition: 9    Leader: 26   Replicas: 26,21   Isr: 26,21
> > >     Topic: org.nginx   Partition: 10   Leader: 27   Replicas: 27,22   Isr: 27,22
> > >     Topic: org.nginx   Partition: 11   Leader: 28   Replicas: 28,23   Isr: 28,23
> > >     Topic: org.nginx   Partition: 12   Leader: 29   Replicas: 29,24   Isr: 29
> > >     Topic: org.nginx   Partition: 13   Leader: 30   Replicas: 30,25   Isr: 30,25
> > >     Topic: org.nginx   Partition: 14   Leader: 31   Replicas: 31,26   Isr: 26,31
> > >     Topic: org.nginx   Partition: 15   Leader: 32   Replicas: 32,27   Isr: 27,32
> > >     Topic: org.nginx   Partition: 16   Leader: 33   Replicas: 33,28   Isr: 33,28
> > >     Topic: org.nginx   Partition: 17   Leader: 34   Replicas: 34,29   Isr: 29,34
> > >     Topic: org.nginx   Partition: 18   Leader