Producer TimeoutException while accessing with domain and L3 Load balancer.

2017-11-28 Thread Madhukar Bharti
Hi,

We have a Kafka cluster with three brokers (0.10.0.1). We access this
cluster using a domain name, say common.test. An L3 load balancer has been
configured for this domain name, so requests are passed to the brokers in a
round-robin fashion.

The producer has been implemented with the Java client (KafkaProducer), with
the configurations retries=3 and max.in.flight.requests.per.connection=1.
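
For reference, a minimal sketch of the producer setup described above; the bootstrap domain, topic, serializers and the blocking send are illustrative assumptions rather than our exact code (property names follow the 0.10.x Java client):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class DomainProducerSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // the load-balanced domain is only used for the initial bootstrap;
        // after that the client connects to the broker addresses returned in metadata
        props.put("bootstrap.servers", "common.test:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("retries", "3");
        props.put("max.in.flight.requests.per.connection", "1");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.send(new ProducerRecord<>("Test", "key", "value")).get(); // blocks until acked or failed
        producer.close();
    }
}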

Under normal load (~1.5M messages/day) we didn't face any exceptions. But when
the load increases (~2.5M messages/day) we randomly receive the
TimeoutException below, about 100 exceptions in total.


> *java.util.concurrent.ExecutionException:
> org.apache.kafka.common.errors.TimeoutException: Batch containing 1
> record(s) expired due to timeout while requesting metadata from brokers for
> Test-7 at *
>
> *org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:65)
> at
> org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:52)
> at
> org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:25)
> at com.zoho.mqueue.producer.KafkaSender.sendToKafka(KafkaSender.java:89)
> at *
>

If we use the broker IPs directly in the producer configuration, no exception
occurs. How can we solve this so that no messages are lost?


Regards,
Madhukar


Re: How to Downgrade Kafka from 0.11.0.1 to 0.10.0.1

2017-11-06 Thread Madhukar Bharti
Hi Matt,

We are getting the below exception while reverting to the old Kafka build (0.10.0.1):

kafka.network.InvalidRequestException: Error getting request for apiKey: 6
and apiVersion: 3
at
kafka.network.RequestChannel$Request.liftedTree2$1(RequestChannel.scala:95)
at kafka.network.RequestChannel$Request.(RequestChannel.scala:87)
at
kafka.network.Processor$$anonfun$processCompletedReceives$1.apply(SocketServer.scala:488)
at
kafka.network.Processor$$anonfun$processCompletedReceives$1.apply(SocketServer.scala:483)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at kafka.network.Processor.processCompletedReceives(SocketServer.scala:483)
at kafka.network.Processor.run(SocketServer.scala:413)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalArgumentException: Invalid version for API key
6: 3


The above log keeps printing repeatedly.

Regards,
Madhukar




On Mon, Nov 6, 2017 at 2:05 AM, Matt Farmer  wrote:

> If you could provide more details about the kind of issues you saw when you
> downgraded (e.g. error logs, behavior, etc) it might help folks help you.
> At the moment, I wouldn't know where to begin with the issue you've
> described as there's not a ton of detail about what, exactly, went wrong.
>
> On Sun, Nov 5, 2017 at 12:57 PM Madhukar Bharti 
> wrote:
>
> > Hi all,
> >
> > Recently we have upgraded our cluster from 0.10.0.1 to 0.11.0.1. After
> few
> > hours of upgrade we faced issue of too many closed_wait connection.
> Latter
> > got to know that this is a bug and got fixed in 1.0.0.
> >
> > As of now we can’t take this as there is still possible known bugs are
> > there, we need to test it well putting in production.
> >
> > Can you please help us how to downgrade from 0.11.0.1 to 0.10.0.1.? As of
> > now we have message format as 0.10.0.1 in 0.11.0.1 brokers. All clients
> are
> > also on 0.10.0.1.
> >
> > I tried like below,
> >
> > Set inter broker protocol version to 0.10.0.1.
> > Restart brokers one by one.
> > Re-install with Kafka 0.10.0.1.
> >
> > But it dint worked properly for all partitions.
> >
> >
> > Regards,
> > Madhukar
> >
> >
> > --
> > Thanks and Regards,
> > Madhukar Bharti
> > Mob: 7845755539
> >
>



-- 
Thanks and Regards,
Madhukar Bharti
Mob: 7845755539


How to Downgrade Kafka from 0.11.0.1 to 0.10.0.1

2017-11-05 Thread Madhukar Bharti
Hi all,

Recently we upgraded our cluster from 0.10.0.1 to 0.11.0.1. A few hours after
the upgrade we faced an issue of too many CLOSE_WAIT connections. Later we
learned that this is a bug that was fixed in 1.0.0.

As of now we can't move to 1.0.0, since there may still be other known bugs
and we would need to test it well before putting it in production.

Can you please help us with how to downgrade from 0.11.0.1 to 0.10.0.1? As of
now the message format is 0.10.0.1 on the 0.11.0.1 brokers, and all clients
are also on 0.10.0.1.

I tried the following:

Set the inter-broker protocol version to 0.10.0.1.
Restart the brokers one by one.
Re-install Kafka 0.10.0.1.

But it didn't work properly for all partitions.


Regards,
Madhukar


-- 
Thanks and Regards,
Madhukar Bharti
Mob: 7845755539


Re: Kafka 0.11 broker running out of file descriptors

2017-11-02 Thread Madhukar Bharti
Hi,

We are also facing the same issue after upgrading our cluster to 0.11.0.1. Can
anyone please help us out? It happened in our production cluster.

Thanks
Madhukar

On Fri, 15 Sep 2017 at 9:07 PM, Lukas Lalinsky 
wrote:

> Hello,
>
> I'm dealing with a strange issue in production and I'm running out of
> options what to do about it.
>
> It's a 3 node cluster running Kafka 0.11.0.1 with most topics having
> replication factor of 2. At some point, the broker that is about do
> die shrinks ISR for a few partitions just to itself:
>
> [2017-09-15 11:25:29,104] INFO Partition [...,12] on broker 3:
> Shrinking ISR from 3,2 to 3 (kafka.cluster.Partition)
> [2017-09-15 11:25:29,107] INFO Partition [...,8] on broker 3:
> Shrinking ISR from 3,1 to 3 (kafka.cluster.Partition)
> [2017-09-15 11:25:29,108] INFO Partition [...,38] on broker 3:
> Shrinking ISR from 3,2 to 3 (kafka.cluster.Partition)
>
> Then slightly after that, another broker writes errors like this to
> the log file:
>
> [2017-09-15 11:25:45,536] WARN [ReplicaFetcherThread-0-3]: Error in
> fetch to broker 3, request (type=FetchRequest, replicaId=2,
> maxWait=500, minBytes=1, maxBytes=10485760, fetchData={...})
> (kafka.server.ReplicaFetcherThread)
> java.io.IOException: Connection to 3 was disconnected before the
> response was read
> at
> org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:93)
> at
> kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:93)
> at
> kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:207)
> at
> kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
> at
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:151)
> at
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:112)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64)
>
> There are many of such messages. At that point, I see the number of
> open file descriptors on the other broker growing. And eventually it
> crashes with thousands of messages like this:
>
> [2017-09-15 11:31:23,273] ERROR Error while accepting connection
> (kafka.network.Acceptor)
> java.io.IOException: Too many open files
> at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method)
> at sun.nio.ch
> .ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:422)
> at sun.nio.ch
> .ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:250)
> at kafka.network.Acceptor.accept(SocketServer.scala:337)
> at kafka.network.Acceptor.run(SocketServer.scala:280)
> at java.lang.Thread.run(Thread.java:745)
>
> The file descriptor limit is set to 128k, the number of open file
> descriptors during normal operation is about 8k, so there is a lot of
> headroom.
>
> I'm not sure if it's the other brokers trying to replicate that kills
> it, or whether it's clients trying to publish messages.
>
> Has anyone seen a behavior like this? I'd appreciate any pointers.
>
> Thanks,
>
> Lukas
>
-- 
Thanks and Regards,
Madhukar Bharti
Mob: 7845755539


Doubts regarding KafkaProducer implementation

2017-03-13 Thread Madhukar Bharti
Hi,

We have three brokers in a cluster with a replication factor of 3. We are
using Kafka 0.10.0.1. We see some failures with metadata timeout exceptions
while producing.
We have configured retries=3 and max.in.flight.requests.per.connection=1.
After comparing with the old Scala producer code, we found that in the new
producer "retries" does not apply if there is a failure on the metadata
refresh, so an exception is thrown to the caller.

But the older producer retried based on the configured values and threw an
exception only after all retries were exhausted. Can we achieve this with the
new producer? We don't see any exception like *failed to produce after 3
retries*.
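
For illustration, a minimal sketch of one way to add application-level retries around the synchronous send, so that the caller only sees an exception after several attempts, as the old producer did; the retry count, back-off and error handling are assumptions, not a producer feature:

import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class SendWithRetries {
    // Retries a synchronous send a few times before surfacing the failure to the caller.
    static RecordMetadata sendWithRetries(KafkaProducer<String, String> producer,
                                          ProducerRecord<String, String> record,
                                          int maxAttempts) throws Exception {
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return producer.send(record).get(); // throws ExecutionException on metadata/timeout errors
            } catch (ExecutionException e) {
                last = e;
                Thread.sleep(100); // simple back-off between attempts
            }
        }
        throw new Exception("failed to produce after " + maxAttempts + " retries", last);
    }
}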



Regards,
Madhukar Bharti


Re: 0.8.2.1 Client not able to connect with Kafka 0.10.0.1 cluster even the cluster has message format version 0.8.2.

2016-11-02 Thread Madhukar Bharti
Hi,

After checking, we found that the issue was with the version id passed in the
ConsumerMetadataRequest. After setting it to 0
(ConsumerMetadataRequest.currentVersion()) it started working!
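
For completeness, a rough sketch of sending the request with version id 0, loosely following the wiki sample linked in this thread; the class names, constructor arguments and buffer-size helpers are taken from the 0.8.2.x client as I recall them and may need adjusting:

import kafka.api.ConsumerMetadataRequest;
import kafka.network.BlockingChannel;

public class OffsetCoordinatorLookup {
    public static void main(String[] args) throws Exception {
        BlockingChannel channel = new BlockingChannel("broker-host", 9092,
                BlockingChannel.UseDefaultBufferSize(),
                BlockingChannel.UseDefaultBufferSize(),
                5000 /* read timeout in ms */);
        channel.connect();
        short versionId = 0; // the fix described above: send version 0 of the request
        channel.send(new ConsumerMetadataRequest("my-group", versionId, 1 /* correlationId */, "my-client"));
        // the ConsumerMetadataResponse is then read from channel.receive(), as in the linked sample
        channel.disconnect();
    }
}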

Thanks!
Madhukar

On Tue, Nov 1, 2016 at 10:29 PM, Madhukar Bharti 
wrote:

> Hi Ismael,
>
> Below is the stack trace from client.
>
> java.io.EOFException: Received -1 when reading from channel, socket has
>>> likely been closed.
>>
>> at kafka.utils.Utils$.read(Utils.scala:381)
>>
>> at kafka.network.BoundedByteBufferReceive.readFrom(
>>> BoundedByteBufferReceive.scala:54)
>>
>> at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>>
>> at kafka.network.BoundedByteBufferReceive.readCompletely(
>>> BoundedByteBufferReceive.scala:29)
>>
>> at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
>>
>>
> This we are getting while reading from BlockingChannel after sending
> ConsumerMetadataRequest.
> Sample code is here
> <https://github.com/madhukarbharti/kafka-8.2.1-test/blob/master/src/com/bharti/kafka/offset/OffsetHandler.java>
> .
>
> Regards,
> Madhukar
>
>
> On Tue, Nov 1, 2016 at 9:51 PM, Ismael Juma  wrote:
>
>> OffsetFetchRequest should have api key = 9, but for some reason your
>> broker
>> is receiving a request with api key = 10 (GroupCoordinatorRequest). Can
>> you
>> provide the stacktrace from the client as well?
>>
>> Ismael
>>
>> On Tue, Nov 1, 2016 at 12:13 PM, Madhukar Bharti <
>> bhartimadhu...@gmail.com>
>> wrote:
>>
>> > Hi,
>> >
>> > We have upgraded our cluster from 0.8.2.1 to 0.10.0.1. Now
>> > *log.message.format.version* is set to *0.8.2. *
>> > We are still using Kafka 0.8.2.1 jars in client side, when we tried to
>> > fetch consumer offset using BlockingChannel and OffsetFetchRequest as in
>> > example here
>> > <https://cwiki.apache.org/confluence/display/KAFKA/
>> > Committing+and+fetching+consumer+offsets+in+Kafka>,
>> > we are getting below exceptions in brokers.
>> >
>> > ERROR Closing socket for 172.19.8.200:9092-192.168.1.1:43682 because of
>> > > error (kafka.network.Processor)
>> > > kafka.network.InvalidRequestException: Error getting request for
>> apiKey:
>> > > 10 and apiVersion: 1
>> > > at
>> > > kafka.network.RequestChannel$Request.liftedTree2$1(
>> > RequestChannel.scala:95)
>> > > at
>> > > kafka.network.RequestChannel$Request.(RequestChannel.scala:87)
>> > > at
>> > > kafka.network.Processor$$anonfun$processCompletedReceives$1.
>> > apply(SocketServer.scala:488)
>> > > at
>> > > kafka.network.Processor$$anonfun$processCompletedReceives$1.
>> > apply(SocketServer.scala:483)
>> > > at scala.collection.Iterator$clas
>> s.foreach(Iterator.scala:893)
>> > > at scala.collection.AbstractIterator.foreach(
>> > Iterator.scala:1336)
>> > > at
>> > > scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>> > > at scala.collection.AbstractItera
>> ble.foreach(Iterable.scala:54)
>> > > at
>> > > kafka.network.Processor.processCompletedReceives(SocketServe
>> r.scala:483)
>> > > at kafka.network.Processor.run(SocketServer.scala:413)
>> > > at java.lang.Thread.run(Thread.java:745)
>> > > Caused by: java.lang.IllegalArgumentException: Invalid version for
>> API
>> > key
>> > > 10: 1
>> > > at
>> > > org.apache.kafka.common.protocol.ProtoUtils.schemaFor(
>> > ProtoUtils.java:31)
>> > > at
>> > > org.apache.kafka.common.protocol.ProtoUtils.
>> > requestSchema(ProtoUtils.java:44)
>> > > at
>> > > org.apache.kafka.common.protocol.ProtoUtils.
>> > parseRequest(ProtoUtils.java:60)
>> > > at
>> > > org.apache.kafka.common.requests.GroupCoordinatorRequest.parse(
>> > GroupCoordinatorRequest.java:59)
>> > > at
>> > > org.apache.kafka.common.requests.AbstractRequest.
>> > getRequest(AbstractRequest.java:54)
>> > > at
>> > > kafka.network.RequestChannel$Request.liftedTree2$1(
>> > RequestChannel.scala:92)
>> > > ... 10 more
>> > >
>> >
>> > But, the same code is working fine with Kafka 0.8.2.1 cluster.
>> > I am aware of some protocol changes has been made in Kafka-0.10.X.X but
>> > don't want to update our client to 0.10.0.1 as of now. Is there any way
>> > without updating client the same code will still give the consumer
>> offset.
>> >
>> >
>> >
>> >
>> > --
>> > Thanks and Regards,
>> > Madhukar Bharti
>> > Mob: 7845755539
>> >
>>
>
>


Re: 0.8.2.1 Client not able to connect with Kafka 0.10.0.1 cluster even the cluster has message format version 0.8.2.

2016-11-01 Thread Madhukar Bharti
Hi Ismael,

Below is the stack trace from client.

java.io.EOFException: Received -1 when reading from channel, socket has
>> likely been closed.
>
> at kafka.utils.Utils$.read(Utils.scala:381)
>
> at
>> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
>
> at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>
> at
>> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>
> at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
>
>
We are getting this while reading from the BlockingChannel after sending the
ConsumerMetadataRequest.
Sample code is here
<https://github.com/madhukarbharti/kafka-8.2.1-test/blob/master/src/com/bharti/kafka/offset/OffsetHandler.java>
.

Regards,
Madhukar


On Tue, Nov 1, 2016 at 9:51 PM, Ismael Juma  wrote:

> OffsetFetchRequest should have api key = 9, but for some reason your broker
> is receiving a request with api key = 10 (GroupCoordinatorRequest). Can you
> provide the stacktrace from the client as well?
>
> Ismael
>
> On Tue, Nov 1, 2016 at 12:13 PM, Madhukar Bharti  >
> wrote:
>
> > Hi,
> >
> > We have upgraded our cluster from 0.8.2.1 to 0.10.0.1. Now
> > *log.message.format.version* is set to *0.8.2. *
> > We are still using Kafka 0.8.2.1 jars in client side, when we tried to
> > fetch consumer offset using BlockingChannel and OffsetFetchRequest as in
> > example here
> > <https://cwiki.apache.org/confluence/display/KAFKA/
> > Committing+and+fetching+consumer+offsets+in+Kafka>,
> > we are getting below exceptions in brokers.
> >
> > ERROR Closing socket for 172.19.8.200:9092-192.168.1.1:43682 because of
> > > error (kafka.network.Processor)
> > > kafka.network.InvalidRequestException: Error getting request for
> apiKey:
> > > 10 and apiVersion: 1
> > > at
> > > kafka.network.RequestChannel$Request.liftedTree2$1(
> > RequestChannel.scala:95)
> > > at
> > > kafka.network.RequestChannel$Request.(RequestChannel.scala:87)
> > > at
> > > kafka.network.Processor$$anonfun$processCompletedReceives$1.
> > apply(SocketServer.scala:488)
> > > at
> > > kafka.network.Processor$$anonfun$processCompletedReceives$1.
> > apply(SocketServer.scala:483)
> > > at scala.collection.Iterator$class.foreach(Iterator.scala:893)
> > > at scala.collection.AbstractIterator.foreach(
> > Iterator.scala:1336)
> > > at
> > > scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> > > at scala.collection.AbstractIterable.foreach(
> Iterable.scala:54)
> > > at
> > > kafka.network.Processor.processCompletedReceives(
> SocketServer.scala:483)
> > > at kafka.network.Processor.run(SocketServer.scala:413)
> > > at java.lang.Thread.run(Thread.java:745)
> > > Caused by: java.lang.IllegalArgumentException: Invalid version for API
> > key
> > > 10: 1
> > > at
> > > org.apache.kafka.common.protocol.ProtoUtils.schemaFor(
> > ProtoUtils.java:31)
> > > at
> > > org.apache.kafka.common.protocol.ProtoUtils.
> > requestSchema(ProtoUtils.java:44)
> > > at
> > > org.apache.kafka.common.protocol.ProtoUtils.
> > parseRequest(ProtoUtils.java:60)
> > > at
> > > org.apache.kafka.common.requests.GroupCoordinatorRequest.parse(
> > GroupCoordinatorRequest.java:59)
> > > at
> > > org.apache.kafka.common.requests.AbstractRequest.
> > getRequest(AbstractRequest.java:54)
> > > at
> > > kafka.network.RequestChannel$Request.liftedTree2$1(
> > RequestChannel.scala:92)
> > > ... 10 more
> > >
> >
> > But, the same code is working fine with Kafka 0.8.2.1 cluster.
> > I am aware of some protocol changes has been made in Kafka-0.10.X.X but
> > don't want to update our client to 0.10.0.1 as of now. Is there any way
> > without updating client the same code will still give the consumer
> offset.
> >
> >
> >
> >
> > --
> > Thanks and Regards,
> > Madhukar Bharti
> > Mob: 7845755539
> >
>


0.8.2.1 Client not able to connect with Kafka 0.10.0.1 cluster even the cluster has message format version 0.8.2.

2016-11-01 Thread Madhukar Bharti
Hi,

We have upgraded our cluster from 0.8.2.1 to 0.10.0.1, and
*log.message.format.version* is now set to *0.8.2*.
We are still using the Kafka 0.8.2.1 jars on the client side. When we tried to
fetch consumer offsets using BlockingChannel and OffsetFetchRequest as in the
example here
<https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka>,
we got the exceptions below on the brokers.

ERROR Closing socket for 172.19.8.200:9092-192.168.1.1:43682 because of
> error (kafka.network.Processor)
> kafka.network.InvalidRequestException: Error getting request for apiKey:
> 10 and apiVersion: 1
> at
> kafka.network.RequestChannel$Request.liftedTree2$1(RequestChannel.scala:95)
> at
> kafka.network.RequestChannel$Request.(RequestChannel.scala:87)
> at
> kafka.network.Processor$$anonfun$processCompletedReceives$1.apply(SocketServer.scala:488)
> at
> kafka.network.Processor$$anonfun$processCompletedReceives$1.apply(SocketServer.scala:483)
> at scala.collection.Iterator$class.foreach(Iterator.scala:893)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
> at
> scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at
> kafka.network.Processor.processCompletedReceives(SocketServer.scala:483)
> at kafka.network.Processor.run(SocketServer.scala:413)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.IllegalArgumentException: Invalid version for API key
> 10: 1
> at
> org.apache.kafka.common.protocol.ProtoUtils.schemaFor(ProtoUtils.java:31)
> at
> org.apache.kafka.common.protocol.ProtoUtils.requestSchema(ProtoUtils.java:44)
> at
> org.apache.kafka.common.protocol.ProtoUtils.parseRequest(ProtoUtils.java:60)
> at
> org.apache.kafka.common.requests.GroupCoordinatorRequest.parse(GroupCoordinatorRequest.java:59)
> at
> org.apache.kafka.common.requests.AbstractRequest.getRequest(AbstractRequest.java:54)
> at
> kafka.network.RequestChannel$Request.liftedTree2$1(RequestChannel.scala:92)
> ... 10 more
>

But the same code works fine against a Kafka 0.8.2.1 cluster.
I am aware that some protocol changes were made in Kafka 0.10.x.x, but we
don't want to update our client to 0.10.0.1 just yet. Is there any way the
same code can still fetch the consumer offsets without updating the client?




-- 
Thanks and Regards,
Madhukar Bharti
Mob: 7845755539


Re: Data loss in Kafka.

2016-06-22 Thread Madhukar Bharti
Hi Dustin,

Thanks for your quick reply.
Yes, we didn't set the *unclean.leader.election.enable* property, so it takes
*true* by default. After setting it to *false* and repeating the same test we
observed that leader election now happens only once at least one broker from
the ISR comes back to life, and no messages are lost.
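
For reference, a minimal sketch of the producer-side piece of this setup, assuming the new Java producer; the bootstrap list and serializers are illustrative, and the broker/topic settings from this thread appear only as comments:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;

public class DurableProducerConfig {
    public static KafkaProducer<String, String> create() {
        // Broker/topic side (server.properties / topic config), per this thread:
        //   min.insync.replicas=2
        //   unclean.leader.election.enable=false
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092"); // illustrative
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // acks=all makes the broker wait for the in-sync replicas, so min.insync.replicas
        // can reject writes when too few replicas are in sync instead of silently accepting them
        props.put("acks", "all");
        return new KafkaProducer<>(props);
    }
}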

Regards,
Madhukar


On Wed, Jun 22, 2016 at 8:28 PM, Dustin Cote  wrote:

> Hi Madhukar,
>
> It looks like you've had an unclean leader election in this case.  Have a
> look at the documentation for unclean.leader.election.enable and set it to
> false if you'd like to try and avoid data loss in this scenario.  By
> default it is set to true.
>
> Reference: http://kafka.apache.org/documentation.html#design_uncleanleader
>
> Regards,
>
> On Wed, Jun 22, 2016 at 10:49 AM, Madhukar Bharti <
> bhartimadhu...@gmail.com>
> wrote:
>
> > Hi All,
> >
> > We have seen data loss in Kafka,  once we restarted the cluster.
> >
> > We have 3 Brokers in a cluster. In order to prevent data loss we have
> > configured *min.insync.replicas=2* for all topics.
> > Initially, when all brokers(1,2,3) are live we produced few messages(say
> 50
> > messages). Now we killed broker 1 and started producing the messages,
> after
> > 20 messages were produced, we killed broker 2. Since number of brokers in
> > ISR are less than 2, producer requests failed(as expected).
> > After this, We killed broker 3, so at this point all brokers are down.
> >
> > Now, we started broker 1, which has 50 messages. Broker 1 became leader
> of
> > all partition it owned. Since brokers in ISR are less than 2,  producer
> > request failed.  After this, We started broker 2, the logs got truncated
> at
> > broker 2 and set offset to 50. *Here we have lost those 20 messages.*
> But,
> > in this case broker 1 should release the leadership and becomes follower
> > in-order to get those 20 messages so that no data would be lost.
> >
> > Is there any suggestion to prevent data loss in this case?
> >
> > Note: We have tested the same case in 0.8.2.1 and 0.10 release.
> >
> >
> > Thanks and Regards,
> > Madhukar
> >
>
>
>
> --
> Dustin Cote
> confluent.io
>


Re: Kafka broker crash

2016-06-22 Thread Madhukar Bharti
Hi Rahul,

Is the path "/tmp/kafka-logs/" or "/temp/kafka-logs"?

If the path is set to "/tmp/", the files may get deleted, for example when the
machine restarts, which is why it is throwing FileNotFoundException.
You can change the log location to some other path and restart all brokers.
This should fix the issue.

Regards,
Madhukar

On Wed, Jun 22, 2016 at 1:40 PM, Misra, Rahul 
wrote:

> Hi,
>
> I'm facing a strange issue in my Kafka cluster. Could anybody please help
> me with it. The issue is as follows:
>
> We have a 3 node kafka cluster. We installed the zookeeper separately and
> have pointed the brokers to it. The zookeeper is also 3 node, but for our
> POC setup, the zookeeper nodes are on the same machines as the Kafka
> brokers.
>
> While receiving messages from an existing topic using a new groupId, 2 of
> the brokers crashed with same FATAL errors:
>
> 
> < [server 2 logs] >>>
>
> [2016-06-21 23:09:14,697] INFO [GroupCoordinator 1]: Stabilized group
> pocTestNew11 generation 1 (kafka.coordinator.Gro
> upCoordinator)
> [2016-06-21 23:09:15,006] INFO [GroupCoordinator 1]: Assignment received
> from leader for group pocTestNew11 for genera
> tion 1 (kafka.coordinator.GroupCoordinator)
> [2016-06-21 23:09:20,335] FATAL [Replica Manager on Broker 1]: Halting due
> to unrecoverable I/O error while handling p
> roduce request:  (kafka.server.ReplicaManager)
> kafka.common.KafkaStorageException: I/O exception in append to log
> '__consumer_offsets-4'
> at kafka.log.Log.append(Log.scala:318)
> at kafka.cluster.Partition$$anonfun$9.apply(Partition.scala:442)
> at kafka.cluster.Partition$$anonfun$9.apply(Partition.scala:428)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
> at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:268)
> at
> kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:428)
> at
> kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:401)
> at
> kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:386)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
> at scala.collection.immutable.Map$Map1.foreach(Map.scala:116)
> at
> scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> at
> kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:386)
> at
> kafka.server.ReplicaManager.appendMessages(ReplicaManager.scala:322)
> at
> kafka.coordinator.GroupMetadataManager.store(GroupMetadataManager.scala:228)
> at
> kafka.coordinator.GroupCoordinator$$anonfun$handleCommitOffsets$9.apply(GroupCoordinator.scala:429)
> at
> kafka.coordinator.GroupCoordinator$$anonfun$handleCommitOffsets$9.apply(GroupCoordinator.scala:429)
> at scala.Option.foreach(Option.scala:257)
> at
> kafka.coordinator.GroupCoordinator.handleCommitOffsets(GroupCoordinator.scala:429)
> at
> kafka.server.KafkaApis.handleOffsetCommitRequest(KafkaApis.scala:280)
> at kafka.server.KafkaApis.handle(KafkaApis.scala:76)
> at
> kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.FileNotFoundException:
> /tmp/kafka-logs/__consumer_offsets-4/.index (No such
> file or directory)
> at java.io.RandomAccessFile.open0(Native Method)
> at java.io.RandomAccessFile.open(RandomAccessFile.java:316)
> at java.io.RandomAccessFile.(RandomAccessFile.java:243)
> at
> kafka.log.OffsetIndex$$anonfun$resize$1.apply(OffsetIndex.scala:277)
> at
> kafka.log.OffsetIndex$$anonfun$resize$1.apply(OffsetIndex.scala:276)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
> at kafka.log.OffsetIndex.resize(OffsetIndex.scala:276)
> at
> kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply$mcV$sp(OffsetIndex.scala:265)
> at
> kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply(OffsetIndex.scala:265)
> at
> kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply(OffsetIndex.scala:265)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
> at kafka.log.OffsetIndex.trimToValidSize(OffsetIndex.scala:264)
> at kafka.log.Log.roll(Log.scala:627)
> at kafka.log.Log.maybeRoll(Log.scala:602)
> at kafka.log.Log.append(Log.scala:357)
>
> --
> < [server 3 logs] >>>
>
> [2016-06-21 23:08:49,796] FATAL [ReplicaFetcherThread-0-0], Disk error
> while replicating data. (kafka.server.ReplicaFe
> tcherThread)
> kafka.common.KafkaS

Data loss in Kafka.

2016-06-22 Thread Madhukar Bharti
Hi All,

We have seen data loss in Kafka,  once we restarted the cluster.

We have 3 Brokers in a cluster. In order to prevent data loss we have
configured *min.insync.replicas=2* for all topics.
Initially, when all brokers (1, 2, 3) were alive, we produced a few messages
(say 50). Then we killed broker 1 and kept producing; after 20 more messages
were produced, we killed broker 2. Since the number of brokers in the ISR was
less than 2, producer requests failed (as expected).
After this, we killed broker 3, so at that point all brokers were down.

Now we started broker 1, which has the 50 messages. Broker 1 became leader of
all the partitions it owned. Since the number of brokers in the ISR was less
than 2, producer requests still failed. After this, we started broker 2; its
log got truncated and its offset was set back to 50. *Here we lost those 20
messages.* In this case broker 1 should instead have given up the leadership
and become a follower, in order to fetch those 20 messages so that no data
would be lost.

Is there any suggestion to prevent data loss in this case?

Note: We have tested the same case in 0.8.2.1 and 0.10 release.


Thanks and Regards,
Madhukar


Re: __consumer_offsets Topic files are not getting deleted after log retention hrs

2016-01-01 Thread Madhukar Bharti
Thanks a lot Guozhang and Maxim.

We haven't set "offsets.retention.minutes" or "offsets.topic.segment.bytes";
both are at their defaults, but we do have "log.cleaner.enable" set to false.
That might be the reason. We will set them as mentioned and test.

Thanks again!!

On Fri, Jan 1, 2016 at 1:32 AM, Maxim Vladimirsky  wrote:

> Make sure that you have *log.cleaner.enable* set to *true* (it is false by
> default). Check this KAFKA-2988
> <https://issues.apache.org/jira/browse/KAFKA-2988> out for details.
>
>
> On Thu, Dec 31, 2015 at 11:13 AM, Guozhang Wang 
> wrote:
>
> > There is a special config for that topic named
> "offsets.retention.minutes"
> > besides the "log.retention.hours", and similarly a different
> > "offsets.topic.segment.bytes" for that topic as well. Could you check
> what
> > are the values?
> >
> > Note that only old log segments will be considered for deletion so if
> your
> > __consumer_offsets topic only have one segment file, it will not be
> > deleted.
> >
> > Guozhang
> >
> >
> > On Wed, Dec 30, 2015 at 4:30 AM, Madhukar Bharti <
> bhartimadhu...@gmail.com
> > >
> > wrote:
> >
> > > Dear Team,
> > >
> > > We are using Kafka-0.8.2.1 and having log.retention.hours=168 but files
> > of
> > > __consumer_offsets are not getting deleted, due to this lots of disc
> > spaces
> > > are used.
> > >
> > > Please help how to delete file of offset storage topic after specified
> > > time.
> > >
> > >
> > > Thanks and Regards,
> > > Madhukar
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
Thanks and Regards,
Madhukar


__consumer_offsets Topic files are not getting deleted after log retention hrs

2015-12-30 Thread Madhukar Bharti
Dear Team,

We are using Kafka 0.8.2.1 with log.retention.hours=168, but the files of
__consumer_offsets are not getting deleted; because of this a lot of disk
space is being used.

Please help us with how to delete the files of the offset-storage topic after
the specified time.


Thanks and Regards,
Madhukar


Zookeeper offset commit/Consumer Re-balance issues

2015-11-22 Thread Madhukar Bharti
Hi,

Recently we started facing some issues in our stage/production setup, where we
are running 3 brokers in a cluster with ~800 partitions across 130 topics.

Most of the topics have high-level consumers. In total, more than 200 consumer
groups are running and listening on the topics.

1. We started seeing that after some time a consumer stopped consuming from
the partitions in some group. Once we restarted it, it started consuming, but
the lag shown was too high (more than before the restart). We are running
consumers on multiple servers. (We took a thread dump; the ConsumerThreads
were alive.)

Sometimes, even after restarting consumer A for a topic which has 1 partition,
the consumer reported "No broker partition available". The
*ConsumerOffsetChecker* tool also showed no owner for the group. When we looked
into ZooKeeper's consumer node, it showed that some other machine's consumer
was also registered under /consumers//id, and that znode's timestamp was also
up to date. Only after killing that consumer B did A start getting messages.

** Why did ConsumerOffsetChecker show no owner even though some consumer was
alive and using the znode? :(
** Is ZooKeeper unable to handle the rebalances because of the 200 consumer
groups?

2. After the restart the lag was higher than before.

** Why did the committed offset values also change (the lag increased after
the restart)?

Has anyone else faced the same issue? Please give some suggestions to bring it
back to a stable state.


Note: we have checked the FAQ and set the conditions properly for the
rebalance issue. We are using 0.8.2.1.


Regards,
Madhukar


Kafka Async producer throwing Exception before configured timeout.

2015-10-12 Thread Madhukar Bharti
Hi ,

We are getting the below exception frequently, even though we have configured
the timeout as 1 ms.
Has anyone else faced this issue?

org.apache.kafka.common.errors.TimeoutException: Failed to update metadata
> after 28 ms



We are using Kafka 0.8.2.1 and the producer configurations are:


> message.send.max.retries=3
> send.buffer.bytes=4048
> metadata.fetch.timeout.ms=1
> topic.metadata.refresh.interval.ms=1
> metadata.max.age.ms=1




Thanks and Regards,
Madhukar


Re: What can be reason for fetcher thread for slow response.

2015-09-13 Thread Madhukar Bharti
Hi Erik & Prabhjot

We are using Kafka 0.8.2.1 and the old producer API with the config below:

request.required.acks=1
request.timeout.ms=2000
producer.type=sync

On the Kafka brokers we have:

num.network.threads=8
num.io.threads=10
num.replica.fetchers=4
replica.fetch.max.bytes=2097154
replica.fetch.wait.max.ms=500
replica.socket.timeout.ms=6
replica.socket.receive.buffer.bytes=65536
replica.lag.time.max.ms=1
replica.high.watermark.checkpoint.interval.ms=5000
replica.lag.max.messages=100

If you are asking about a singleton in terms of the producer: we have created
a pool of producers, with as many producers as the number of connections that
can be made in Tomcat.
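
For illustration, a rough sketch of that kind of bounded producer pool; it is generic over the producer type, and the pool size and construction are assumptions rather than our actual implementation:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Supplier;

// A bounded pool: one producer per borrower, so at most `size` Tomcat threads
// send concurrently and the rest wait instead of opening new connections.
public class ProducerPool<P> {
    private final BlockingQueue<P> pool;

    public ProducerPool(int size, Supplier<P> factory) {
        pool = new ArrayBlockingQueue<>(size);
        for (int i = 0; i < size; i++) {
            pool.add(factory.get());
        }
    }

    public P borrow() throws InterruptedException { return pool.take(); }

    public void release(P producer) { pool.offer(producer); }
}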


Thanks and Regards,
Madhukar

On Fri, Sep 11, 2015 at 8:27 PM, Prabhjot Bharaj 
wrote:

> Hi,
>
> In addition to the parameters asked by Erik, it would be great if you could
> share your broker's server.properties as well
>
> Regards,
> Prabhjot
>
> On Fri, Sep 11, 2015 at 8:10 PM, Helleren, Erik <
> erik.helle...@cmegroup.com>
> wrote:
>
> > Hi Madhukar,
> > Some questions that can help understand whats going on: Which kafka
> > version is used?  Which Producer API is being used
> > (http://kafka.apache.org/documentation.html#producerapi)?  And what are
> > the configs for this producer?
> >
> > Also, because I know little about tomcat, is there a semantic for a
> > singleton, or a server singleton?
> > -Erik
> >
> > On 9/11/15, 8:48 AM, "Madhukar Bharti"  wrote:
> >
> > >Hi,
> > >
> > >
> > >We are having 3 brokers in a cluster. Producer request is getting failed
> > >for broker 2. We are frequently getting below exception:
> > >
> > >15/09/09 22:09:06 WARN async.DefaultEventHandler: Failed to send
> > >producer request with* correlation id 1455 to broker 2* with data for
> > >partitions [UserEvents,0]
> > >> java.net.SocketTimeoutException
> > >>  at
> > >>sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:229)
> > >>  at
> sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
> > >>  at
> >
> >>java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385
> > >>)
> > >>  at kafka.utils.Utils$.read(Utils.scala:375)
> > >>  at
> >
> >>kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.
> > >>scala:54)
> > >>  at
> > kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> > >>  at
> >
> >>kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferRe
> > >>ceive.scala:29)
> > >>  at
> kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
> > >>
> > >>
> > >After looking into request-logs in all machines, found that there is
> some
> > >slowness in broker 2. I am listing top 20 request processing time from
> all
> > >the brokers.
> > >
> > > Broker 1 (Producer & Fetcher): 493, 494, 495, 496, 497, 498, 499, 500, 501, 502, 503, 504, 519, 520, 541, 542, 545, 551, 577, 633
> > > Broker 1 (Producer): 77, 91, 94, 96, 104, 111, 112, 153, 167, 184, 248, 249, 254, 284, 395, 443, 470, 551, 577
> > >
> &

What can be reason for fetcher thread for slow response.

2015-09-11 Thread Madhukar Bharti
Hi,


We have 3 brokers in a cluster. Producer requests are failing for broker 2.
We are frequently getting the exception below:

15/09/09 22:09:06 WARN async.DefaultEventHandler: Failed to send
producer request with* correlation id 1455 to broker 2* with data for
partitions [UserEvents,0]
> java.net.SocketTimeoutException
>   at 
> sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:229)
>   at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
>   at 
> java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
>   at kafka.utils.Utils$.read(Utils.scala:375)
>   at 
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
>   at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>   at 
> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>   at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
>
>
After looking into the request logs on all machines, we found that there is
some slowness on broker 2. Below are the top 20 request processing times from
each of the brokers.

Broker 1 (Producer & Fetcher): 493, 494, 495, 496, 497, 498, 499, 500, 501, 502, 503, 504, 519, 520, 541, 542, 545, 551, 577, 633
Broker 1 (Producer): 77, 91, 94, 96, 104, 111, 112, 153, 167, 184, 248, 249, 254, 284, 395, 443, 470, 551, 577, 633
Broker 2 (Producer + Fetcher): 1033, 1034, 1035, 1036, 1037, 1038, 1039, 1040, 1042, 1043, 1044, 1049, 1051, 1057, 1064, 1087, 1145, 1146, 1466, 1467
Broker 2 (Producer): 85, 86, 114, 121, 123, 136, 153, 201, 225, 226, 240, 299, 405, 406, 448, 449, 455, 464, 505, 658
Broker 3 (Producer + Fetcher): 489, 490, 491, 492, 493, 494, 495, 496, 497, 498, 499, 500, 501, 502, 503, 506, 510, 514, 515, 516
Broker 3 (Producer): 19, 20, 21, 22, 23, 24, 27, 28, 31, 32, 60, 89, 98, 104, 110, 114, 259, 288, 337, 385


What can be the reason that the fetcher thread is taking more time to process?

What do we need to do to get better performance? Are there any properties we
need to tweak?

Any suggestions are welcome.

Note: We push data to Kafka in the user (Tomcat) thread and set the producer
request timeout to 2 sec. We don't want to increase the timeout beyond 2 sec,
because if too many threads get hung up, the application will hang.


Thanks and Regards,
Madhukar


Re: Error handling in New AsyncProducer

2015-08-17 Thread Madhukar Bharti
Thanks for your explanation, Jeff!

On Mon, Aug 17, 2015 at 11:23 PM, Jeff Holoman 
wrote:

> I should've been more specific...if the producer loses total access to all
> brokers...eg. some kind of network issue.
>
> On Mon, Aug 17, 2015 at 1:50 PM, Jeff Holoman 
> wrote:
>
> > Actually this won't work. The problem is if the producer loses
> > connectivity to the broker, then messages will continue to queue up until
> > batch.size is exhausted. Then the send will block.  At this point, if you
> > gain connectivity again, then the messages will be resent.
> >
> > If all brokers die, you should get Not Enough Replicas if you have the
> > min.isr.set first. This should be an indication that you are having a
> > problem, before you get to the error in the actual prodcuer thread. Keep
> in
> > mind that as of right now you need to manually set the min.isr's with
> alter
> > topic due to KAFKA-2114
> > Either way, you will eventually get an error in the producer thread that
> > isn't passed to the callback.
> >
> > And I think that's the point.
> >
> > Thanks
> >
> > Jeff
> >
> >
> >
> >
> >
> >
> > On Mon, Aug 17, 2015 at 12:39 PM, Madhukar Bharti <
> > bhartimadhu...@gmail.com> wrote:
> >
> >> Hi Sunil,
> >>
> >> Producer will throw an Exception in callback if there is problem while
> >> sending data. You can check like:
> >>
> >> public void onCompletion(RecordMetadata arg0, Exception arg1) {
> >> if (arg1 != null) {
> >> System.out.println("exception occured");
> >> }
> >> System.out.println("sent")
> >>
> >>
> >>
> >>
> >> On Mon, Aug 17, 2015 at 9:20 PM, sunil kalva 
> >> wrote:
> >>
> >> > Hi all
> >> > I am using new java producer in async mode, when my entire cluster is
> >> down
> >> > i am loosing all my messages. How do we get notification when the
> >> cluster
> >> > is down so that i can send messages to another cluster. The callback
> is
> >> > only triggered when the cluster is reachable .
> >> >
> >> > --SK
> >> >
> >>
> >>
> >>
> >> --
> >> Thanks and Regards,
> >> Madhukar Bharti
> >> Mob: 7845755539
> >>
> >
> >
> >
> > --
> > Jeff Holoman
> > Systems Engineer
> >
> >
> >
> >
>
>
> --
> Jeff Holoman
> Systems Engineer
>


Re: Error handling in New AsyncProducer

2015-08-17 Thread Madhukar Bharti
Hi Sunil,

The producer will pass an Exception to the callback if there is a problem
while sending data. You can check it like this:

public void onCompletion(RecordMetadata metadata, Exception exception) {
    if (exception != null) {
        System.out.println("exception occurred");
    } else {
        System.out.println("sent");
    }
}




On Mon, Aug 17, 2015 at 9:20 PM, sunil kalva  wrote:

> Hi all
> I am using new java producer in async mode, when my entire cluster is down
> i am loosing all my messages. How do we get notification when the cluster
> is down so that i can send messages to another cluster. The callback is
> only triggered when the cluster is reachable .
>
> --SK
>



-- 
Thanks and Regards,
Madhukar Bharti
Mob: 7845755539


New Producer keep on trying to connect even after metadata timeout occured

2015-05-19 Thread Madhukar Bharti
Hi,

I am testing the Kafka 0.8.2.1 new producer API. For synchronous sending, I am
calling future.get() right after producer.send().

I killed my broker and started the producer, and noticed that it throws an
ExecutionException, but after that it still keeps trying to reconnect to the
broker, and this goes on indefinitely.

How can we stop it? I don't want it to keep retrying if the broker is not
available. In Kafka 0.8.1.1, after throwing the exception it did not try to
reconnect. Is there any way to handle this?

I have 1 broker with ZooKeeper running on the same machine, and I set
metadata.fetch.timeout.ms to 100.
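
One way to bound how long the caller waits is to put a timeout on the Future itself and close the producer when giving up; a sketch assuming the 0.8.2.1 new producer, with the topic, timeout value and error handling as illustrative assumptions:

import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class SyncSendWithTimeout {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        try {
            RecordMetadata meta =
                    producer.send(new ProducerRecord<>("Test", "hello")).get(500, TimeUnit.MILLISECONDS);
            System.out.println("sent to offset " + meta.offset());
        } catch (TimeoutException e) {
            System.err.println("broker not reachable, giving up");
        } finally {
            producer.close(); // shuts down the background sender thread (note: waits for buffered requests)
        }
    }
}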

Regards,
Madhukar


Doubt regarding new Producer and old Producer API

2015-05-13 Thread Madhukar Bharti
Hi all,

What are the possible use cases for using the new producer API?
- Does it only provide an async call with a callback feature?
- Has the partitioner class been removed from the new producer API? If not,
how can I implement one if I want to use only the client APIs?
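
As far as I know, the 0.8.2 new producer does not expose a partitioner.class setting the way the old producer did, but you can choose the partition yourself and pass it in the ProducerRecord; a minimal sketch, where the hashing scheme and method shape are illustrative assumptions:

import java.util.List;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.PartitionInfo;

public class ManualPartitioning {
    // Mimics a custom partitioner by choosing the partition before sending.
    static void sendKeyed(KafkaProducer<String, String> producer, String topic,
                          String key, String value) {
        List<PartitionInfo> partitions = producer.partitionsFor(topic);
        int partition = (key.hashCode() & 0x7fffffff) % partitions.size(); // illustrative hashing scheme
        producer.send(new ProducerRecord<>(topic, partition, key, value));
    }
}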



Regards,
Madhukar


Re: Why fetching meta-data for topic is done three times?

2015-05-01 Thread Madhukar Bharti
Hi Zakee,

Yes, even with retries set to 1 it takes 3 sec to throw an exception. I am not
sure whether the producer code intentionally swallows all exceptions in send(),
which internally performs three tasks. If the first one fails, is it really
necessary to proceed to the next?

Now, to get the exception thrown within 1 sec, we have set
request.timeout.ms=300ms. I am not sure whether such a low value will always
work; if there is some disturbance in the network, a ping itself can take
about 600 ms.

What would be the recommended value to use for this?

Thanks
Madhukar

On Fri, May 1, 2015 at 3:52 AM, Zakee  wrote:

> With reties 1 you still see the 3 secs delay? The idea is, you can change
> these property to reduce the time to throw exception to 1 secs or below.
> Does that help?
>
> Thanks
> Zakee
>
>
>
> > On Apr 28, 2015, at 10:29 PM, Madhukar Bharti 
> wrote:
> >
> > Hi Zakee,
> >
> >> message.send.max.retries is 1
> >
> > Regards,
> > Madhukar
> >
> > On Tue, Apr 28, 2015 at 6:17 PM, Madhukar Bharti <
> bhartimadhu...@gmail.com <mailto:bhartimadhu...@gmail.com>>
> > wrote:
> >
> >> Hi Zakee,
> >>
> >> Thanks for your reply.
> >>
> >>> message.send.max.retries
> >> 3
> >>
> >>> retry.backoff.ms
> >> 100
> >>
> >>> topic.metadata.refresh.interval.ms
> >> 600*1000
> >>
> >> This is my properties.
> >>
> >> Regards,
> >> Madhukar
> >>
> >> On Tue, Apr 28, 2015 at 3:26 AM, Zakee  wrote:
> >>
> >>> What values do you have for below properties? Or are these set to
> >>> defaults?
> >>>
> >>> message.send.max.retries
> >>> retry.backoff.ms
> >>> topic.metadata.refresh.interval.ms
> >>>
> >>> Thanks
> >>> Zakee
> >>>
> >>>
> >>>
> >>>> On Apr 23, 2015, at 11:48 PM, Madhukar Bharti <
> bhartimadhu...@gmail.com>
> >>> wrote:
> >>>>
> >>>> Hi All,
> >>>>
> >>>> Once gone through code found that, While Producer starts it does three
> >>>> things:
> >>>>
> >>>> 1. Sends Meta-data request
> >>>> 2. Send message to broker(fetching broker list)
> >>>> 3. If number of message to be produce is grater than 0 then again
> tries
> >>> to
> >>>> refresh metadata for outstanding produce requests.
> >>>>
> >>>> Each of the request takes configured timeout and go to next and
> finally
> >>>> once all is done then it will throw Exception(if 3 also fails).
> >>>>
> >>>> Here the problem is, if we set timeout as 1 sec then to throw an
> >>> exception
> >>>> It takes 3 sec, so user request will be hanged up till 3 sec, that is
> >>>> comparatively high for response time and if all threads will be
> blocked
> >>> due
> >>>> to producer send then whole application will be blocked for 3 sec. So
> we
> >>>> want to reduce this time to 1 sec. in overall to throw Exception.
> >>>>
> >>>> What is the possible way to do this?
> >>>>
> >>>> Thanks
> >>>> Madhukar
> >>>>
> >>>> On Thu, Apr 16, 2015 at 8:10 PM, Madhukar Bharti <
> >>> bhartimadhu...@gmail.com>
> >>>> wrote:
> >>>>
> >>>>> Hi All,
> >>>>>
> >>>>> I came across a problem, If we use broker IP which is not reachable
> or
> >>> out
> >>>>> of network. Then it takes more than configured time(
> request.timeout.ms
> >>> ).
> >>>>> After checking the log got to know that it is trying to fetch topic
> >>>>> meta-data three times by changing correlation id.
> >>>>> Due to this even though i keep (request.timeout.ms=1000) It takes 3
> >>> sec
> >>>>> to throw Exception. I am using Kafka0.8.1.1 with patch
> >>>>>
> >>>
> https://issues.apache.org/jira/secure/attachment/12678547/kafka-1733-add-connectTimeoutMs.patch
> >>>>>
> >>>>>
> >>>>> I have attached the log. Please check this and clarify why it is
> >>> behaving
> >>>>> like this. Whether it is by design or have to set some other property
> >>> also.
> >>>>>
> >>>>>
> >>>>>
> >>>>> Regards
> >>>>> Madhukar
> >>>>>
> >>>>>
> >>>>>
> >>>> 
> >>>
> >>>
> >>
> >>
> > 


Re: Why fetching meta-data for topic is done three times?

2015-04-29 Thread Madhukar Bharti
Hi Zakee,

>message.send.max.retries is 1

Regards,
Madhukar

On Tue, Apr 28, 2015 at 6:17 PM, Madhukar Bharti 
wrote:

> Hi Zakee,
>
> Thanks for your reply.
>
> >message.send.max.retries
> 3
>
> >retry.backoff.ms
> 100
>
> >topic.metadata.refresh.interval.ms
> 600*1000
>
> This is my properties.
>
> Regards,
> Madhukar
>
> On Tue, Apr 28, 2015 at 3:26 AM, Zakee  wrote:
>
>> What values do you have for below properties? Or are these set to
>> defaults?
>>
>> message.send.max.retries
>> retry.backoff.ms
>> topic.metadata.refresh.interval.ms
>>
>> Thanks
>> Zakee
>>
>>
>>
>> > On Apr 23, 2015, at 11:48 PM, Madhukar Bharti 
>> wrote:
>> >
>> > Hi All,
>> >
>> > Once gone through code found that, While Producer starts it does three
>> > things:
>> >
>> > 1. Sends Meta-data request
>> > 2. Send message to broker(fetching broker list)
>> > 3. If number of message to be produce is grater than 0 then again tries
>> to
>> > refresh metadata for outstanding produce requests.
>> >
>> > Each of the request takes configured timeout and go to next and finally
>> > once all is done then it will throw Exception(if 3 also fails).
>> >
>> > Here the problem is, if we set timeout as 1 sec then to throw an
>> exception
>> > It takes 3 sec, so user request will be hanged up till 3 sec, that is
>> > comparatively high for response time and if all threads will be blocked
>> due
>> > to producer send then whole application will be blocked for 3 sec. So we
>> > want to reduce this time to 1 sec. in overall to throw Exception.
>> >
>> > What is the possible way to do this?
>> >
>> > Thanks
>> > Madhukar
>> >
>> > On Thu, Apr 16, 2015 at 8:10 PM, Madhukar Bharti <
>> bhartimadhu...@gmail.com>
>> > wrote:
>> >
>> >> Hi All,
>> >>
>> >> I came across a problem, If we use broker IP which is not reachable or
>> out
>> >> of network. Then it takes more than configured time(request.timeout.ms
>> ).
>> >> After checking the log got to know that it is trying to fetch topic
>> >> meta-data three times by changing correlation id.
>> >> Due to this even though i keep (request.timeout.ms=1000) It takes 3
>> sec
>> >> to throw Exception. I am using Kafka0.8.1.1 with patch
>> >>
>> https://issues.apache.org/jira/secure/attachment/12678547/kafka-1733-add-connectTimeoutMs.patch
>> >>
>> >>
>> >> I have attached the log. Please check this and clarify why it is
>> behaving
>> >> like this. Whether it is by design or have to set some other property
>> also.
>> >>
>> >>
>> >>
>> >> Regards
>> >> Madhukar
>> >>
>> >>
>> >>
>> > 
>>
>>
>
>


Re: Why fetching meta-data for topic is done three times?

2015-04-28 Thread Madhukar Bharti
Hi Zakee,

Thanks for your reply.

>message.send.max.retries
3

>retry.backoff.ms
100

>topic.metadata.refresh.interval.ms
600*1000

This is my properties.

Regards,
Madhukar

On Tue, Apr 28, 2015 at 3:26 AM, Zakee  wrote:

> What values do you have for below properties? Or are these set to defaults?
>
> message.send.max.retries
> retry.backoff.ms
> topic.metadata.refresh.interval.ms
>
> Thanks
> Zakee
>
>
>
> > On Apr 23, 2015, at 11:48 PM, Madhukar Bharti 
> wrote:
> >
> > Hi All,
> >
> > Once gone through code found that, While Producer starts it does three
> > things:
> >
> > 1. Sends Meta-data request
> > 2. Send message to broker(fetching broker list)
> > 3. If number of message to be produce is grater than 0 then again tries
> to
> > refresh metadata for outstanding produce requests.
> >
> > Each of the request takes configured timeout and go to next and finally
> > once all is done then it will throw Exception(if 3 also fails).
> >
> > Here the problem is, if we set timeout as 1 sec then to throw an
> exception
> > It takes 3 sec, so user request will be hanged up till 3 sec, that is
> > comparatively high for response time and if all threads will be blocked
> due
> > to producer send then whole application will be blocked for 3 sec. So we
> > want to reduce this time to 1 sec. in overall to throw Exception.
> >
> > What is the possible way to do this?
> >
> > Thanks
> > Madhukar
> >
> > On Thu, Apr 16, 2015 at 8:10 PM, Madhukar Bharti <
> bhartimadhu...@gmail.com>
> > wrote:
> >
> >> Hi All,
> >>
> >> I came across a problem, If we use broker IP which is not reachable or
> out
> >> of network. Then it takes more than configured time(request.timeout.ms
> ).
> >> After checking the log got to know that it is trying to fetch topic
> >> meta-data three times by changing correlation id.
> >> Due to this even though i keep (request.timeout.ms=1000) It takes 3 sec
> >> to throw Exception. I am using Kafka0.8.1.1 with patch
> >>
> https://issues.apache.org/jira/secure/attachment/12678547/kafka-1733-add-connectTimeoutMs.patch
> >>
> >>
> >> I have attached the log. Please check this and clarify why it is
> behaving
> >> like this. Whether it is by design or have to set some other property
> also.
> >>
> >>
> >>
> >> Regards
> >> Madhukar
> >>
> >>
> >>
> > 
>
>


Re: Why fetching meta-data for topic is done three times?

2015-04-23 Thread Madhukar Bharti
Hi All,

After going through the code, I found that when the producer sends, it does
three things:

1. Sends a metadata request.
2. Sends the message to the broker (after fetching the broker list).
3. If the number of messages remaining to be produced is greater than 0, it
again tries to refresh metadata for the outstanding produce requests.

Each of these requests takes the configured timeout before moving on to the
next, and only once all are done does it throw an exception (if step 3 also
fails).

The problem is that if we set the timeout to 1 sec, it takes 3 sec to throw an
exception, so the user request hangs for 3 sec, which is comparatively high
for our response time; and if all threads are blocked in producer sends, the
whole application is blocked for 3 sec. So we want to reduce the overall time
to throw the exception to 1 sec.

What is the possible way to do this?
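
One application-level workaround is to run the blocking send on a separate thread and bound how long the caller waits; this is only a sketch (the thread-pool size, the 1-second deadline and the cancel-on-timeout behaviour are assumptions), not a producer feature:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BoundedSend {
    private static final ExecutorService SENDER = Executors.newFixedThreadPool(4);

    // The caller waits at most 1 second, even if the producer internally retries
    // metadata fetches and takes roughly three times the configured request timeout.
    static void sendWithDeadline(Runnable blockingSend) throws Exception {
        Future<?> result = SENDER.submit(blockingSend);
        try {
            result.get(1, TimeUnit.SECONDS);
        } catch (TimeoutException e) {
            result.cancel(true); // give up from the caller's point of view
            throw e;
        }
    }
}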

Thanks
Madhukar

On Thu, Apr 16, 2015 at 8:10 PM, Madhukar Bharti 
wrote:

> Hi All,
>
> I came across a problem, If we use broker IP which is not reachable or out
> of network. Then it takes more than configured time(request.timeout.ms).
> After checking the log got to know that it is trying to fetch topic
> meta-data three times by changing correlation id.
> Due to this even though i keep (request.timeout.ms=1000) It takes 3 sec
> to throw Exception. I am using Kafka0.8.1.1 with patch
> https://issues.apache.org/jira/secure/attachment/12678547/kafka-1733-add-connectTimeoutMs.patch
>
>
> I have attached the log. Please check this and clarify why it is behaving
> like this. Whether it is by design or have to set some other property also.
>
>
>
> Regards
> Madhukar
>
>
>


Re: Issue with high level consumer in 8.1.1 after restart

2015-04-22 Thread Madhukar Bharti
Hi All,

Any update on this? We are facing the same issue every time we restart the
consumers.

On Mon, Apr 6, 2015 at 11:19 PM, Madhukar Bharti 
wrote:

> Hi Mayuresh,
>
> We are having only one consumer in the group and only one partition for
> that topic.
> We have set auto.commit.enable false, zookeeper.session.timeout.ms=6,
> rebalance.backoff.ms to 2000 and rebalance.max.retries to 20.
>
> Thanks!
>
> On Mon, Apr 6, 2015 at 9:59 PM, Mayuresh Gharat <
> gharatmayures...@gmail.com> wrote:
>
>> It actually depends on how many consumers you have in the same group and
>> how many partitions the particular topic has.
>>
>> Can you elaborate on your configuration?
>>
>> Thanks,
>>
>> Mayuresh
>>
>> On Mon, Apr 6, 2015 at 3:35 AM, Madhukar Bharti > >
>> wrote:
>>
>> > Hi All,
>> >
>> > We are facing issue with Kafka high Level consumers. We are using
>> 0.8.1.1
>> > version. Sometimes after restart consumers picks the messages sometimes
>> it
>> > starts but not receives any messages. Is high level consumer is not
>> > reliable?
>> >
>> > I have checked with the log. Even if re-balance succeed, consumer didn't
>> > receives any messages. Is there any way to overcome this.
>> >
>> > I am attaching log also when the consumer restarted. Please find the
>> > attachment and let us know what can be possible reason. We have only one
>> > partition for that topic.
>> >
>> > Thanks in Advance!
>> >
>>
>>
>>
>> --
>> -Regards,
>> Mayuresh R. Gharat
>> (862) 250-7125
>
>


Why fetching meta-data for topic is done three times?

2015-04-16 Thread Madhukar Bharti
Hi All,

I came across a problem: if we use a broker IP which is not reachable or is
out of the network, it takes more than the configured time
(request.timeout.ms) to fail. After checking the log, I learned that it tries
to fetch the topic metadata three times, changing the correlation id each
time.
Because of this, even though I keep request.timeout.ms=1000, it takes 3 sec to
throw an exception. I am using Kafka 0.8.1.1 with the patch
https://issues.apache.org/jira/secure/attachment/12678547/kafka-1733-add-connectTimeoutMs.patch


I have attached the log. Please check it and clarify why it is behaving like
this: whether it is by design, or whether some other property has to be set
as well.



Regards
Madhukar
[2015-04-16 19:43:42,712] INFO Verifying properties 
(kafka.utils.VerifiableProperties)
[2015-04-16 19:43:42,741] INFO Property message.send.max.retries is overridden 
to 0 (kafka.utils.VerifiableProperties)
[2015-04-16 19:43:42,741] INFO Property metadata.broker.list is overridden to 
178.95.20.30:9092 (kafka.utils.VerifiableProperties)
[2015-04-16 19:43:42,741] INFO Property request.required.acks is overridden to 
-1 (kafka.utils.VerifiableProperties)
[2015-04-16 19:43:42,741] INFO Property request.timeout.ms is overridden to 
1000 (kafka.utils.VerifiableProperties)
[2015-04-16 19:43:42,742] INFO Property serializer.class is overridden to 
kafka.serializer.StringEncoder (kafka.utils.VerifiableProperties)
Sending... KeyedMessage(Test,1,1,Date: 2015/04/16 19:43:42, Message No1)
[2015-04-16 19:43:42,804] DEBUG Handling 1 events 
(kafka.producer.async.DefaultEventHandler)
[2015-04-16 19:43:42,843] TRACE Instantiating Scala Sync Producer with 
properties: {metadata.broker.list=178.95.20.30:9092, request.required.acks=-1, 
request.timeout.ms=1000, port=9092, message.send.max.retries=0, 
serializer.class=kafka.serializer.StringEncoder, host=178.95.20.30} 
(kafka.producer.SyncProducer)
[2015-04-16 19:43:42,844] INFO Fetching metadata from broker 
id:0,host:178.95.20.30,port:9092 with correlation id 0 for 1 topic(s) Set(Test) 
(kafka.client.ClientUtils$)
[2015-04-16 19:43:42,849] TRACE verifying sendbuffer of size 20 
(kafka.producer.SyncProducer)
[2015-04-16 19:43:42,850] TRACE START BlockingChannel#connect 
(kafka.network.BlockingChannel)
[2015-04-16 19:43:42,851] TRACE >STATE: Not connected.. 
(kafka.network.BlockingChannel)
[2015-04-16 19:43:42,856] TRACE >STATE Now connecting 
(kafka.network.BlockingChannel)
[2015-04-16 19:43:43,870] ERROR Producer connection to 178.95.20.30:9092 
unsuccessful (kafka.producer.SyncProducer)
java.net.SocketTimeoutException
at sun.nio.ch.SocketAdaptor.connect(SocketAdaptor.java:124)
at kafka.network.BlockingChannel.connect(BlockingChannel.scala:61)
at kafka.producer.SyncProducer.connect(SyncProducer.scala:141)
at 
kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:156)
at 
kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
at 
kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
at 
kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67)
at kafka.utils.Utils$.swallow(Utils.scala:167)
at kafka.utils.Logging$class.swallowError(Logging.scala:106)
at kafka.utils.Utils$.swallowError(Utils.scala:46)
at 
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)
at kafka.producer.Producer.send(Producer.scala:76)
at kafka.javaapi.producer.Producer.send(Producer.scala:33)
at com.kafka.producer.KafkaProducer.main(KafkaProducer.java:41)
[2015-04-16 19:43:43,878] WARN Fetching topic metadata with correlation id 0 
for topics [Set(Test)] from broker [id:0,host:178.95.20.30,port:9092] failed 
(kafka.client.ClientUtils$)
java.net.SocketTimeoutException
at sun.nio.ch.SocketAdaptor.connect(SocketAdaptor.java:124)
at kafka.network.BlockingChannel.connect(BlockingChannel.scala:61)
at kafka.producer.SyncProducer.connect(SyncProducer.scala:141)
at 
kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:156)
at 
kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
at 
kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
at 
kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67)
at kafka.utils.Utils$.swallow(Utils.scala:167)
at kafka.utils.Logging$class.swallowError(Logging.scala:106)
at kafka.utils.Utils$.swallowError(Utils.scala:46)
at 
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scal

Re: Consumer offsets in offsets topic 0.8.2

2015-04-13 Thread Madhukar Bharti
Hi Vamsi,

You can also see the example here if you want to use the Java API to get the
offset from the topic.
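
For what it's worth, below is a rough, untested sketch of such an offset fetch
with the 0.8.2 Java API (kafka.javaapi.OffsetFetchRequest and
SimpleConsumer.fetchOffsets). The broker host, group id and topic name are
placeholders, the constructor signatures should be double-checked against the
0.8.2 javadoc, and for Kafka-based offsets the request has to go to the broker
that acts as offset manager for the group, as mentioned further down this
thread:

import java.util.Collections;
import kafka.common.OffsetMetadataAndError;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetFetchRequest;
import kafka.javaapi.OffsetFetchResponse;
import kafka.javaapi.consumer.SimpleConsumer;

public class OffsetFetchExample {
    public static void main(String[] args) {
        // connect to the broker that is the offset manager for this group
        SimpleConsumer consumer =
                new SimpleConsumer("broker-host", 9092, 10000, 64 * 1024, "offset-checker");
        TopicAndPartition tp = new TopicAndPartition("my-topic", 0);
        // versionId 1 reads offsets stored in the __consumer_offsets topic;
        // versionId 0 would read ZooKeeper-based offsets instead
        OffsetFetchRequest request = new OffsetFetchRequest(
                "my-group", Collections.singletonList(tp), (short) 1, 0, "offset-checker");
        OffsetFetchResponse response = consumer.fetchOffsets(request);
        OffsetMetadataAndError result = response.offsets().get(tp);
        System.out.println("Committed offset for " + tp + " = " + result.offset());
        consumer.close();
    }
}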

Regards,
Madhukar

On Mon, Apr 13, 2015 at 10:25 PM, 4mayank <4may...@gmail.com> wrote:

> I did a similar change - moved from High Level Consumer to Simple Consumer.
> Howerver kafka-consumer-offset-checker.sh throws an exception. Its
> searching the zk path /consumers// which does not exist on any of my
> zk nodes.
>
> Is there any other tool for getting the offset lag when using Simple
> Consumer? Or am I using kafka-consumer-offset-checker.sh incorrectly for
> Simple Consumer?
>
> Output:
> kafka-consumer-offset-checker.sh --zookeeper 192.168.1.201:2181,
> 192.168.1.202:2181,192.168.1.203:2181 --group my-control-group
> Exiting due to: org.apache.zookeeper.KeeperException$NoNodeException:
> KeeperErrorCode = NoNode for /consumers/my-control-group/owners.
>
>
> kafka-consumer-offset-checker.sh --zookeeper 192.168.1.201:2181,
> 192.168.1.202:2181,192.168.1.203:2181 --group my-control-group --topic
> my-control
> Exiting due to: org.apache.zookeeper.KeeperException$NoNodeException:
> KeeperErrorCode = NoNode for
> /consumers/my-control-group/offsets/my-control/1.
>
>
> ZK cli output:
>
> ./zookeeper-shell.sh 192.168.1.201:2181
> Connecting to 192.168.1.201:2181
> Welcome to ZooKeeper!
> JLine support is disabled
>
> WATCHER::
>
> WatchedEvent state:SyncConnected type:None path:null
> ls /config/topics
> [my-control]
> ls /consumers
> []
> quit
>
>
> Thanks.
> Mayank.
>
> On Fri, Mar 20, 2015 at 9:54 AM, Jiangjie Qin 
> wrote:
>
> > Hi Vamsi,
> >
> > The ConsumerOffsetChecker.scala or kafka-consumer-offset-checker.sh still
> > works. You can use them to check the offsets.
> > If you need to check the offsets programmatically, you can send
> > OffsetsFetcheRequest to broker using simple consumer. You may refer the
> > ConsumerOffsetChecker.scala to see how to find correct broker where
> > corresponding offset manager resides.
> >
> > Jiangjie (Becket) Qin
> >
> > On 3/19/15, 11:54 PM, "Achanta Vamsi Subhash" <
> achanta.va...@flipkart.com>
> > wrote:
> >
> > >Hi,
> > >
> > >We are using 0.8.2.1 currently.
> > >
> > >- How to get the consumer offsets from the offsets topic?
> > >- Is there any built-in function which I could use? (like in
> > >AdminUtils.scala)
> > >- Is it ok to start a simple consumer and read the offsets from the
> topic?
> > >
> > >We used to read the offsets from zookeeper previously for the
> > >HighLevelConsumers. But with the new broker this changed as we are using
> > >Kafka topic for offsets.
> > >
> > >--
> > >Regards
> > >Vamsi Subhash
> >
> >
>


Re: Producer does not recognize new brokers

2015-04-12 Thread Madhukar Bharti
Hi,

Simply adding the brokers to the cluster will not reassign or redistribute
topic partitions to newly added brokers.

As it is also mentioned in the documentation:
"*However these new servers will not automatically be assigned any data
partitions, so unless partitions are moved to them they won't be doing any
work until new topics are created. So usually when you add machines to your
cluster you will want to migrate some existing data to these machines.*"
Please refer to this link and follow the steps there to move some partitions to
the new brokers.
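
As a rough sketch (ZooKeeper address, broker ids, topic and file names below are
placeholders), the stock reassignment tool can be driven like this:

# propose an assignment that spreads the listed topics over brokers 1-4
bin/kafka-reassign-partitions.sh --zookeeper zk1:2181 \
    --topics-to-move-json-file topics-to-move.json --broker-list "1,2,3,4" --generate

# save the proposed assignment to a file, then apply and later verify it
bin/kafka-reassign-partitions.sh --zookeeper zk1:2181 \
    --reassignment-json-file reassignment.json --execute

where topics-to-move.json contains something like
{"version":1,"topics":[{"topic":"my-topic"}]}.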

Regards,
Madhukar


On Mon, Apr 13, 2015 at 11:22 AM, shadyxu  wrote:

> I added several new brokers to the cluster, there should'v been a rebalance
> but it seemed that the producer was not aware of the new brokers. Data kept
> being sent to the old brokers and there were no partitions on the new
> brokers.
>
> I configured the old brokers to the producer and did not restart the
> producer or add the new brokers to the configuration.
>
> What may be the problems?
>


Re: Issue with high level consumer in 8.1.1 after restart

2015-04-06 Thread Madhukar Bharti
Hi Mayuresh,

We have only one consumer in the group and only one partition for
that topic.
We have set auto.commit.enable to false, zookeeper.session.timeout.ms=6,
rebalance.backoff.ms to 2000 and rebalance.max.retries to 20.
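
A minimal sketch of how those settings are wired into the high-level consumer
(the ZooKeeper address, group id and session timeout value here are
placeholders):

import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.javaapi.consumer.ConsumerConnector;

public class ConsumerSetup {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "zk1:2181");         // placeholder
        props.put("group.id", "my-group");                  // placeholder
        props.put("auto.commit.enable", "false");
        props.put("zookeeper.session.timeout.ms", "6000");  // placeholder value
        props.put("rebalance.backoff.ms", "2000");
        props.put("rebalance.max.retries", "20");
        ConsumerConnector connector =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        // ... create message streams and consume ...
        connector.shutdown();
    }
}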

Thanks!

On Mon, Apr 6, 2015 at 9:59 PM, Mayuresh Gharat 
wrote:

> It actually depends on how many consumers you have in the same group and
> how many partitions the particular topic has.
>
> Can you elaborate on your configuration?
>
> Thanks,
>
> Mayuresh
>
> On Mon, Apr 6, 2015 at 3:35 AM, Madhukar Bharti 
> wrote:
>
> > Hi All,
> >
> > We are facing issue with Kafka high Level consumers. We are using 0.8.1.1
> > version. Sometimes after restart consumers picks the messages sometimes
> it
> > starts but not receives any messages. Is high level consumer is not
> > reliable?
> >
> > I have checked with the log. Even if re-balance succeed, consumer didn't
> > receives any messages. Is there any way to overcome this.
> >
> > I am attaching log also when the consumer restarted. Please find the
> > attachment and let us know what can be possible reason. We have only one
> > partition for that topic.
> >
> > Thanks in Advance!
> >
>
>
>
> --
> -Regards,
> Mayuresh R. Gharat
> (862) 250-7125


Issue with high level consumer in 8.1.1 after restart

2015-04-06 Thread Madhukar Bharti
Hi All,

We are facing an issue with Kafka high-level consumers. We are using version
0.8.1.1. Sometimes after a restart the consumer picks up messages, and sometimes
it starts but does not receive any messages. Is the high-level consumer not
reliable?

I have checked the log. Even though the rebalance succeeded, the consumer didn't
receive any messages. Is there any way to overcome this?

I am also attaching the log from when the consumer restarted. Please find the
attachment and let us know what the possible reason could be. We have only one
partition for that topic.

Thanks in Advance!


Re: delete.retention.ms in 0.8.1

2015-04-03 Thread Madhukar Bharti
Hi Gaurav,

What is your "log.retention.check.interval.ms" ? There might be a chance it
will be high so it is not able  to delete in specified interval.

And also in Kafka 0.8.1 it will be "retention.ms". Please check this


Regards,
Madhukar

On Fri, Apr 3, 2015 at 5:01 PM, Gaurav Agarwal 
wrote:

> hello group,
> I have created a topic with the delete retention ms time 1000 and send
> and consume message across. Nothing happened after that , i checked
> the log also , message also not deleted as well. Please help me to
> come to know what is the need
>


Re: Async producer using Sync producer for send

2015-03-30 Thread Madhukar Bharti
Thanks Jiangjie,

I had thought the same after looking at the code. Thanks a lot for
clearing my doubt!

On Tue, Mar 31, 2015 at 11:45 AM, Jiangjie Qin 
wrote:

> The async send() put the message into a message queue then returns. When
> the messages are pulled out of the queue by the sender thread, it still
> uses SyncProducer to send ProducerRequests to brokers.
>
> Jiangjie (Becket) Qin
>
> On 3/30/15, 10:44 PM, "Madhukar Bharti"  wrote:
>
> >Hi All,
> >
> >I am using *async *producer to send the data. When I checked the log it is
> >showing as below:
> >
> >
> >[2015-03-31 11:09:55,915] INFO Verifying properties
> >(kafka.utils.VerifiableProperties)
> >[2015-03-31 11:09:55,946] INFO Property key.serializer.class is overridden
> >to kafka.serializer.StringEncoder (kafka.utils.VerifiableProperties)
> >[2015-03-31 11:09:55,947] INFO Property metadata.broker.list is overridden
> >to 172.20.6.201:9092,172.20.6.25:9092,172.20.8.62:9092
> >(kafka.utils.VerifiableProperties)
> >[2015-03-31 11:09:55,947] INFO Property producer.type is overridden to
> >async (kafka.utils.VerifiableProperties)
> >[2015-03-31 11:09:55,947] INFO Property queue.buffering.max.ms is
> >overridden to 300 (kafka.utils.VerifiableProperties)
> >[2015-03-31 11:09:55,947] INFO Property queue.enqueue.timeout.ms is
> >overridden to 50 (kafka.utils.VerifiableProperties)
> >[2015-03-31 11:09:55,947] INFO Property request.required.acks is
> >overridden
> >to 1 (kafka.utils.VerifiableProperties)
> >[2015-03-31 11:09:55,948] INFO Property send.buffer.bytes is overridden to
> >4048 (kafka.utils.VerifiableProperties)
> >[2015-03-31 11:09:55,948] INFO Property serializer.class is overridden to
> >kafka.serializer.StringEncoder (kafka.utils.VerifiableProperties)
> >Thread Number0
> >Sent: This is message 0, Topic::TestMQ
> >Sent: This is message 1, Topic::TestMQ
> >Sent: This is message 2, Topic::TestMQ
> >Sent: This is message 3, Topic::TestMQ
> >Sent: This is message 4, Topic::TestMQ
> >Sent: This is message 5, Topic::TestMQ
> >Sent: This is message 6, Topic::TestMQ
> >Sent: This is message 7, Topic::TestMQ
> >Sent: This is message 8, Topic::TestMQ
> >Sent: This is message 9, Topic::TestMQ
> >[2015-03-31 11:09:56,395] INFO Fetching metadata from broker
> >id:1,host:172.20.6.25,port:9092 with correlation id 0 for 1 topic(s)
> >Set(TestMQ) (kafka.client.ClientUtils$)
> >[2015-03-31 11:09:56,404] INFO Connected to 172.20.6.25:9092 for
> producing
> >(kafka.producer.SyncProducer)
> >[2015-03-31 11:09:56,438] INFO Disconnecting from 172.20.6.25:9092
> >(kafka.producer.SyncProducer)
> >[2015-03-31 11:09:56,479] INFO Connected to 172.20.6.25:9092 for
> producing
> >(kafka.producer.SyncProducer)
> >[2015-03-31 11:09:56,573] INFO Connected to 172.20.6.201:9092 for
> >producing
> >(kafka.producer.SyncProducer)
> >[2015-03-31 11:09:56,591] INFO Connected to 172.20.8.62:9092 for
> producing
> >(kafka.producer.SyncProducer)
> >
> >
> >
> >My doubt is why it is using "*kafka.producer.SyncProducer*"? I am using
> >kafka 0.8.1.1.
> >
> >
> >Thanks in advance!
>
>


Async producer using Sync producer for send

2015-03-30 Thread Madhukar Bharti
Hi All,

I am using the *async* producer to send the data. When I checked the log, it
shows the following:


[2015-03-31 11:09:55,915] INFO Verifying properties
(kafka.utils.VerifiableProperties)
[2015-03-31 11:09:55,946] INFO Property key.serializer.class is overridden
to kafka.serializer.StringEncoder (kafka.utils.VerifiableProperties)
[2015-03-31 11:09:55,947] INFO Property metadata.broker.list is overridden
to 172.20.6.201:9092,172.20.6.25:9092,172.20.8.62:9092
(kafka.utils.VerifiableProperties)
[2015-03-31 11:09:55,947] INFO Property producer.type is overridden to
async (kafka.utils.VerifiableProperties)
[2015-03-31 11:09:55,947] INFO Property queue.buffering.max.ms is
overridden to 300 (kafka.utils.VerifiableProperties)
[2015-03-31 11:09:55,947] INFO Property queue.enqueue.timeout.ms is
overridden to 50 (kafka.utils.VerifiableProperties)
[2015-03-31 11:09:55,947] INFO Property request.required.acks is overridden
to 1 (kafka.utils.VerifiableProperties)
[2015-03-31 11:09:55,948] INFO Property send.buffer.bytes is overridden to
4048 (kafka.utils.VerifiableProperties)
[2015-03-31 11:09:55,948] INFO Property serializer.class is overridden to
kafka.serializer.StringEncoder (kafka.utils.VerifiableProperties)
Thread Number0
Sent: This is message 0, Topic::TestMQ
Sent: This is message 1, Topic::TestMQ
Sent: This is message 2, Topic::TestMQ
Sent: This is message 3, Topic::TestMQ
Sent: This is message 4, Topic::TestMQ
Sent: This is message 5, Topic::TestMQ
Sent: This is message 6, Topic::TestMQ
Sent: This is message 7, Topic::TestMQ
Sent: This is message 8, Topic::TestMQ
Sent: This is message 9, Topic::TestMQ
[2015-03-31 11:09:56,395] INFO Fetching metadata from broker
id:1,host:172.20.6.25,port:9092 with correlation id 0 for 1 topic(s)
Set(TestMQ) (kafka.client.ClientUtils$)
[2015-03-31 11:09:56,404] INFO Connected to 172.20.6.25:9092 for producing
(kafka.producer.SyncProducer)
[2015-03-31 11:09:56,438] INFO Disconnecting from 172.20.6.25:9092
(kafka.producer.SyncProducer)
[2015-03-31 11:09:56,479] INFO Connected to 172.20.6.25:9092 for producing
(kafka.producer.SyncProducer)
[2015-03-31 11:09:56,573] INFO Connected to 172.20.6.201:9092 for producing
(kafka.producer.SyncProducer)
[2015-03-31 11:09:56,591] INFO Connected to 172.20.8.62:9092 for producing
(kafka.producer.SyncProducer)



My doubt is why it is using "*kafka.producer.SyncProducer*"? I am using
kafka 0.8.1.1.


Thanks in advance!


How to fetch consumer group names of a Topic from Kafka offset manager in Kafka 0.8.2.1

2015-03-13 Thread Madhukar Bharti
Hi,

I am using Kafka 0.8.2.1. I have two topics with 10 partitions each.
I noticed that one more topic exists, named "__consumer_offsets", with 50
partitions. My questions are:

1. Why is this topic created with 50 partitions?
2. How do I get the consumer group names for a topic? Is there any document or
API to get all consumer groups from the Kafka offset storage manager, like in
ZooKeeper where /consumers lists all consumers?


Thanks in advance.

Regards,
Madhukar


Re: How to setup inter DC replication in Kafka 0.8.1.1

2015-01-21 Thread Madhukar Bharti
Thanks Guozhang for your reply.

I checked the details mentioned above. Issue KAFKA-1650 has been fixed, but the
fix will only come with 0.8.3. :-(

By "MM killed" I mean it was hard-failed (kill -9) or crashed due to a system
reboot or some other reason. In that case some of the messages get duplicated.

Can you please suggest a way to do inter-DC replication for the time being
without loss or duplication of messages?



On Thu, Jan 22, 2015 at 6:39 AM, Guozhang Wang  wrote:

> Hi Madhukar,
>
> We are currently working on improving MM to avoid data loss / duplicates,
> details can be found in KAFKA-1650 / 1839 / 1840.
>
> For your concern, by saying "MM killed" do you mean it is shutdown or
> hard-failed / crashed that can cause duplicates?
>
> Guozhang
>
> On Mon, Jan 19, 2015 at 10:51 PM, Madhukar Bharti <
> bhartimadhu...@gmail.com>
> wrote:
>
> > Hi,
> >
> > I want to setup inter DC replication b/w Kafka clusters. Is there any
> > inbuilt tool to do this?
> >
> > I already have tried MirrorMaker tool but the problem is, if MM killed
> then
> > some messages get duplicated. I don't want to duplicate the messages.
> >
> > Please suggest a way to do this. Please share your experience how you
> have
> > setup this.
> >
> >
> > Thanks in Advance,
> > Madhukar
> >
>
>
>
> --
> -- Guozhang
>



-- 
Thanks and Regards,
Madhukar Bharti


How to setup inter DC replication in Kafka 0.8.1.1

2015-01-19 Thread Madhukar Bharti
Hi,

I want to set up inter-DC replication between Kafka clusters. Is there any
built-in tool to do this?

I have already tried the MirrorMaker tool, but the problem is that if MM is
killed, some messages get duplicated. I don't want to duplicate messages.

Please suggest a way to do this, and please share your experience of how you
have set this up.


Thanks in Advance,
Madhukar


Re: How To Specify partitioner.class for MirrorMaker?

2014-12-15 Thread Madhukar Bharti
Dear Alex Melville,

You can put your partitioner class in a jar file inside kafka/libs.

If you then start MirrorMaker, it will load the jar and you will not
get the exception.
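
For reference, a minimal sketch of such a partitioner against the 0.8 producer
API is below (the package and class names are placeholders). partitioner.class
should then be set to the fully-qualified class name, e.g.
com.example.SimplePartitioner, not to a path ending in .class:

package com.example;

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

public class SimplePartitioner implements Partitioner {

    // the producer instantiates the partitioner reflectively and passes its config here
    public SimplePartitioner(VerifiableProperties props) {
    }

    @Override
    public int partition(Object key, int numPartitions) {
        // mask the sign bit so the result is always a valid partition id
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }
}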

On Sun, Dec 14, 2014 at 9:41 AM, Alex Melville  wrote:
>
> Sorry, the email sent before I typed my question. Please read below:
>
>
> I wrote my own Java Consumer and Producer based off of the Kafka Producer
> API and High Level Consumer. Let's call them MyConsumer and MyProducer.
> MyProducer uses a custom Partitioner class called SimplePartitioner. In the
> producer.config file that I specify when I run the MirrorMaker from the
> command line, there is a parameter "partitioner.class". I keep getting
> "ClassDefNotFoundException exceptions, no matter if I put the absolute path
> to my SimplePartitioner.class file, a relative path, or even when I add
> SimplePartitioner.class to the $CLASSPATH variables created in the
> kafka-run-class.sh script. Here is my output error:
>
> Exception in thread "main" java.lang.ClassNotFoundException:
> SimplePartitioner.class
> at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
> at java.security.AccessController.doPrivileged(Native Method)
> at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:191)
> at kafka.utils.Utils$.createObject(Utils.scala:438)
> at kafka.producer.Producer.(Producer.scala:60)
> at kafka.tools.MirrorMaker$$anonfun$1.apply(MirrorMaker.scala:116)
> at kafka.tools.MirrorMaker$$anonfun$1.apply(MirrorMaker.scala:106)
> at
>
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
> at
>
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
> at scala.collection.immutable.Range.foreach(Range.scala:81)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:233)
> at scala.collection.immutable.Range.map(Range.scala:46)
> at kafka.tools.MirrorMaker$.main(MirrorMaker.scala:106)
> at kafka.tools.MirrorMaker.main(MirrorMaker.scala)
>
> What is the correct value for the "partitioner.class" parameter in my
> producer.properties config file?
>
> On Sat, Dec 13, 2014 at 8:11 PM, Alex Melville 
> wrote:
> >
> > Dear Kafka Devs,
> >
> >
> >
>


-- 
Regards,
Madhukar


Re: How to Setup MirrorMaker in Generalized way

2014-12-11 Thread Madhukar Bharti
Hi Neha,

Thanks for your reply.

I am now using the MM tool to replicate data between Kafka clusters, but I am
facing one problem: messages get duplicated if MM is killed forcefully [*kill -9*].

Is there any solution to avoid these duplicated entries in the target cluster? I
am using Kafka *8.1.1*.

On Mon, Dec 8, 2014 at 11:17 PM, Neha Narkhede  wrote:

> Hi Madhukar,
>
> From the same documentation link you referred to -
>
> The source and destination clusters are completely independent entities:
> > they can have different numbers of partitions and the offsets will not be
> > the same. For this reason the mirror cluster is not really intended as a
> > fault-tolerance mechanism (as the consumer position will be different);
> for
> > that we recommend using normal in-cluster replication. The mirror maker
> > process will, however, retain and use the message key for partitioning so
> > order is preserved on a per-key basis.
>
>
> There is no way to setup an *exact* Kafka mirror yet.
>
> Thanks,
> Neha
>
> On Mon, Dec 8, 2014 at 7:47 AM, Madhukar Bharti 
> wrote:
>
> > Hi,
> >
> > I am going to setup Kafka clusters having 3 brokers in Datacenter 1.
> Topics
> > can be created time to time. Each topic can have varying partitions
> mostly
> > 1,10 or 20. Each application might have different partitioning algorithm
> > that we don't know(let it be hidden from ops team).
> >
> > We want to setup mirror maker tool in such a way so that, the exact
> > partitioned data should go to the same partition without knowing the
> Topics
> > partition logic and it should be *generalized*. [This should be common
> for
> > all Topics.]
> >
> > *like partition 0 at DataCenter1 should be exact mirror of  partition-0
> in
> > Datacenter2*.
> >
> > Please suggest me a solution for doing so. If MirrorMaker
> > <http://kafka.apache.org/documentation.html#basic_ops_mirror_maker> tool
> > provide any configurations which solve this use-case please let me know.
> >
> >
> >
> > Regards,
> > Madhukar Bharti
> >
>
>
>
> --
> Thanks,
> Neha
>


--
Thanks and Regards,
Madhukar Bharti


How to Setup MirrorMaker in Generalized way

2014-12-08 Thread Madhukar Bharti
Hi,

I am going to set up a Kafka cluster with 3 brokers in Datacenter 1. Topics
can be created from time to time. Each topic can have a varying number of
partitions, mostly 1, 10 or 20. Each application might have a different
partitioning algorithm that we don't know (let it be hidden from the ops team).

We want to set up the MirrorMaker tool in such a way that the partitioned data
goes to the same partition in the mirror without knowing the topic's
partitioning logic, and it should be *generalized*. [This should be common for
all topics.]

*For example, partition 0 at DataCenter1 should be an exact mirror of partition
0 in Datacenter2.*

Please suggest a solution for doing so. If the MirrorMaker
<http://kafka.apache.org/documentation.html#basic_ops_mirror_maker> tool
provides any configuration which solves this use case, please let me know.



Regards,
Madhukar Bharti


Re: Getting Simple consumer details using MBean

2014-11-16 Thread Madhukar Bharti
Hi,

Thank you for your reply, Otis. So the simple consumer metrics can only be
fetched from 0.8.2? Is there no other way to get them in 0.8.1?

@Jun: The link you shared doesn't have any beans for the simple consumer
details.

Kindly help me with this.


Thanks and Regards
Madhukar

On Mon, Nov 17, 2014 at 12:20 AM, Otis Gospodnetic <
otis.gospodne...@gmail.com> wrote:

> Hi Madhukar,
>
> Maybe you want to look at SPM <http://sematext.com/spm/>, which has Kafka
> monitoring.
> But please note this:
>
> https://sematext.atlassian.net/wiki/display/PUBSPM/SPM+FAQ#SPMFAQ-WhyamInotseeingallKafkametricsifI'mrunninga0.8.xversionofKafkathatispre-0.8.2
> ?
>
> Otis
> --
> Monitoring * Alerting * Anomaly Detection * Centralized Log Management
> Solr & Elasticsearch Support * http://sematext.com/
>
>
> On Fri, Nov 14, 2014 at 5:52 PM, Jun Rao  wrote:
>
> > So, you want to monitor the mbeans on the broker side? Take a look at
> > http://kafka.apache.org/documentation.html#monitoring
> >
> > Thanks,
> >
> > Jun
> >
> > On Thu, Nov 13, 2014 at 10:58 PM, Madhukar Bharti <
> > bhartimadhu...@gmail.com>
> > wrote:
> >
> > > Hi Jun Rao,
> > >
> > > Sorry to disturb you. But I my Kafka setup it is not showing. I am
> > > attaching screen shot taken from all brokers.
> > >
> > > In kafka.consumer it is listing only "ReplicaFetcherThread".
> > >
> > > As I said earlier I am using "2.10-0.8.1.1" version. Do i need to
> > > configure any extra parameter for this? I am simply using the same
> > > configuration as described in wiki page.
> > >
> > >
> > >
> > > Thanks and Regards,
> > > Madhukar
> > >
> > >
> > > On Fri, Nov 14, 2014 at 1:17 AM, Jun Rao  wrote:
> > >
> > >> I tried running kafka-simple-consumer-shell. I can see the following
> > >> mbean.
> > >>
> > >>
> > >>
> >
> "kafka.consumer":type="FetchRequestAndResponseMetrics",name="SimpleConsumerShell-AllBrokersFetchRequestRateAndTimeMs"
> > >>
> > >> Thanks,
> > >>
> > >> Jun
> > >>
> > >> On Wed, Nov 12, 2014 at 9:57 PM, Madhukar Bharti <
> > >> bhartimadhu...@gmail.com>
> > >> wrote:
> > >>
> > >> > Hi Jun Rao,
> > >> >
> > >> > Thanks for your quick reply.
> > >> >
> > >> > I am not able to see this  any bean named as "SimpleConsumer". Is
> > there
> > >> any
> > >> > configuration related to this?
> > >> >
> > >> > How can I see this bean named listing in Jconsole window?
> > >> >
> > >> >
> > >> > Thanks and Regards
> > >> > Madhukar
> > >> >
> > >> > On Thu, Nov 13, 2014 at 6:06 AM, Jun Rao  wrote:
> > >> >
> > >> > > Those are for 0.7. In 0.8, you should see sth
> > >> > > like FetchRequestRateAndTimeMs in SimpleConsumer.
> > >> > >
> > >> > > Thanks,
> > >> > >
> > >> > > Jun
> > >> > >
> > >> > > On Wed, Nov 12, 2014 at 5:14 AM, Madhukar Bharti <
> > >> > bhartimadhu...@gmail.com
> > >> > > >
> > >> > > wrote:
> > >> > >
> > >> > > > Hi,
> > >> > > >
> > >> > > > I want to get the simple consumer details using MBean as
> described
> > >> here
> > >> > > > <
> > >> > > >
> > >> > >
> > >> >
> > >>
> >
> https://cwiki.apache.org/confluence/display/KAFKA/Operations#Operations-Monitoring
> > >> > > > >.
> > >> > > > But these bean names are not showing in JConsole as well as
> while
> > >> > trying
> > >> > > to
> > >> > > > read from JMX.
> > >> > > >
> > >> > > > Please help me to get simple consumer details.
> > >> > > >
> > >> > > > I am using Kafka 0.8.1.1 version.
> > >> > > >
> > >> > > >
> > >> > > > Thanks and Regards,
> > >> > > > Madhukar Bharti
> > >> > > >
> > >> > >
> > >> >
> > >> >
> > >> >
> > >> > --
> > >> > Thanks and Regards,
> > >> > Madhukar Bharti
> > >> > Mob: 7845755539
> > >> >
> > >>
> > >
> > >
> > >
> > > --
> > > Thanks and Regards,
> > > Madhukar Bharti
> > > Mob: 7845755539
> > >
> >
>



-- 
Thanks and Regards,
Madhukar Bharti
Mob: 7845755539


Re: Getting Simple consumer details using MBean

2014-11-13 Thread Madhukar Bharti
Hi Jun Rao,

Sorry to disturb you, but in my Kafka setup it is not showing. I am
attaching screenshots taken from all brokers.

Under kafka.consumer it is listing only "ReplicaFetcherThread".

As I said earlier, I am using the "2.10-0.8.1.1" version. Do I need to configure
any extra parameter for this? I am simply using the same configuration as
described in the wiki page.



Thanks and Regards,
Madhukar


On Fri, Nov 14, 2014 at 1:17 AM, Jun Rao  wrote:

> I tried running kafka-simple-consumer-shell. I can see the following mbean.
>
>
> "kafka.consumer":type="FetchRequestAndResponseMetrics",name="SimpleConsumerShell-AllBrokersFetchRequestRateAndTimeMs"
>
> Thanks,
>
> Jun
>
> On Wed, Nov 12, 2014 at 9:57 PM, Madhukar Bharti  >
> wrote:
>
> > Hi Jun Rao,
> >
> > Thanks for your quick reply.
> >
> > I am not able to see this  any bean named as "SimpleConsumer". Is there
> any
> > configuration related to this?
> >
> > How can I see this bean named listing in Jconsole window?
> >
> >
> > Thanks and Regards
> > Madhukar
> >
> > On Thu, Nov 13, 2014 at 6:06 AM, Jun Rao  wrote:
> >
> > > Those are for 0.7. In 0.8, you should see sth
> > > like FetchRequestRateAndTimeMs in SimpleConsumer.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Wed, Nov 12, 2014 at 5:14 AM, Madhukar Bharti <
> > bhartimadhu...@gmail.com
> > > >
> > > wrote:
> > >
> > > > Hi,
> > > >
> > > > I want to get the simple consumer details using MBean as described
> here
> > > > <
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/Operations#Operations-Monitoring
> > > > >.
> > > > But these bean names are not showing in JConsole as well as while
> > trying
> > > to
> > > > read from JMX.
> > > >
> > > > Please help me to get simple consumer details.
> > > >
> > > > I am using Kafka 0.8.1.1 version.
> > > >
> > > >
> > > > Thanks and Regards,
> > > > Madhukar Bharti
> > > >
> > >
> >
> >
> >
> > --
> > Thanks and Regards,
> > Madhukar Bharti
> > Mob: 7845755539
> >
>



-- 
Thanks and Regards,
Madhukar Bharti
Mob: 7845755539


Re: Getting Simple consumer details using MBean

2014-11-12 Thread Madhukar Bharti
Hi Jun Rao,

Thanks for your quick reply.

I am not able to see this  any bean named as "SimpleConsumer". Is there any
configuration related to this?

How can I see this bean named listing in Jconsole window?
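
In case it helps while debugging, here is a small sketch that lists whatever
"kafka.consumer" beans are actually registered, over remote JMX (host and port
are placeholders; it assumes the JVM was started with
com.sun.management.jmxremote.port set):

import java.util.Set;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class ListConsumerMBeans {
    public static void main(String[] args) throws Exception {
        JMXServiceURL url =
                new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi");
        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
            MBeanServerConnection mbsc = connector.getMBeanServerConnection();
            // query everything and filter by name, since Kafka's metric
            // domains are quoted strings such as "kafka.consumer"
            Set<ObjectName> names = mbsc.queryNames(null, null);
            for (ObjectName name : names) {
                if (name.toString().contains("kafka.consumer")) {
                    System.out.println(name);
                }
            }
        }
    }
}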


Thanks and Regards
Madhukar

On Thu, Nov 13, 2014 at 6:06 AM, Jun Rao  wrote:

> Those are for 0.7. In 0.8, you should see sth
> like FetchRequestRateAndTimeMs in SimpleConsumer.
>
> Thanks,
>
> Jun
>
> On Wed, Nov 12, 2014 at 5:14 AM, Madhukar Bharti  >
> wrote:
>
> > Hi,
> >
> > I want to get the simple consumer details using MBean as described here
> > <
> >
> https://cwiki.apache.org/confluence/display/KAFKA/Operations#Operations-Monitoring
> > >.
> > But these bean names are not showing in JConsole as well as while trying
> to
> > read from JMX.
> >
> > Please help me to get simple consumer details.
> >
> > I am using Kafka 0.8.1.1 version.
> >
> >
> > Thanks and Regards,
> > Madhukar Bharti
> >
>



-- 
Thanks and Regards,
Madhukar Bharti
Mob: 7845755539


Getting Simple consumer details using MBean

2014-11-12 Thread Madhukar Bharti
Hi,

I want to get the simple consumer details using the MBeans described here
<https://cwiki.apache.org/confluence/display/KAFKA/Operations#Operations-Monitoring>.
But these bean names are not showing in JConsole, nor when I try to read them
from JMX.

Please help me to get simple consumer details.

I am using Kafka version 0.8.1.1.


Thanks and Regards,
Madhukar Bharti