Add partitions with replica assignment in same command

2014-11-07 Thread Allen Wang
I am trying to figure out how to add partitions and assign replicas using
one admin command. I tried kafka.admin.TopicCommand to increase the
partition number from 9 to 12 with the following options:

/apps/kafka/bin/kafka-run-class.sh kafka.admin.TopicCommand  --zookeeper
${ZOOKEEPER} --alter --topic test_topic_4 --partitions 12
--replica-assignment 2:1,0:2,1:0,1:2,2:0,0:1,1:0,2:1,0:2,2:1,0:2,1:0

This gives me an error

Option "[replica-assignment]" can't be used with option"[partitions]"

Looking into TopicCommand, the alterTopic function seems to be able to
handle that, but the command exits with the above error before that
function is invoked.

Is there any workaround or other recommended way to achieve this?
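
For reference, the equivalent call through kafka.admin.AdminUtils directly
(bypassing TopicCommand's option check) might look roughly like the sketch
below. The addPartitions signature shown is an assumption for 0.8.1 and
should be checked against the AdminUtils source of your version, as should
whether the assignment string must cover all partitions or only the new ones:

import kafka.admin.AdminUtils;
import kafka.utils.ZKStringSerializer$;
import org.I0Itec.zkclient.ZkClient;

public class AddPartitionsWithAssignment {
    public static void main(String[] args) {
        ZkClient zkClient = new ZkClient("zk-host:2181", 30000, 30000,
                ZKStringSerializer$.MODULE$);
        try {
            // Assumed 0.8.1 signature:
            //   addPartitions(zkClient, topic, numPartitions, replicaAssignmentStr, checkBrokerAvailable)
            // Verify against your Kafka version before relying on this.
            AdminUtils.addPartitions(zkClient, "test_topic_4", 12,
                    "2:1,0:2,1:0,1:2,2:0,0:1,1:0,2:1,0:2,2:1,0:2,1:0", true);
        } finally {
            zkClient.close();
        }
    }
}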

Thanks,
Allen


Re: Announcing Confluent

2014-11-07 Thread vipul jhawar
Best of luck. Will stay tuned for news.

On Thu, Nov 6, 2014 at 11:58 PM, Jay Kreps  wrote:

> Hey all,
>
> I’m happy to announce that Jun Rao, Neha Narkhede and I are creating a
> company around Kafka called Confluent. We are planning on productizing the
> kind of Kafka-based real-time data platform we built out at LinkedIn. We
> are doing this because we think this is a really powerful idea and we felt
> there was a lot to do to make this idea really take root. We wanted to make
> that our full time mission and focus.
>
> There is a blog post that goes into a little more depth here:
> http://blog.confluent.io/
>
> LinkedIn will remain a heavy Kafka user and contributor. Combined with our
> additional resources from the funding of the company, this should be a
> really good thing for the Kafka development effort, especially when
> combined with the increasing contributions from the rest of the development
> community. This is great news, as there is a lot of work to do. We'll need
> to really focus on scaling this distributed development in a healthy way.
>
> One thing I do want to emphasize is that the addition of a company in the
> Kafka ecosystem won’t mean meddling with open source. Kafka will remain
> 100% open source and community focused, as of course is true of any Apache
> project. I have been doing open source for a long time and strongly believe
> it is the right model for infrastructure software development.
>
> Confluent is just getting off the ground now. We left LinkedIn, raised some
> money, and we have an office (but no furniture yet!). Nonetheless, if you
> are interested in finding out more about the company and either getting
> help with your Kafka usage or joining us to help build all this, by all
> means reach out to us, we’d love to talk.
>
> Wish us luck!
>
> -Jay
>


Consumer thread dies

2014-11-07 Thread Srinivas Reddy Kancharla
Hi,

I have a scenario where I have 1 partition and 1 consumer group with 2
consumer threads running, say C1 and C2. Since there is only one partition
for the given topic, say C1 is holding that partition. Now, if C1 dies for
some reason, can C2 get hold of that partition?

i.e. C1 was busy with its KafkaStream instance; if for any reason C1 dies
or is in a hung state, can we make C2 talk to the KafkaStream (for
partition 0)?
I am facing this issue where I have 10 messages in partition 0 and C1 was
consuming them. At message 4, C1 went into a hung state. Now I would like
C2 to consume the remaining messages which were not consumed by C1.
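
For reference, the setup described above looks roughly like this with the
0.8 high-level Java consumer (a minimal sketch; the ZooKeeper address,
group id and topic name are placeholders):

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class TwoThreadConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");  // placeholder
        props.put("group.id", "my-group");                 // placeholder
        ConsumerConnector connector =
            Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        // Ask for two streams (C1 and C2) on one topic. With a single
        // partition, only one of the streams will actually receive messages.
        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
            connector.createMessageStreams(Collections.singletonMap("my-topic", 2));

        for (final KafkaStream<byte[], byte[]> stream : streams.get("my-topic")) {
            new Thread(new Runnable() {
                public void run() {
                    ConsumerIterator<byte[], byte[]> it = stream.iterator();
                    while (it.hasNext()) {
                        byte[] message = it.next().message();  // process message here
                    }
                }
            }).start();
        }
    }
}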

Thanks and regards,
Srini


Re: Cannot connect to Kafka from outside of EC2

2014-11-07 Thread Sameer Yami
The version is kafka_2.10-0.8.1.1. It is not the latest trunk.
Will try enabling DEBUG logging.

thanks


On Thu, Nov 6, 2014 at 9:37 PM, Guozhang Wang  wrote:

> Sameer,
>
> The server logs do not contain any non-INFO logs, which is a bit weird. Did
> you deploy the current trunk of Kafka? Also could you enable DEBUG level
> logging on Kafka brokers?
>
> Guozhang
>
> On Wed, Nov 5, 2014 at 3:50 PM, Sameer Yami  wrote:
>
> > The server.log was taken separately.
> > We ran the test again and the server and producer logs are below (to get
> > same timings).
> >
> >
> > Thanks!
> >
> >
> >
> 
> >
> >
> >
> > Producer Logs -
> >
> >
> > 2014-11-05 23:38:58,693
> > Thread-3-SendThread(ip-172-31-25-198.us-west-1.compute.internal:2181)
> > DEBUG org.apache.zookeeper.ClientCnxn-759: Got ping response for
> sessionid:
> > 0x1498251e8680002 after 0ms
> > 2014-11-05 23:39:00,695
> > Thread-3-SendThread(ip-172-31-25-198.us-west-1.compute.internal:2181)
> > DEBUG org.apache.zookeeper.ClientCnxn-759: Got ping response for
> sessionid:
> > 0x1498251e8680002 after 0ms
> > 2014-11-05 23:39:02,696
> > Thread-3-SendThread(ip-172-31-25-198.us-west-1.compute.internal:2181)
> > DEBUG org.apache.zookeeper.ClientCnxn-759: Got ping response for
> sessionid:
> > 0x1498251e8680002 after 0ms
> > 2014-11-05 23:39:02,828 pool-13-thread-2   INFO
> > kafka.utils.VerifiableProperties-68: Verifying properties
> > 2014-11-05 23:39:02,829 pool-13-thread-2   INFO
> > kafka.utils.VerifiableProperties-68: Property auto.commit.interval.ms is
> > overridden to 1000
> > 2014-11-05 23:39:02,829 pool-13-thread-2   INFO
> > kafka.utils.VerifiableProperties-68: Property auto.offset.reset is
> > overridden to smallest
> > 2014-11-05 23:39:02,829 pool-13-thread-2   INFO
> > kafka.utils.VerifiableProperties-68: Property consumer.timeout.ms is
> > overridden to 10
> > 2014-11-05 23:39:02,829 pool-13-thread-2   INFO
> > kafka.utils.VerifiableProperties-68: Property group.id is overridden to
> > TestCheck
> > 2014-11-05 23:39:02,830 pool-13-thread-2   WARN
> > kafka.utils.VerifiableProperties-83: Property serializer.class is not
> valid
> > 2014-11-05 23:39:02,830 pool-13-thread-2   INFO
> > kafka.utils.VerifiableProperties-68: Property zookeeper.connect is
> > overridden to 172.31.25.198:2181
> > 2014-11-05 23:39:02,831 pool-13-thread-2   INFO
> > kafka.consumer.ZookeeperConsumerConnector-68:
> > [TestCheck_ip-172-31-25-198-1415230742830-f3dfc362], Connecting to
> > zookeeper instance at 172.31.25.198:2181
> > 2014-11-05 23:39:02,831 pool-13-thread-2  DEBUG
> > org.I0Itec.zkclient.ZkConnection-63: Creating new ZookKeeper instance to
> > connect to 172.31.25.198:2181.
> > 2014-11-05 23:39:02,831 pool-13-thread-2   INFO
> > org.apache.zookeeper.ZooKeeper-379: Initiating client connection,
> > connectString=172.31.25.198:2181 sessionTimeout=6000
> > watcher=org.I0Itec.zkclient.ZkClient@3903b165
> > 2014-11-05 23:39:02,831 ZkClient-EventThread-29-172.31.25.198:2181   INFO
> > org.I0Itec.zkclient.ZkEventThread-64: Starting ZkClient event thread.
> > 2014-11-05 23:39:02,831 pool-13-thread-1   INFO
> > kafka.utils.VerifiableProperties-68: Verifying properties
> > 2014-11-05 23:39:02,836 pool-13-thread-2-SendThread()   INFO
> > org.apache.zookeeper.ClientCnxn-1061: Opening socket connection to
> server /
> > 172.31.25.198:2181
> > 2014-11-05 23:39:02,836 pool-13-thread-1   WARN
> > kafka.utils.VerifiableProperties-83: Property batch.size is not valid
> > 2014-11-05 23:39:02,832 pool-13-thread-2  DEBUG
> > org.I0Itec.zkclient.ZkClient-878: Awaiting connection to Zookeeper server
> > 2014-11-05 23:39:02,836 pool-13-thread-1   INFO
> > kafka.utils.VerifiableProperties-68: Property message.send.max.retries is
> > overridden to 10
> > 2014-11-05 23:39:02,836 pool-13-thread-2  DEBUG
> > org.I0Itec.zkclient.ZkClient-628: Waiting for keeper state SyncConnected
> > 2014-11-05 23:39:02,837 pool-13-thread-1   INFO
> > kafka.utils.VerifiableProperties-68: Property metadata.broker.list is
> > overridden to 172.31.25.198:9092
> > 2014-11-05 23:39:02,837 pool-13-thread-1   INFO
> > kafka.utils.VerifiableProperties-68: Property retry.backoff.ms is
> > overridden to 1000
> > 2014-11-05 23:39:02,837 pool-13-thread-1   INFO
> > kafka.utils.VerifiableProperties-68: Property serializer.class is
> > overridden to kafka.serializer.StringEncoder
> > 2014-11-05 23:39:02,837
> >
> >
> pool-13-thread-2-SendThread(ip-172-31-25-198.us-west-1.compute.internal:2181)
> > INFO org.apache.zookeeper.ClientCnxn-950: Socket connection established
> to
> > ip-172-31-25-198.us-west-1.compute.internal/172.31.25.198:2181,
> initiating
> > session
> > 2014-11-05 23:39:02,838
> >
> >
> pool-13-thread-2-SendThread(ip-172-31-25-198.us-west-1.compute.internal:2181)
> > DEBUG org.apache.zookeeper.ClientCnxn-999: Session establishment request
> > sent on ip-172-31-25-198.us-west-1.compute.internal/172.31.25.198:2181
> > 2014-11-

Re: Interrupting controlled shutdown breaks Kafka cluster

2014-11-07 Thread Solon Gordon
We're using 0.8.1.1 with auto.leader.rebalance.enable=true.

On Fri, Nov 7, 2014 at 2:35 PM, Guozhang Wang  wrote:

> Solon,
>
> Which version of Kafka are you running and are you enabling auto leader
> rebalance at the same time?
>
> Guozhang
>
> On Fri, Nov 7, 2014 at 8:41 AM, Solon Gordon  wrote:
>
> > Hi all,
> >
> > My team has observed that if a broker process is killed in the middle of
> > the controlled shutdown procedure, the remaining brokers start spewing
> > errors and do not properly rebalance leadership. The cluster cannot
> recover
> > without major manual intervention.
> >
> > Here is how to reproduce the problem:
> > 1. Create a Kafka 0.8.1.1 cluster with three brokers. (Let's call them A,
> > B, and C.) Set controlled.shutdown.enable=true.
> > 2. Create a topic with replication_factor = 3 and a large number of
> > partitions (say 100).
> > 3. Send a TERM signal to broker A. This initiates controlled shutdown.
> > 4. Before controlled shutdown completes, quickly send a KILL signal to
> > broker A.
> >
> > Result:
> > - Brokers B and C start logging ReplicaFetcherThread connection errors
> > every few milliseconds. (See below for an example.)
> > - Broker A is still listed as a leader and ISR for any partitions which
> > were not transferred during controlled shutdown. This causes connection
> > errors when clients try to produce to or consume from these partitions.
> >
> > This scenario is difficult to recover from. The only ways we have found
> are
> > to restart broker A multiple times (if it still exists) or to kill both B
> > and C and then start them one by one. Without this kind of intervention,
> > the above issues persist indefinitely.
> >
> > This may sound like a contrived scenario, but it's exactly what we have
> > seen when a Kafka EC2 instance gets terminated by AWS. So this seems
> like a
> > real liability.
> >
> > Are there any existing JIRA tickets which cover this behavior? And do you
> > have any recommendations for avoiding it, other than forsaking controlled
> > shutdowns entirely?
> >
> > Thanks,
> > Solon
> >
> > Error example:
> > [2014-11-06 17:10:21,459] ERROR [ReplicaFetcherThread-0-1978259225],
> Error
> > in fetch Name: FetchRequest; Version: 0; CorrelationId: 3500; ClientId:
> > ReplicaFetcherThread-0-1978259225; ReplicaId: 1359390395; MaxWait: 500
> ms;
> > MinBytes: 1 bytes; RequestInfo: [my-topic,42] ->
> > PartitionFetchInfo(503,10485760),[my-topic,63] ->
> > PartitionFetchInfo(386,10485760),[my-topic,99] ->
> > PartitionFetchInfo(525,10485760),[my-topic,84] ->
> > PartitionFetchInfo(436,10485760),[my-topic,48] ->
> > PartitionFetchInfo(484,10485760),[my-topic,75] ->
> > PartitionFetchInfo(506,10485760),[my-topic,45] ->
> > PartitionFetchInfo(473,10485760),[my-topic,66] ->
> > PartitionFetchInfo(532,10485760),[my-topic,30] ->
> > PartitionFetchInfo(435,10485760),[my-topic,96] ->
> > PartitionFetchInfo(517,10485760),[my-topic,27] ->
> > PartitionFetchInfo(470,10485760),[my-topic,36] ->
> > PartitionFetchInfo(472,10485760),[my-topic,9] ->
> > PartitionFetchInfo(514,10485760),[my-topic,33] ->
> > PartitionFetchInfo(582,10485760),[my-topic,69] ->
> > PartitionFetchInfo(504,10485760),[my-topic,57] ->
> > PartitionFetchInfo(444,10485760),[my-topic,78] ->
> > PartitionFetchInfo(559,10485760),[my-topic,12] ->
> > PartitionFetchInfo(417,10485760),[my-topic,90] ->
> > PartitionFetchInfo(429,10485760),[my-topic,18] ->
> > PartitionFetchInfo(497,10485760),[my-topic,0] ->
> > PartitionFetchInfo(402,10485760),[my-topic,6] ->
> > PartitionFetchInfo(527,10485760),[my-topic,54] ->
> > PartitionFetchInfo(524,10485760),[my-topic,15] ->
> > PartitionFetchInfo(448,10485760),[console,0] ->
> > PartitionFetchInfo(4,10485760) (kafka.server.ReplicaFetcherThread)
> > java.net.ConnectException: Connection refused
> > at sun.nio.ch.Net.connect0(Native Method)
> > at sun.nio.ch.Net.connect(Net.java:465)
> > at sun.nio.ch.Net.connect(Net.java:457)
> > at
> sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
> > at
> kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> > at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
> > at
> kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
> > at
> > kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
> > at
> >
> >
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
> > at
> >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
> > at
> >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
> > at
> >
> >
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
> > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > at
> >
> >
> kafka.consumer.Sim

Re: Interrupting controlled shutdown breaks Kafka cluster

2014-11-07 Thread Guozhang Wang
Solon,

Which version of Kafka are you running and are you enabling auto leader
rebalance at the same time?

Guozhang

On Fri, Nov 7, 2014 at 8:41 AM, Solon Gordon  wrote:

> Hi all,
>
> My team has observed that if a broker process is killed in the middle of
> the controlled shutdown procedure, the remaining brokers start spewing
> errors and do not properly rebalance leadership. The cluster cannot recover
> without major manual intervention.
>
> Here is how to reproduce the problem:
> 1. Create a Kafka 0.8.1.1 cluster with three brokers. (Let's call them A,
> B, and C.) Set controlled.shutdown.enable=true.
> 2. Create a topic with replication_factor = 3 and a large number of
> partitions (say 100).
> 3. Send a TERM signal to broker A. This initiates controlled shutdown.
> 4. Before controlled shutdown completes, quickly send a KILL signal to
> broker A.
>
> Result:
> - Brokers B and C start logging ReplicaFetcherThread connection errors
> every few milliseconds. (See below for an example.)
> - Broker A is still listed as a leader and ISR for any partitions which
> were not transferred during controlled shutdown. This causes connection
> errors when clients try to produce to or consume from these partitions.
>
> This scenario is difficult to recover from. The only ways we have found are
> to restart broker A multiple times (if it still exists) or to kill both B
> and C and then start them one by one. Without this kind of intervention,
> the above issues persist indefinitely.
>
> This may sound like a contrived scenario, but it's exactly what we have
> seen when a Kafka EC2 instance gets terminated by AWS. So this seems like a
> real liability.
>
> Are there any existing JIRA tickets which cover this behavior? And do you
> have any recommendations for avoiding it, other than forsaking controlled
> shutdowns entirely?
>
> Thanks,
> Solon
>
> Error example:
> [2014-11-06 17:10:21,459] ERROR [ReplicaFetcherThread-0-1978259225], Error
> in fetch Name: FetchRequest; Version: 0; CorrelationId: 3500; ClientId:
> ReplicaFetcherThread-0-1978259225; ReplicaId: 1359390395; MaxWait: 500 ms;
> MinBytes: 1 bytes; RequestInfo: [my-topic,42] ->
> PartitionFetchInfo(503,10485760),[my-topic,63] ->
> PartitionFetchInfo(386,10485760),[my-topic,99] ->
> PartitionFetchInfo(525,10485760),[my-topic,84] ->
> PartitionFetchInfo(436,10485760),[my-topic,48] ->
> PartitionFetchInfo(484,10485760),[my-topic,75] ->
> PartitionFetchInfo(506,10485760),[my-topic,45] ->
> PartitionFetchInfo(473,10485760),[my-topic,66] ->
> PartitionFetchInfo(532,10485760),[my-topic,30] ->
> PartitionFetchInfo(435,10485760),[my-topic,96] ->
> PartitionFetchInfo(517,10485760),[my-topic,27] ->
> PartitionFetchInfo(470,10485760),[my-topic,36] ->
> PartitionFetchInfo(472,10485760),[my-topic,9] ->
> PartitionFetchInfo(514,10485760),[my-topic,33] ->
> PartitionFetchInfo(582,10485760),[my-topic,69] ->
> PartitionFetchInfo(504,10485760),[my-topic,57] ->
> PartitionFetchInfo(444,10485760),[my-topic,78] ->
> PartitionFetchInfo(559,10485760),[my-topic,12] ->
> PartitionFetchInfo(417,10485760),[my-topic,90] ->
> PartitionFetchInfo(429,10485760),[my-topic,18] ->
> PartitionFetchInfo(497,10485760),[my-topic,0] ->
> PartitionFetchInfo(402,10485760),[my-topic,6] ->
> PartitionFetchInfo(527,10485760),[my-topic,54] ->
> PartitionFetchInfo(524,10485760),[my-topic,15] ->
> PartitionFetchInfo(448,10485760),[console,0] ->
> PartitionFetchInfo(4,10485760) (kafka.server.ReplicaFetcherThread)
> java.net.ConnectException: Connection refused
> at sun.nio.ch.Net.connect0(Native Method)
> at sun.nio.ch.Net.connect(Net.java:465)
> at sun.nio.ch.Net.connect(Net.java:457)
> at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
> at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
> at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
> at
> kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
> at
>
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
> at
>
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
> at
>
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
> at
>
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at
>
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
> at
>
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
> at
>
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at kafka.con

Re: OffsetOutOfRange errors

2014-11-07 Thread Jason Rosenberg
The bottom line is that you are likely not consuming messages fast enough,
so you are falling behind. You end up steadily consuming older and older
messages, and eventually you are consuming messages older than the
retention time window set for your Kafka broker. That's the typical
scenario for this error.

Also, if you are consuming 300 messages/sec but committing once every 100
messages, that means you are committing 3 times a second, which could be a
bottleneck. The default auto-commit interval, for comparison, is 60
seconds.
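
For comparison, relying on the consumer's timer-based auto-commit instead of
committing every N messages would look roughly like this with the Java
high-level consumer (a sketch; the ZooKeeper address and group id are
placeholders, and the 60000 ms value just mirrors the default interval
mentioned above):

import java.util.Properties;
import kafka.consumer.ConsumerConfig;

public class AutoCommitSettings {
    public static ConsumerConfig autoCommitConfig(String zkConnect, String groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zkConnect);
        props.put("group.id", groupId);
        // Commit offsets on a timer rather than after every batch of messages.
        props.put("auto.commit.enable", "true");
        props.put("auto.commit.interval.ms", "60000");
        return new ConsumerConfig(props);
    }
}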

Jason

On Fri, Nov 7, 2014 at 1:30 PM, Jimmy John  wrote:

> The current setting is to commit to ZK every 100 messages read.
>
> The read buffer size is 262144 bytes. So we will read in a bunch of
> messages in a batch. And while iterating through those messages, we commit
> the offset to ZK every 100.
>
> jim
>
> On Fri, Nov 7, 2014 at 10:13 AM, Guozhang Wang  wrote:
>
> > Hi Jim,
> >
> > When messages gets cleaned based on data retention policy (by time or by
> > size), the brokers will not inform ZK for the deletion event. The
> > underlying assumption is that when consumers are fetching data at around
> > the tail of the log (i.e. they are not much lagging, which is normal
> cases)
> > they should be continuously update the consumed offsets in ZK and hence
> > that offsets will be valid most of the time. When consumers are lagging
> > behind and the old messages are cleaned they will get this exception, and
> > consumers need to handle it by resetting their offset to, e.g. the head
> of
> > the log.
> >
> > How frequent do your clients read / write the offsets in ZK?
> >
> > Guozhang
> >
> > On Thu, Nov 6, 2014 at 6:23 PM, Jimmy John  wrote:
> >
> > > Hello,
> > >
> > >   I understand what this error means, just not sure why I keep running
> > into
> > > it after 24-48 hrs of running fine consuming > 300 messages / second.
> > >
> > >   What happens when a kafka log rolls over and some old records are
> aged
> > > out? I mean what happens to the offsets? We are using a python client
> > which
> > > stores the offsets in ZK. But in the middle of the run, say after 2
> days
> > or
> > > so, suddenly it gets this error.
> > >
> > > The only possibility is that the older records have aged off and ZK
> still
> > > has the offset which is no longer applicable...How does the java client
> > > deal with this? Does kafka inform ZK that records have been aged off
> and
> > > update the offset or something?
> > >
> > > Here is the error i see in the broker logs
> > >
> > > [2014-11-07 01:40:32,478] ERROR [KafkaApi-11] Error when processing
> fetch
> > > request for partition [activity.stream,3] offset 8013827 from consumer
> > > with
> > > correlation id 73 (kafka.server.KafkaApis)
> > >
> > >  kafka.common.OffsetOutOfRangeException: Request for offset 8013827 but
> > we
> > > only have log segments in the range 8603331 to 11279773.
> > >
> > >  at kafka.log.Log.read(Log.scala:380)
> > >
> > >  at
> > >
> > >
> >
> kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSet(KafkaApis.scala:530)
> > >
> > >  at
> > >
> > >
> >
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:476)
> > >
> > >  at
> > >
> > >
> >
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:471)
> > >
> > >  at
> > >
> > >
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
> > >
> > >  at
> > >
> > >
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
> > >
> > >  at scala.collection.immutable.Map$Map3.foreach(Map.scala:164)
> > >
> > > at
> > > scala.collection.TraversableLike$class.map(TraversableLike.scala:233)
> > >
> > >  at scala.collection.immutable.Map$Map3.map(Map.scala:144)
> > >
> > >  at
> > >
> > >
> >
> kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSets(KafkaApis.scala:471)
> > >
> > >  at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:437)
> > >
> > >  at kafka.server.KafkaApis.handle(KafkaApis.scala:186)
> > >
> > >  at
> > kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
> > >
> > >  at java.lang.Thread.run(Thread.java:745)
> > >
> > >
> > > thx
> > >
> > > Jim
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>


Re: OffsetOutOfRange errors

2014-11-07 Thread Guozhang Wang
When would you read offsets from ZK, only when starting up? Also, what are
your data retention config values on the broker?

Guozhang

On Fri, Nov 7, 2014 at 10:30 AM, Jimmy John  wrote:

> The current setting is to commit to ZK every 100 messages read.
>
> The read buffer size is 262144 bytes. So we will read in a bunch of
> messages in a batch. And while iterating through those messages, we commit
> the offset to ZK every 100.
>
> jim
>
> On Fri, Nov 7, 2014 at 10:13 AM, Guozhang Wang  wrote:
>
> > Hi Jim,
> >
> > When messages gets cleaned based on data retention policy (by time or by
> > size), the brokers will not inform ZK for the deletion event. The
> > underlying assumption is that when consumers are fetching data at around
> > the tail of the log (i.e. they are not much lagging, which is normal
> cases)
> > they should be continuously update the consumed offsets in ZK and hence
> > that offsets will be valid most of the time. When consumers are lagging
> > behind and the old messages are cleaned they will get this exception, and
> > consumers need to handle it by resetting their offset to, e.g. the head
> of
> > the log.
> >
> > How frequent do your clients read / write the offsets in ZK?
> >
> > Guozhang
> >
> > On Thu, Nov 6, 2014 at 6:23 PM, Jimmy John  wrote:
> >
> > > Hello,
> > >
> > >   I understand what this error means, just not sure why I keep running
> > into
> > > it after 24-48 hrs of running fine consuming > 300 messages / second.
> > >
> > >   What happens when a kafka log rolls over and some old records are
> aged
> > > out? I mean what happens to the offsets? We are using a python client
> > which
> > > stores the offsets in ZK. But in the middle of the run, say after 2
> days
> > or
> > > so, suddenly it gets this error.
> > >
> > > The only possibility is that the older records have aged off and ZK
> still
> > > has the offset which is no longer applicable...How does the java client
> > > deal with this? Does kafka inform ZK that records have been aged off
> and
> > > update the offset or something?
> > >
> > > Here is the error i see in the broker logs
> > >
> > > [2014-11-07 01:40:32,478] ERROR [KafkaApi-11] Error when processing
> fetch
> > > request for partition [activity.stream,3] offset 8013827 from consumer
> > > with
> > > correlation id 73 (kafka.server.KafkaApis)
> > >
> > >  kafka.common.OffsetOutOfRangeException: Request for offset 8013827 but
> > we
> > > only have log segments in the range 8603331 to 11279773.
> > >
> > >  at kafka.log.Log.read(Log.scala:380)
> > >
> > >  at
> > >
> > >
> >
> kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSet(KafkaApis.scala:530)
> > >
> > >  at
> > >
> > >
> >
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:476)
> > >
> > >  at
> > >
> > >
> >
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:471)
> > >
> > >  at
> > >
> > >
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
> > >
> > >  at
> > >
> > >
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
> > >
> > >  at scala.collection.immutable.Map$Map3.foreach(Map.scala:164)
> > >
> > > at
> > > scala.collection.TraversableLike$class.map(TraversableLike.scala:233)
> > >
> > >  at scala.collection.immutable.Map$Map3.map(Map.scala:144)
> > >
> > >  at
> > >
> > >
> >
> kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSets(KafkaApis.scala:471)
> > >
> > >  at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:437)
> > >
> > >  at kafka.server.KafkaApis.handle(KafkaApis.scala:186)
> > >
> > >  at
> > kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
> > >
> > >  at java.lang.Thread.run(Thread.java:745)
> > >
> > >
> > > thx
> > >
> > > Jim
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang


Re: OffsetOutOfRange errors

2014-11-07 Thread Jimmy John
The current setting is to commit to ZK every 100 messages read.

The read buffer size is 262144 bytes. So we will read in a bunch of
messages in a batch. And while iterating through those messages, we commit
the offset to ZK every 100.

jim

On Fri, Nov 7, 2014 at 10:13 AM, Guozhang Wang  wrote:

> Hi Jim,
>
> When messages gets cleaned based on data retention policy (by time or by
> size), the brokers will not inform ZK for the deletion event. The
> underlying assumption is that when consumers are fetching data at around
> the tail of the log (i.e. they are not much lagging, which is normal cases)
> they should be continuously update the consumed offsets in ZK and hence
> that offsets will be valid most of the time. When consumers are lagging
> behind and the old messages are cleaned they will get this exception, and
> consumers need to handle it by resetting their offset to, e.g. the head of
> the log.
>
> How frequent do your clients read / write the offsets in ZK?
>
> Guozhang
>
> On Thu, Nov 6, 2014 at 6:23 PM, Jimmy John  wrote:
>
> > Hello,
> >
> >   I understand what this error means, just not sure why I keep running
> into
> > it after 24-48 hrs of running fine consuming > 300 messages / second.
> >
> >   What happens when a kafka log rolls over and some old records are aged
> > out? I mean what happens to the offsets? We are using a python client
> which
> > stores the offsets in ZK. But in the middle of the run, say after 2 days
> or
> > so, suddenly it gets this error.
> >
> > The only possibility is that the older records have aged off and ZK still
> > has the offset which is no longer applicable...How does the java client
> > deal with this? Does kafka inform ZK that records have been aged off and
> > update the offset or something?
> >
> > Here is the error i see in the broker logs
> >
> > [2014-11-07 01:40:32,478] ERROR [KafkaApi-11] Error when processing fetch
> > request for partition [activity.stream,3] offset 8013827 from consumer
> > with
> > correlation id 73 (kafka.server.KafkaApis)
> >
> >  kafka.common.OffsetOutOfRangeException: Request for offset 8013827 but
> we
> > only have log segments in the range 8603331 to 11279773.
> >
> >  at kafka.log.Log.read(Log.scala:380)
> >
> >  at
> >
> >
> kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSet(KafkaApis.scala:530)
> >
> >  at
> >
> >
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:476)
> >
> >  at
> >
> >
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:471)
> >
> >  at
> >
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
> >
> >  at
> >
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
> >
> >  at scala.collection.immutable.Map$Map3.foreach(Map.scala:164)
> >
> > at
> > scala.collection.TraversableLike$class.map(TraversableLike.scala:233)
> >
> >  at scala.collection.immutable.Map$Map3.map(Map.scala:144)
> >
> >  at
> >
> >
> kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSets(KafkaApis.scala:471)
> >
> >  at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:437)
> >
> >  at kafka.server.KafkaApis.handle(KafkaApis.scala:186)
> >
> >  at
> kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
> >
> >  at java.lang.Thread.run(Thread.java:745)
> >
> >
> > thx
> >
> > Jim
> >
>
>
>
> --
> -- Guozhang
>


Re: OffsetOutOfRange errors

2014-11-07 Thread Guozhang Wang
Hi Jim,

When messages get cleaned up based on the data retention policy (by time or
by size), the brokers will not inform ZK of the deletion event. The
underlying assumption is that when consumers are fetching data at around
the tail of the log (i.e. they are not lagging much, which is the normal
case), they will be continuously updating the consumed offsets in ZK, and
hence those offsets will be valid most of the time. When consumers are
lagging behind and the old messages are cleaned up, they will get this
exception, and they need to handle it by resetting their offset to, e.g.,
the head of the log.

How frequently do your clients read / write the offsets in ZK?
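
For illustration, fetching the earliest offset a broker still has for a
partition (so that a consumer which hit OffsetOutOfRangeException can reset
to the head of the log) might look roughly like this with the 0.8
SimpleConsumer Java API. This is a sketch, not a full fetcher; the broker
host and client id are placeholders, and the topic/partition are taken from
the error above:

import java.util.Collections;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.consumer.SimpleConsumer;

public class EarliestOffset {
    // Ask the broker for the earliest offset it still has for one partition.
    static long earliestOffset(SimpleConsumer consumer, String topic, int partition, String clientId) {
        TopicAndPartition tp = new TopicAndPartition(topic, partition);
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
            Collections.singletonMap(tp,
                new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.EarliestTime(), 1)),
            kafka.api.OffsetRequest.CurrentVersion(), clientId);
        OffsetResponse response = consumer.getOffsetsBefore(request);
        return response.offsets(topic, partition)[0];
    }

    public static void main(String[] args) {
        SimpleConsumer consumer =
            new SimpleConsumer("broker-host", 9092, 100000, 64 * 1024, "offset-reset-example");
        try {
            long earliest = earliestOffset(consumer, "activity.stream", 3, "offset-reset-example");
            // A consumer that got OffsetOutOfRangeException would store this offset
            // back to ZK and resume fetching from it.
            System.out.println("Resume from offset " + earliest);
        } finally {
            consumer.close();
        }
    }
}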

Guozhang

On Thu, Nov 6, 2014 at 6:23 PM, Jimmy John  wrote:

> Hello,
>
>   I understand what this error means, just not sure why I keep running into
> it after 24-48 hrs of running fine consuming > 300 messages / second.
>
>   What happens when a kafka log rolls over and some old records are aged
> out? I mean what happens to the offsets? We are using a python client which
> stores the offsets in ZK. But in the middle of the run, say after 2 days or
> so, suddenly it gets this error.
>
> The only possibility is that the older records have aged off and ZK still
> has the offset which is no longer applicable...How does the java client
> deal with this? Does kafka inform ZK that records have been aged off and
> update the offset or something?
>
> Here is the error i see in the broker logs
>
> [2014-11-07 01:40:32,478] ERROR [KafkaApi-11] Error when processing fetch
> request for partition [activity.stream,3] offset 8013827 from consumer
> with
> correlation id 73 (kafka.server.KafkaApis)
>
>  kafka.common.OffsetOutOfRangeException: Request for offset 8013827 but we
> only have log segments in the range 8603331 to 11279773.
>
>  at kafka.log.Log.read(Log.scala:380)
>
>  at
>
> kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSet(KafkaApis.scala:530)
>
>  at
>
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:476)
>
>  at
>
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:471)
>
>  at
>
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
>
>  at
>
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
>
>  at scala.collection.immutable.Map$Map3.foreach(Map.scala:164)
>
> at
> scala.collection.TraversableLike$class.map(TraversableLike.scala:233)
>
>  at scala.collection.immutable.Map$Map3.map(Map.scala:144)
>
>  at
>
> kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSets(KafkaApis.scala:471)
>
>  at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:437)
>
>  at kafka.server.KafkaApis.handle(KafkaApis.scala:186)
>
>  at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
>
>  at java.lang.Thread.run(Thread.java:745)
>
>
> thx
>
> Jim
>



-- 
-- Guozhang


Re: corrupt recovery checkpoint file issue....

2014-11-07 Thread Guozhang Wang
Jun,

Checking the OffsetCheckpoint.write function: if
"fileOutputStream.getFD.sync" throws an exception, it will just be caught
and forgotten, and the swap will still happen. Maybe we need to catch the
SyncFailedException and re-throw it as a FATAL error to skip the swap.
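
As a minimal sketch of that idea (not the actual OffsetCheckpoint code), the
write-to-temp-file-then-rename pattern with the swap aborted when the fsync
fails could look like this in plain Java:

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.SyncFailedException;

public final class CheckpointWriter {
    // Write content to a temp file, fsync it, and only swap it into place if
    // the sync succeeded, so a stale but intact checkpoint is preferred over
    // a possibly empty or corrupt one.
    public static void writeAtomically(File target, String content) throws IOException {
        File tmp = new File(target.getAbsolutePath() + ".tmp");
        FileOutputStream fos = new FileOutputStream(tmp);
        try {
            OutputStreamWriter writer = new OutputStreamWriter(fos, "UTF-8");
            writer.write(content);
            writer.flush();
            try {
                fos.getFD().sync();  // force the bytes to disk before the swap
            } catch (SyncFailedException e) {
                throw new IOException("fsync of " + tmp + " failed; keeping old checkpoint", e);
            }
        } finally {
            fos.close();
        }
        if (!tmp.renameTo(target)) {
            throw new IOException("Failed to rename " + tmp + " to " + target);
        }
    }
}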

Guozhang


On Thu, Nov 6, 2014 at 8:50 PM, Jason Rosenberg  wrote:

> I'm still not sure what caused the reboot of the system (but yes it appears
> to have crashed hard).  The file system is xfs, on CentOs linux.  I'm not
> yet sure, but I think also before the crash, the system might have become
> wedged.
>
> It appears the corrupt recovery files actually contained all zero bytes,
> after looking at it with odb.
>
> I'll file a Jira.
>
> On Thu, Nov 6, 2014 at 7:09 PM, Jun Rao  wrote:
>
> > I am also wondering how the corruption happened. The way that we update
> the
> > OffsetCheckpoint file is to first write to a tmp file and flush the data.
> > We then rename the tmp file to the final file. This is done to prevent
> > corruption caused by a crash in the middle of the writes. In your case,
> was
> > the host crashed? What kind of storage system are you using? Is there any
> > non-volatile cache on the storage system?
> >
> > Thanks,
> >
> > Jun
> >
> > On Thu, Nov 6, 2014 at 6:31 AM, Jason Rosenberg 
> wrote:
> >
> > > Hi,
> > >
> > > We recently had a kafka node go down suddenly. When it came back up, it
> > > apparently had a corrupt recovery file, and refused to startup:
> > >
> > > 2014-11-06 08:17:19,299  WARN [main] server.KafkaServer - Error
> > > starting up KafkaServer
> > > java.lang.NumberFormatException: For input string:
> > >
> > >
> >
> "^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@
> > >
> > >
> >
> ^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@"
> > > at
> > >
> >
> java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
> > > at java.lang.Integer.parseInt(Integer.java:481)
> > > at java.lang.Integer.parseInt(Integer.java:527)
> > > at
> > > scala.collection.immutable.StringLike$class.toInt(StringLike.scala:229)
> > > at
> scala.collection.immutable.StringOps.toInt(StringOps.scala:31)
> > > at
> kafka.server.OffsetCheckpoint.read(OffsetCheckpoint.scala:76)
> > > at
> > > kafka.log.LogManager$$anonfun$loadLogs$1.apply(LogManager.scala:106)
> > > at
> > > kafka.log.LogManager$$anonfun$loadLogs$1.apply(LogManager.scala:105)
> > > at
> > >
> >
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> > > at
> > > scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
> > > at kafka.log.LogManager.loadLogs(LogManager.scala:105)
> > > at kafka.log.LogManager.(LogManager.scala:57)
> > > at
> > kafka.server.KafkaServer.createLogManager(KafkaServer.scala:275)
> > > at kafka.server.KafkaServer.startup(KafkaServer.scala:72)
> > >
> > > And since the app is under a monitor (so it was repeatedly restarting
> and
> > > failing with this error for several minutes before we got to it)…
> > >
> > > We moved the ‘recovery-point-offset-checkpoint’ file out of the way,
> and
> > it
> > > then restarted cleanly (but of course re-synced all it’s data from
> > > replicas, so we had no data loss).
> > >
> > > Anyway, I’m wondering if that’s the expected behavior? Or should it not
> > > declare it corrupt and then proceed automatically to an unclean
> restart?
> > >
> > > Should this NumberFormatException be handled a bit more gracefully?
> > >
> > > We saved the corrupt file if it’s worth inspecting (although I doubt it
> > > will be useful!)….
> > >
> > > Jason
> > > ​
> > >
> >
>



-- 
-- Guozhang


Interrupting controlled shutdown breaks Kafka cluster

2014-11-07 Thread Solon Gordon
Hi all,

My team has observed that if a broker process is killed in the middle of
the controlled shutdown procedure, the remaining brokers start spewing
errors and do not properly rebalance leadership. The cluster cannot recover
without major manual intervention.

Here is how to reproduce the problem:
1. Create a Kafka 0.8.1.1 cluster with three brokers. (Let's call them A,
B, and C.) Set controlled.shutdown.enable=true.
2. Create a topic with replication_factor = 3 and a large number of
partitions (say 100).
3. Send a TERM signal to broker A. This initiates controlled shutdown.
4. Before controlled shutdown completes, quickly send a KILL signal to
broker A.

Result:
- Brokers B and C start logging ReplicaFetcherThread connection errors
every few milliseconds. (See below for an example.)
- Broker A is still listed as a leader and ISR for any partitions which
were not transferred during controlled shutdown. This causes connection
errors when clients try to produce to or consume from these partitions.

This scenario is difficult to recover from. The only ways we have found are
to restart broker A multiple times (if it still exists) or to kill both B
and C and then start them one by one. Without this kind of intervention,
the above issues persist indefinitely.

This may sound like a contrived scenario, but it's exactly what we have
seen when a Kafka EC2 instance gets terminated by AWS. So this seems like a
real liability.

Are there any existing JIRA tickets which cover this behavior? And do you
have any recommendations for avoiding it, other than forsaking controlled
shutdowns entirely?

Thanks,
Solon

Error example:
[2014-11-06 17:10:21,459] ERROR [ReplicaFetcherThread-0-1978259225], Error
in fetch Name: FetchRequest; Version: 0; CorrelationId: 3500; ClientId:
ReplicaFetcherThread-0-1978259225; ReplicaId: 1359390395; MaxWait: 500 ms;
MinBytes: 1 bytes; RequestInfo: [my-topic,42] ->
PartitionFetchInfo(503,10485760),[my-topic,63] ->
PartitionFetchInfo(386,10485760),[my-topic,99] ->
PartitionFetchInfo(525,10485760),[my-topic,84] ->
PartitionFetchInfo(436,10485760),[my-topic,48] ->
PartitionFetchInfo(484,10485760),[my-topic,75] ->
PartitionFetchInfo(506,10485760),[my-topic,45] ->
PartitionFetchInfo(473,10485760),[my-topic,66] ->
PartitionFetchInfo(532,10485760),[my-topic,30] ->
PartitionFetchInfo(435,10485760),[my-topic,96] ->
PartitionFetchInfo(517,10485760),[my-topic,27] ->
PartitionFetchInfo(470,10485760),[my-topic,36] ->
PartitionFetchInfo(472,10485760),[my-topic,9] ->
PartitionFetchInfo(514,10485760),[my-topic,33] ->
PartitionFetchInfo(582,10485760),[my-topic,69] ->
PartitionFetchInfo(504,10485760),[my-topic,57] ->
PartitionFetchInfo(444,10485760),[my-topic,78] ->
PartitionFetchInfo(559,10485760),[my-topic,12] ->
PartitionFetchInfo(417,10485760),[my-topic,90] ->
PartitionFetchInfo(429,10485760),[my-topic,18] ->
PartitionFetchInfo(497,10485760),[my-topic,0] ->
PartitionFetchInfo(402,10485760),[my-topic,6] ->
PartitionFetchInfo(527,10485760),[my-topic,54] ->
PartitionFetchInfo(524,10485760),[my-topic,15] ->
PartitionFetchInfo(448,10485760),[console,0] ->
PartitionFetchInfo(4,10485760) (kafka.server.ReplicaFetcherThread)
java.net.ConnectException: Connection refused
at sun.nio.ch.Net.connect0(Native Method)
at sun.nio.ch.Net.connect(Net.java:465)
at sun.nio.ch.Net.connect(Net.java:457)
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
at
kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
at
kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
at
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
at
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
at
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
at
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
at
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
at
kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
at
kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
[2014-11-06 17:10:21,462] WARN Reconnect due to socket error

Re: Disactivating Yammer Metrics Monitoring

2014-11-07 Thread François Langelier
Good to know!

Thanks Jason, I'll look at it ASAP! :)

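For reference, a minimal sketch of the approach Jason describes below
(dropping unwanted metrics from the Yammer registry by regex as they are
registered), assuming the metrics-core 2.x MetricsRegistryListener API; the
regex here is just an example:

import java.util.regex.Pattern;
import com.yammer.metrics.Metrics;
import com.yammer.metrics.core.Metric;
import com.yammer.metrics.core.MetricName;
import com.yammer.metrics.core.MetricsRegistry;
import com.yammer.metrics.core.MetricsRegistryListener;

public class MetricNameBlacklist {
    // Remove any newly registered metric whose "group.type.name" matches the regex.
    public static void install(final MetricsRegistry registry, String regex) {
        final Pattern pattern = Pattern.compile(regex);
        registry.addListener(new MetricsRegistryListener() {
            public void onMetricAdded(MetricName name, Metric metric) {
                String fullName = name.getGroup() + "." + name.getType() + "." + name.getName();
                if (pattern.matcher(fullName).matches()) {
                    registry.removeMetric(name);  // unregister it again right away
                }
            }

            public void onMetricRemoved(MetricName name) {
                // nothing to do
            }
        });
    }

    public static void main(String[] args) {
        // Example: suppress everything that looks like a per-topic rate metric.
        install(Metrics.defaultRegistry(), ".*PerSec.*");
    }
}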


François Langelier
Software Engineering Student - École de Technologie Supérieure

Member of Club Capra
VP-Communication - CS Games 2014
Jeux de Génie 2011 to 2014
Magistrate, Fraternité du Piranha
Organizing Committee, Olympiades ÉTS 2012
Compétition Québécoise d'Ingénierie 2012 - Senior Competition

On Thu, Nov 6, 2014 at 12:30 PM, Jason Rosenberg  wrote:

> Hi Francois,
>
> We had the exact same problem.  We embed Kafka in our service container,
> and we use yammer metrics to see data about the whole app (e.g. kafka, the
> jvm, the service container wrapping it).  However, as you observed, by
> default, kafka produces an insane amount of metrics.  So what we did, is
> using the yammer library, you can disable specific metrics by removing
> metrics from the yammer MetricsRegistry, which you can access from guice
> (if you are using guice).  I implemented the MetricsRegistryListener, and
> added the ability to remove metric names by regex, so I can still have some
> metrics show up (like the simple 'AllTopic' counts for messages/bytes sent
> from the producer), but block everything else that's per topic, etc
>
> Jason
>
> On Fri, Sep 19, 2014 at 11:34 PM, Otis Gospodnetic <
> otis.gospodne...@gmail.com> wrote:
>
> > Hi,
> >
> > I don't have any source or configs handy to check things, but you are
> > saying you've configured Kafka to use GraphiteReporter, right?  So why
> not
> > remove that config, so metrics stop being sent to Graphite if your
> Graphite
> > setup is suffering?  If you do that and you still want to see your Kafka
> > metrics, you can always use SPM  for Kafka
> > (though some of the graphs will be empty until we get KAFKA-1481, or
> > something else that improves metrics, in).  If you just want to use it
> > temporarily, just use the free 30-day trial version until you beef up
> your
> > Graphite setup.
> >
> > Otis
> > --
> > Monitoring * Alerting * Anomaly Detection * Centralized Log Management
> > Solr & Elasticsearch Support * http://sematext.com/
> >
> >
> > On Fri, Sep 19, 2014 at 10:08 AM, François Langelier <
> > f.langel...@gmail.com>
> > wrote:
> >
> > > Hi Daniel,
> > >
> > > Thank you for your answer.
> > >
> > > It's possible that I didn't understood something, if so correct me
> > please.
> > >
> > > From what I understood, from the kafka doc #monitoring
> > > , kafka use
> > Yammer
> > > Metrics for monitoring the servers (the brokers) and the clients
> > (producers
> > > and consumers).
> > >
> > > Our web site is also using Yammer Metrics and push that to our Graphite
> > > server and our web site also produce message in kafka.
> > > From what I read, the Yammer Metrics GraphiteReporter is a kind of
> > > Singleton, once I Enable it, it is working for all the process. (But I
> > > might be wrong here...)
> > >
> > > We recently upgrade kafka from 0.7.2 to 0.8.1.1 and since the upgrade,
> > > kafka is monitoring in our Graphite Server and is hammering it, so we
> > > aren't able to use it because we always get timeout...
> > >
> > > SO, I was wondering if there is a way to disable the kafka monitoring
> to
> > > our Graphite server.
> > >
> > > We are using the code in the tag 0.8.1.1 on github, so if the
> > kafka-ganglia
> > > isn't in the tag, we aren't using it :)
> > >
> > >
> > > François Langelier
> > > Étudiant en génie Logiciel - École de Technologie Supérieure
> > > 
> > > Capitaine Club Capra 
> > > VP-Communication - CS Games  2014
> > > Jeux de Génie  2011 à 2014
> > > Magistrat Fraternité du Piranha 
> > > Comité Organisateur Olympiades ÉTS 2012
> > > Compétition Québécoise d'Ingénierie 2012 - Compétition Senior
> > >
> > > On Wed, Sep 17, 2014 at 11:05 PM, Daniel Compton <
> d...@danielcompton.net
> > >
> > > wrote:
> > >
> > > > Hi Francois
> > > >
> > > > I didn't quite understand how you've set up your metrics reporting.
> Are
> > > you
> > > > using the https://github.com/criteo/kafka-ganglia metrics reporter?
> If
> > > so
> > > > then you should be able to adjust the config to exclude the metrics
> you
> > > > don't want, with kafka.ganglia.metrics.exclude.regex.
> > > >
> > > >
> > > > On 18 September 2014 07:55, François Langelier <
> f.langel...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi all!
> > > > >
> > > > > We are using yammer metrics to monitor some parts of our system.
> > > > >
> > > > > Since we upgrade from kafka 0.7.2 to 0.8.1.1, we saw a lot more
> data
> > > > > getting in our graphite server and from what I saw, it looks like
> it
> > > all
> > > > > come from our producers.
> > > > >
> > > > > From what i understand, since we already use graphite, ou

Re: Rebalance not happening even after increasing max retries causing conflict in ZK

2014-11-07 Thread Mohit Kathuria
Hi all,

Can someone help here? We are getting constant rebalance failures each time
a consumer is added beyond a certain number. We did quite a lot of
debugging on this and are still not able to figure out the pattern.
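
For context, the consumer-side rebalance settings discussed in the thread
below would look roughly like this (a sketch with the Java high-level
consumer; the property names are the standard 0.8 consumer configs and the
values are the ones described in the thread):

import java.util.Properties;
import kafka.consumer.ConsumerConfig;

public class RebalanceTuning {
    public static ConsumerConfig config(String zkConnect, String groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zkConnect);
        props.put("group.id", groupId);
        // From the thread below: 40 retries with a 10s backoff, and a 100s ZK
        // connection timeout, so that rebalance.backoff.ms * rebalance.max.retries
        // is well above the ZK connection timeout.
        props.put("rebalance.max.retries", "40");
        props.put("rebalance.backoff.ms", "10000");
        props.put("zookeeper.connection.timeout.ms", "100000");
        return new ConsumerConfig(props);
    }
}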

-Thanks,
Mohit

On Mon, Nov 3, 2014 at 10:53 PM, Mohit Kathuria 
wrote:

> Neha,
>
> Looks like an issue with the consumer rebalance not able to complete
> successfully. We were able to reproduce the issue on topic with 30
> partitions,  3 consumer processes(p1,p2 and p3), properties -  40
> rebalance.max.retries and 1(10s) rebalance.backoff.ms.
>
> Before the process p3 was started, partition ownership was as expected:
>
> partitions 0-14 owned by p1
> partitions 15-29 -> owner p2
>
> As the process p3 started, rebalance was triggered. Process p3 was
> successfully able to acquire partition ownership for partitions 20-29 as
> expected as per the rebalance algorithm. However, process p2 while trying
> to acquire ownership of partitions 10-19 saw rebalance failure after 40
> retries.
>
> Attaching the logs from process p2 and process p1. It says that p2 was
> attempting to rebalance, it was trying to acquire ownership of partitions
> 10-14 which were owned by process p1. However, at the same time process p1
> did not get any event for giving up the partition ownership for partitions
> 1-14.
> We were expecting a rebalance to have triggered in p1 - but it didn't and
> hence not giving up ownership. Is our assumption correct/incorrect?
> And if the rebalance gets triggered in p1 - how to figure out apart from
> logs as the logs on p1 did not have anything.
>
> *2014-11-03 06:57:36 k.c.ZookeeperConsumerConnector [INFO]
> [topic_consumerIdString], waiting for the partition ownership to be
> deleted: 11*
>
> During and after the rebalance failed on process p2, Partition Ownership
> was as below:
> 0-14 -> owner p1
> 15-19 -> none
> 20-29 -> owner p3
>
> This left the consumers in inconsistent state as 5 partitions were never
> consumer from and neither was the partitions ownership balanced.
>
> However, there was no conflict in creating the ephemeral node which was
> the case last time. Just to note that the ephemeral node conflict which we
> were seeing earlier also appeared after rebalance failed. My hunch is that
> fixing the rebalance failure will fix that issue as well.
>
> -Thanks,
> Mohit
>
>
>
> On Mon, Oct 20, 2014 at 7:48 PM, Neha Narkhede 
> wrote:
>
>> Mohit,
>>
>> I wonder if it is related to
>> https://issues.apache.org/jira/browse/KAFKA-1585. When zookeeper expires
>> a
>> session, it doesn't delete the ephemeral nodes immediately. So if you end
>> up trying to recreate ephemeral nodes quickly, it could either be in the
>> valid latest session or from the previously expired session. If you hit
>> this problem, then waiting would resolve it. But if not, then this may be
>> a
>> legitimate bug in ZK 3.4.6.
>>
>> Can you try shutting down all your consumers, waiting until session
>> timeout
>> and restarting them?
>>
>> Thanks,
>> Neha
>>
>> On Mon, Oct 20, 2014 at 6:15 AM, Mohit Kathuria 
>> wrote:
>>
>> > Dear Experts,
>> >
>> > We recently updated to kafka v0.8.1.1 with zookeeper v3.4.5. I have of
>> > topic with 30 partitions and 2 replicas. We are using High level
>> consumer
>> > api.
>> > Each consumer process which is a storm topolofy has 5 streams which
>> > connects to 1 or more partitions. We are not using storm's inbuilt kafka
>> > spout. Everything runs fine till the 5th consumer process(25 streams) is
>> > added for this topic.
>> >
>> > As soon as the sixth consumer process is added, the newly added
>> partition
>> > does not get the ownership of the partitions that it requests for as the
>> > already existing owners have not yet given up the ownership.
>> >
>> > We changed certain properties on consumer :
>> >
>> > 1. Max Rebalance attempts - 20 ( rebalance.backoff.ms *
>> > rebalance.max.retries >> zk connection timeout)
>> > 2. Back off ms between rebalances - 1 (10seconds)
>> > 3. ZK connection timeout - 100,000 (100 seconds)
>> >
>> > Although when I am looking in the zookeeper shell when the rebalance is
>> > happening, the consumer is registered fine on the zookeeper. Just that
>> the
>> > rebalance does not happen.
>> > After the 20th rebalance gets completed, we get
>> >
>> >
>> > *2014-10-11 11:10:08 k.c.ZookeeperConsumerConnector [INFO]
>> > [rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b], Committing all
>> > offsets after clearing the fetcher queues*
>> > *2014-10-11 11:10:10 c.s.m.k.i.c.KafkaFeedStreamer [WARN] Ignoring
>> > exception while trying to start streamer threads:
>> > rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b can't rebalance
>> after
>> > 20 retries*
>> > *kafka.common.ConsumerRebalanceFailedException:
>> > rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b can't rebalance
>> after
>> > 20 retries*
>> > *at
>> >
>> >
>> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumer