Re: FW: Mirrormaker stops consuming

2016-02-08 Thread Rajasekar Elango
Debbie,

In our case, the issue was with our custom SSL implementation, and we needed
to fix a bug related to buffer overflow handling. This may not be applicable
to you if you are using open source kafka. The best way to troubleshoot
this problem would be to take a couple of thread dumps a few seconds apart
and look at the stack traces. That will help you identify the code block it
is spinning in when it gets stuck.
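
For example, a minimal sketch (PID discovery via jps is an assumption; adjust
to your process name):

    # take three thread dumps, five seconds apart, for the mirrormaker JVM
    PID=$(jps | grep -i mirrormaker | awk '{print $1}')
    for i in 1 2 3; do
      jstack "$PID" > "threaddump-$i.txt"
      sleep 5
    done
    # a thread showing the same stack in every dump is likely spinning there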

Thanks,
Raja.

On Fri, Feb 5, 2016 at 10:28 PM, Yu, Debbie  wrote:

> Hi Raja,
>
> I’m not used to using a mailing list, so I thought I’d try sending to you
> directly instead.
> We were wondering if you had fixed the problem you saw before with the
> mirror maker,
> since we seem to be seeing the same problem now.
> The difference is we’re not using the custom SSLSocketChannel discussed in
> the thread.
> Any kind of info you could share would be much appreciated.
>
> Thanks,
> Debbie
>
> On 2/5/16, 7:17 PM, "Yu, Debbie"  wrote:
>
> >Hi Raja,
> >
> >We seem to be encountering the same problem you were seeing where our
> >producer thread becomes blocked for some reason.
> >We also see our producer queue is full,
> >and for some reason, the producer isn't pulling from the queue and sending
> >to our brokers.
> >We were wondering if you might be able to share how you fixed your problem
> >if you fixed it.
> >
> >Thanks,
> >Debbie
> >
>
>


-- 
Thanks,
Raja.


Does Kafka recover all data if node is reimaged

2015-10-12 Thread Rajasekar Elango
I was wondering: if a kafka broker node gets reimaged and all data is wiped
off, will kafka recover all the data on the node from replication?
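
For reference, one way to watch the re-replication catch up after the broker
rejoins (a sketch, assuming the stock tooling and a local zookeeper):

    # partitions still catching up show here until the ISR is full again
    bin/kafka-topics.sh --describe --zookeeper localhost:2181 \
      --under-replicated-partitions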

-- 
Thanks,
Raja.


Re: Painfully slow kafka recovery / cluster breaking

2015-08-26 Thread Rajasekar Elango
Thanks for the updates, Jörg. They're very useful.

Thanks,
Raja.

On Wed, Aug 26, 2015 at 8:58 AM, Jörg Wagner  wrote:

> Just a little feedback on our issue(s) as FYI to whoever is interested.
>
> It basically all boiled down to the configuration of topics. We noticed
> while performance testing (or trying to ;) ) that the partitioning was most
> critical to us.
>
> We originally followed the linkedin recommendation and used 600 partitions
> for our main topic. Testing that, the replicas always went out of sync
> within a short timeframe, leaders could not be determined and the cluster
> failed horribly (even writing several hundred lines of logs within a
> 1/100th second).
>
> So for our 27 log.dirs (= disks) we went with 27 partitions. And voilà: we
> could use kafka with around 35k requests per second (via an application
> accessing it). Kafka stayed stable.
>
> Currently we are testing with 81 partitions (27*3) and it's running well.
> No issues so far, replicas in sync and up to 50k requests per second.
>
> Cheers
>
> On 25.08.2015 15:18, Jörg Wagner wrote:
>
>> So okay, this is a little embarrassing, but the core of the issue was that
>> max open files was not set correctly for kafka. It was not an oversight,
>> but a few things together meant that the system configuration was not
>> changed correctly, resulting in the default value.
>>
>> No wonder that kafka behaved strangely every time we had enough data in
>> log.dirs and connections.
>>
>> Anyhow, that doesn't seem to be the last problem. The brokers get in sync
>> with each other (within an expected time frame), everything seems fine.
>>
>> After a little stress testing, the kafka cluster falls apart (around 40k
>> requests/s). Using topics describe we can see leaders missing (e.g. from
>> 1,2,3 only 1 and 3 are leading partitions, although zookeeper lists all
>> under /brokers/ids). This ultimately results in partitions being
>> unavailable and massive "leader not local" spam in the logs.
>>
>> What are we missing?
>>
>> Cheers
>> Jörg
>>
>> On 24.08.2015 10:31, Jörg Wagner wrote:
>>
>>> Thank you for your answers.
>>>
>>> @Raja
>>> No, it also seems to happen if we stop kafka completely clean.
>>>
>>> @Gwen
>>> I was testing the situation with num.replica.fetchers set higher. If you
>>> say that was the right direction, I will try it again. What would be a good
>>> setting? I went with 50 which seemed reasonable (having 27 single disks).
>>> How long should it take to get complete ISR?
>>>
>>> Regarding no Data flowing into kafka: I just wanted to point out that
>>> the setup is not yet live. So we can completely stop the usage of kafka,
>>> and it should possibly get into sync faster without a steady stream of new
>>> messages.
>>> Kafka itself is working fine during all this; it is "just" missing ISRs,
>>> hence redundancy. If I stop another broker (cleanly!) though, it tends to
>>> happen that the expected number of partitions have Leader -1, which I
>>> assume should not happen.
>>>
>>> Cheers
>>> Jörg
>>>
>>> On 21.08.2015 19:18, Rajasekar Elango wrote:
>>>
>>>> We are seeing the same behavior in a 5-broker cluster when losing one broker.
>>>>
>>>> In our case, we are losing broker as well as kafka data dir.
>>>>
>>>> Jörg Wagner,
>>>>
>>>> Are you losing just broker or kafka data dir as well?
>>>>
>>>> Gwen,
>>>>
>>>> We have also observed that the latency of messages arriving at consumers
>>>> goes up by 10x when we lose a broker. Is it because the broker is busy
>>>> with handling failed fetch requests and loaded with more data that's
>>>> slowing down the writes? Also, if we had simply lost the broker, and not
>>>> the data dir, would the impact have been minimal?
>>>>
>>>> Thanks,
>>>> Raja.
>>>>
>>>>
>>>>
>>>> On Fri, Aug 21, 2015 at 12:31 PM, Gwen Shapira 
>>>> wrote:
>>>>
>>>> By default, num.replica.fetchers = 1. This means only one thread per
>>>>> broker
>>>>> is fetching data from leaders. This means it may take a while for the
>>>>> recovering machine to catch up and rejoin the ISR.
>>>>>
>>>>> If you have bandwidth to spare, try increasing this value.
>>>&g

Re: Painfully slow kafka recovery

2015-08-21 Thread Rajasekar Elango
We are seeing the same behavior in a 5-broker cluster when losing one broker.

In our case, we are losing broker as well as kafka data dir.

Jörg Wagner,

Are you losing just broker or kafka data dir as well?

Gwen,

We have also observed that the latency of messages arriving at consumers goes
up by 10x when we lose a broker. Is it because the broker is busy with
handling failed fetch requests and loaded with more data that's slowing down
the writes? Also, if we had simply lost the broker, and not the data dir,
would the impact have been minimal?

Thanks,
Raja.



On Fri, Aug 21, 2015 at 12:31 PM, Gwen Shapira  wrote:

> By default, num.replica.fetchers = 1. This means only one thread per broker
> is fetching data from leaders. This means it may take a while for the
> recovering machine to catch up and rejoin the ISR.
>
> If you have bandwidth to spare, try increasing this value.
>
> Regarding "no data flowing into kafka" - If you have 3 replicas and only
> one is down, I'd expect writes to continue to the new leader even if one
> replica is not in the ISR yet. Can you see that a new leader is elected?
>
> Gwen
>
> On Fri, Aug 21, 2015 at 6:50 AM, Jörg Wagner 
> wrote:
>
> > Hey everyone,
> >
> > here's my crosspost from irc.
> >
> > Our setup:
> > 3 kafka 0.8.2 brokers with zookeeper, powerful hardware (20 cores, 27
> > log disks each). We use a handful of topics, but only one topic is
> > utilized heavily. It features a replication factor of 2 and 600 partitions.
> >
> > Our issue:
> > If one kafka broker was down, it takes very long (from 1 to >10 hours) to
> > show that all partitions have a full ISR again. This seems to depend
> > heavily on the amount of data in the log.dirs (I have configured 27
> > threads - one for each dir, each on its own drive).
> > This all takes this long while there is NO data flowing into kafka.
> >
> > We seem to be missing something critical here. It might be some option
> > set wrong, or maybe we are thinking about this wrong and it's not critical
> > to have the replicas in sync.
> >
> > Any pointers would be great.
> >
> > Cheers
> > Jörg
> >
>
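
A minimal sketch of the broker setting Gwen mentions (the value is only an
illustration; the default is 1):

    # server.properties: more replica fetcher threads speed up catch-up,
    # at the cost of extra load on the partition leaders
    num.replica.fetchers=4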



-- 
Thanks,
Raja.


Re: java.lang.OutOfMemoryError: Java heap space

2015-08-06 Thread Rajasekar Elango
Hi Sourabh,

We have seen this error if the kafka broker is running with SSL and the
consumer is trying to consume in plaintext mode. Are you using the high level
consumer or SimpleConsumer? If you are using SimpleConsumer, pull the latest
code from my repo and pass the secure parameters to the SimpleConsumer
constructor.
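
That is, something like (branch name as referenced elsewhere in this thread):

    git clone https://github.com/relango/kafka.git
    cd kafka
    git checkout kafka_security_0.8.2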

Thanks,
Raja.

On Thu, Aug 6, 2015 at 9:01 PM, Sourabh Chandak 
wrote:

> Hi,
>
> I am trying to integrate
> https://github.com/relango/kafka/tree/kafka_security_0.8.2 with Spark
> Streaming using the SimpleConsumer. I know that the SSL patch is on its way
> but need to set up a prototype hence went ahead with Raja's version.
>
> So when I run my spark job to retrieve data from 1 topic with just 1
> partition I get a OutOfMemoryError.
>
> Here is the stack trace:
>
> Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
> at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
> at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
> at
>
> kafka.network.BoundedByteBufferReceive.byteBufferAllocate(BoundedByteBufferReceive.scala:80)
> at
>
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:63)
> at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> at
>
> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
> at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:91)
> at
>
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:80)
> at kafka.consumer.SimpleConsumer.send(SimpleConsumer.scala:103)
> at
>
> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$getPartitionMetadata$1.apply(KafkaCluster.scala:126)
> at
>
> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$getPartitionMetadata$1.apply(KafkaCluster.scala:125)
> at
>
> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:346)
> at
>
> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:342)
> at
>
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
> at org.apache.spark.streaming.kafka.KafkaCluster.org
>
> $apache$spark$streaming$kafka$KafkaCluster$$withBrokers(KafkaCluster.scala:342)
> at
>
> org.apache.spark.streaming.kafka.KafkaCluster.getPartitionMetadata(KafkaCluster.scala:125)
> at
>
> org.apache.spark.streaming.kafka.KafkaCluster.getPartitions(KafkaCluster.scala:112)
> at
>
> org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:310)
>
> Need help from experts to resolve this.
>
> Thanks,
> Sourabh
>



-- 
Thanks,
Raja.


Re: How to read in batch using HighLevel Consumer?

2015-08-04 Thread Rajasekar Elango
Here is an example of what Sharninder suggested:
http://ingest.tips/2014/10/12/kafka-high-level-consumer-frequently-missing-pieces/

Thanks,
Raja.

On Tue, Aug 4, 2015 at 12:01 PM, Sharninder  wrote:

> You can't. Kafka is essentially a queue, so you always read messages one
> by one. What you can do is disable auto offset commit, read 100 messages,
> process them, and then manually commit the offsets.
>
> --
> Sharninder
>
> > On 04-Aug-2015, at 9:07 pm, shahab  wrote:
> >
> > Hi,
> >
> > While the producer can put data into the kafka server as a batch, I
> > couldn't find any API (or any documentation) saying how we can fetch data
> > as a batch from Kafka.
> > Even when data is placed in kafka as a batch, using the High Level
> > consumer I can still only read one message at a time, and I cannot
> > specify, for example, to read 100 items at once!
> >
> > Is this observation correct? Or am I missing something?
> >
> > best,
> > /Shahab
>
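
A sketch of Sharninder's suggestion with the 0.8 high level consumer (the
property name is from the 0.8 consumer configs; the batch size is up to the
application):

    # in the consumer's properties: stop offsets from auto-committing
    auto.commit.enable=false
    # the application then reads e.g. 100 messages from the KafkaStream
    # iterator, processes them, and calls commitOffsets() on the
    # ConsumerConnector once the whole batch has been handled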



-- 
Thanks,
Raja.


Reimaging zookeeper host

2015-06-30 Thread Rajasekar Elango
We are running a 3-node zookeeper cluster and we need to re-image (re-install
the OS on) a zookeeper host. Is it ok to lose the zookeeper dataDir during the
upgrade, or should we back up the zookeeper dataDir and restore it when
zookeeper comes back online? Will kafka and consumers work fine if we bring up
zookeeper with an empty dataDir containing just the myid file?
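
For reference, backing the dataDir up is cheap (paths are placeholders):

    # stop zookeeper on the host, then snapshot the dataDir before reimaging
    tar czf /backup/zk-dataDir.tar.gz /var/lib/zookeeper
    # after the reimage: restore it, make sure myid matches, start zookeeper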



-- 
Thanks,
Raja.


Re: how to modify offsets stored in Kafka in 0.8.2.1 version?

2015-06-19 Thread Rajasekar Elango
Hi Marina,

Check slide 32 in this presentation.
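
For reading the __consumer_offsets topic directly, roughly the following (the
formatter class name is from memory for 0.8.2, and exclude.internal.topics is
the part people usually miss):

    # consumer.properties used below contains:
    #   exclude.internal.topics=false
    bin/kafka-console-consumer.sh --zookeeper localhost:2181 \
      --topic __consumer_offsets --from-beginning \
      --consumer.config consumer.properties \
      --formatter "kafka.server.OffsetManager\$OffsetsMessageFormatter"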

Hope this helps.

Thanks,
Raja.

On Fri, Jun 19, 2015 at 9:43 AM, Marina  wrote:

> Thanks, Stevo, for the quick reply,
> Yes, I understand how to do this programmatically - but I would like to be
> able to do this manually from a command line, just as before I was able to
> do this in the Zookeeper shell. I don't want to write and run a Java app
> just to set an offset :)
>
> [unless, of course, this is the only way to do this.]
>
> thanks!
> Marina
>
>
>
>
> - Original Message -
> From: Stevo Slavić 
> To: users@kafka.apache.org; Marina 
> Cc:
> Sent: Friday, June 19, 2015 9:33 AM
> Subject: Re: how to modify offsets stored in Kafka in 0.8.2.1 version?
>
> Hello Marina,
>
> There's Kafka API to fetch and commit offsets
>
> https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka
> - maybe it will work for you.
>
> Kind regards,
> Stevo Slavic.
>
>
> On Fri, Jun 19, 2015 at 3:23 PM, Marina  wrote:
>
> > Hi,
> >
> > in older Kafka versions where offsets were stored in Zookeeper - I could
> > manually update the value of the Zookeeper's node:
> >
> >
> /consumers/<group>/offsets/<topic>/<partition>.
> >
> > In 0.8.2.1 - there are no values in offsets anymore, but there is a new
> > topic,
> > __consumer_offsets, where as I understand offsets are tracked now.
> >
> > the ConsumerOffsetChecker tool seems to be able to get the offsets values
> > from this topic , since I see correct value running it.
> > So, how do I access this info myself?
> >
> >
> > I tried:
> >
> > ./kafka-console-consumer.sh --zookeeper localhost:2181 --topic
> > __consumer_offsets --from-beginning
> >
> > but it does not show anything
> > Also, how would I change the offset? I need to do this sometimes if I
> want
> > to skip/ignore some messages and just advance offset manually.
> >
> > thanks,
> > Marina
> >
>



-- 
Thanks,
Raja.


Re: Mirrormaker stops consuming

2015-05-22 Thread Rajasekar Elango
Thanks for the pointers, Joel. Will look into SSLSocketChannel. Yes, this was
working fine before the upgrade.

If it's just one producer thread stuck on write, it might affect only one
consumer thread/partition. But we found consuming stopped for all
topics/partitions. Or is there only a single data channel shared between all
producer and consumer threads?

Thanks,
Raja.


On Fri, May 22, 2015 at 12:12 PM, Joel Koshy  wrote:

> The threaddump suggests that one of the producers
> (mirrormaker-producer-6) is blocked on write for some reason. So the
> data-channel for that producer (which sits between the consumers and
> the producer) is full which blocks the consumers from progressing.
>
> This appears to be in your (custom) SSLSocketChannel code. If you take
> consecutive threaddumps I'm guessing you would see the same trace. If
> this is reproducible can you do that? You can also hook up jvisualvm
> or yourkit to see which threads are active and it may well be that
> producer in a tight loop on the writeCompletely. Just to confirm you
> did not see this issue before upgrading?
>
> Joel
>
> On Fri, May 22, 2015 at 11:35:19AM -0400, Rajasekar Elango wrote:
> > We recently upgraded to kafka 0.8.2.1 and found issues with mirrormaker
> > that randomly stops consuming. We had to restart the mirrormaker process
> to
> > resolve the problem. This problem has occurred several times in past two
> > weeks.
> >
> > Here is what I found in analysis:
> >
> > When this problem happens:
> >
> > Mirrormaker log stopped rolling (i.e. nothing in the logs). The last
> > couple of messages in the mirrormaker log are ProducerSendThread producing
> > to the destination. No errors or exceptions.
> >
> > Mirrormaker consumer offset doesn't increase. ConsumerOffsetChecker shows
> > mirrormaker consumer offset stops incrementing.
> >
> > Mirrormaker consumer MinFetch rate jmx metric drops to zero.
> > ConsumerTopicMetric.BytesPerSec drops to zero.
> >
> > So it appears the mirrormaker consumer has stopped accepting new data.
> >
> > Can someone provide input on how to troubleshoot this problem further
> > and identify the root cause?
> >
> > Got a thread dump before restarting; it looks ok to me, no blocked threads.
> > Here is thread dump output
> >
> > 2015-05-21 18:59:09
> > Full thread dump Java HotSpot(TM) 64-Bit Server VM (24.76-b04 mixed
> mode):
> >
> > "Attach Listener" daemon prio=10 tid=0x7f7248002000 nid=0x2d53
> waiting
> > on condition [0x]
> >java.lang.Thread.State: RUNNABLE
> >
> >Locked ownable synchronizers:
> > - None
> >
> >
> "ConsumerFetcherThread-mirrormakerProd_ops-mmrs1-2-tyo.ops.sfdc.net-1431458688650-fb15f395-0-2"
> > prio=10 tid=0x7f71e407e000 nid=0x3425 waiting on condition
> > [0x7f72833f2000]
> >java.lang.Thread.State: WAITING (parking)
> > at sun.misc.Unsafe.park(Native Method)
> > - parking to wait for  <0x00042cd15cc8> (a
> > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
> > at
> >
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
> > at
> >
> java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:349)
> > at
> > kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
> > at
> >
> kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
> > at
> >
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:129)
> > at
> >
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:110)
> > at
> > scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
> > at
> > scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
> > at
> > scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
> > at
> >
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:110)
> > at
> >
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:110)
> > at
> >
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:110)
> > at kafka.utils.Utils$.inLock(Utils.scala:535)

Mirrormaker stops consuming

2015-05-22 Thread Rajasekar Elango
We recently upgraded to kafka 0.8.2.1 and found an issue with mirrormaker:
it randomly stops consuming. We had to restart the mirrormaker process to
resolve the problem. This problem has occurred several times in the past two
weeks.

Here is what I found in analysis:

When this problem happens:

Mirrormaker log stopped rolling (i.e. nothing in the logs). The last couple of
messages in the mirrormaker log are ProducerSendThread producing to the
destination. No errors or exceptions.

Mirrormaker consumer offset doesn't increase. ConsumerOffsetChecker shows
mirrormaker consumer offset stops incrementing.

Mirrormaker consumer MinFetch rate jmx metric drops to zero.
ConsumerTopicMetric.BytesPerSec drops to zero.

So it appears the mirrormaker consumer has stopped accepting new data.

Can someone provide input on how to troubleshoot this problem further and
identify the root cause?

Got a thread dump before restarting; it looks ok to me, no blocked threads.
Here is thread dump output

2015-05-21 18:59:09
Full thread dump Java HotSpot(TM) 64-Bit Server VM (24.76-b04 mixed mode):

"Attach Listener" daemon prio=10 tid=0x7f7248002000 nid=0x2d53 waiting
on condition [0x]
   java.lang.Thread.State: RUNNABLE

   Locked ownable synchronizers:
- None

"ConsumerFetcherThread-mirrormakerProd_ops-mmrs1-2-tyo.ops.sfdc.net-1431458688650-fb15f395-0-2"
prio=10 tid=0x7f71e407e000 nid=0x3425 waiting on condition
[0x7f72833f2000]
   java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x00042cd15cc8> (a
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
at
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
at
java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:349)
at
kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
at
kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
at
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:129)
at
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:110)
at
scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
at
scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
at
scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
at
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:110)
at
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:110)
at
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:110)
at kafka.utils.Utils$.inLock(Utils.scala:535)
at
kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:109)
at
kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:87)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)

   Locked ownable synchronizers:
- <0x00042ea62eb0> (a
java.util.concurrent.locks.ReentrantLock$NonfairSync)

"ConsumerFetcherThread-mirrormakerProd_ops-mmrs1-2-tyo.ops.sfdc.net-1431458688650-fb15f395-0-3"
prio=10 tid=0x7f71e407b000 nid=0x3424 waiting on condition
[0x7f7281f99000]
   java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x00042ccece80> (a
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
at
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
at
java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:349)
at
kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
at
kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
at
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:129)
at
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:110)
at
scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
at
scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
at
scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
at
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:110)
at
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:110)
at
kafka.server.AbstractFetcherThread$$anonfun$

Re: Kafka consumer offset checker hangs indefinitely

2015-05-14 Thread Rajasekar Elango
Hi Meghana,

We also faced a similar issue and found that it always returned
ConsumerCoordinatorNotAvailableCode for one broker (server id 3), and the
leader for all partitions of the __consumer_offsets topic was that same
broker id 3.

So we wiped the kafka data dir on that broker and restarted it. After that,
ConsumerOffsetChecker started working. We had replication enabled, so losing
the kafka data dir was not a big deal for us. If you can't delete the complete
kafka data dir, you can try deleting just the __consumer_offsets data.
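
Roughly (a destructive sketch; /data/kafka is a placeholder for your log.dirs
setting, and the broker must be stopped first):

    # on the affected broker only, after stopping it
    rm -rf /data/kafka/__consumer_offsets-*
    # restart the broker; the partitions are restored from the replicas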

Thanks,
Raja.


On Thu, May 14, 2015 at 10:46 AM, Meghana Narasimhan <
mnarasim...@bandwidth.com> wrote:

> Hi Mayuresh,
> A few more inputs that I can provide at the moment after some testing are
> as follows.
> 1. The error returned by the consumer offset checker's
> ConsumerMetadataResponse is "ConsumerCoordinatorNotAvailableCode". Could it
> somehow be related to the offsets being written to zookeeper and not the
> internal kafka offsets topic ?
> 2. I see that the __consumer_offsets topic has been created on all the
> brokers.
> 3. I also tried the steps for migrating from zookeeper to kafka offset
> topic as specified in the documentation [using the offsets.storage and
> dual.commit.enabled configs]
> 4. Also based on a few other links I tried to commit offset
> using OffsetCommitRequest method and to get ConsumerMetadataResponse . But
> the OffsetCommitResponse also returned error and could not commit the
> offset successfully.
>
> On the other hand the producer and consumer are working fine and able to
> produce and consume data from the topic. The issue is only with the
> consumer offset checker tool.
>
> Thanks,
> Meghana
>
>
> On Mon, May 11, 2015 at 7:34 PM, Mayuresh Gharat <
> gharatmayures...@gmail.com
> > wrote:
>
> > Hi Meghana,
> >
> > Let me try this out on my cluster that has latest trunk deployed.
> >
> > Thanks,
> >
> > Mayuresh
> >
> > On Mon, May 11, 2015 at 1:53 PM, Meghana Narasimhan <
> > mnarasim...@bandwidth.com> wrote:
> >
> > > Hi Mayuresh,
> > > A small update. The Kafka version I'm currently using is  2.10-0.8.2.1
> > (not
> > > 2.11 as previously mentioned). The cluster looks fine. Not sure why the
> > > consumer offset checker does not return a valid output and gets stuck.
> > >
> > > bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
> > > Topic:test   PartitionCount:3ReplicationFactor:3
> > > Configs:min.insync.replicas=2
> > > Topic: test  Partition: 0Leader: 1   Replicas: 1,2,0
> Isr:
> > > 1,2,0
> > > Topic: test  Partition: 1Leader: 2   Replicas: 2,0,1
> Isr:
> > > 1,2,0
> > > Topic: test  Partition: 2Leader: 0   Replicas: 0,1,2
> Isr:
> > > 1,2,0
> > >
> > >
> > >
> > >
> > > On Fri, May 8, 2015 at 12:52 PM, Meghana Narasimhan <
> > > mnarasim...@bandwidth.com> wrote:
> > >
> > > > Hi Mayuresh,
> > > >
> > > > Yes, the broker is up and accepting connections. Multiple consumers
> are
> > > > consuming off topics on the broker.
> > > > Also I am seeing the issue only with this particular version (
> > > > 2.11-0.8.2.1). It worked fine with the beta that I was using earlier.
> > > >
> > > >
> > > > On Fri, May 8, 2015 at 12:45 PM, Mayuresh Gharat <
> > > > gharatmayures...@gmail.com> wrote:
> > > >
> > > >> Is X.X.X.X:9092 up and accepting connections?
> > > >> I am confused as to why it is not connecting to some other broker if
> > > >> the connection to this broker fails. Can you check if the broker is up?
> > > >>
> > > >> The way it works is the consumer will send a ConsumerMetadataRequest
> > to
> > > >> one
> > > >> of the brokers and get the offsetmanager for its group and then
> > perform
> > > >> the
> > > >> offset management.
> > > >>
> > > >> Thanks,
> > > >>
> > > >> Mayuresh
> > > >>
> > > >> On Fri, May 8, 2015 at 9:22 AM, Meghana Narasimhan <
> > > >> mnarasim...@bandwidth.com> wrote:
> > > >>
> > > >> > Hi,
> > > >> > I'm using Kafka version 0.8.2.1 (kafka_2.11-0.8.2.1) and the
> > consumer
> > > >> > offset checker hangs indefinitely and does not return any
> results. I
> > > >> > enabled debug for the tools, and below are the debug statements as
> > seen
> > > on
> > > >> > the stdout. Any thoughts or inputs on this will be much
> appreciated.
> > > >> >
> > > >> > command used :
> > > >> > bin/kafka-consumer-offset-checker.sh --zookeeper localhost:2181
> > > --group
> > > >> > test-consumer-group
> > > >> >  or
> > > >> > ./kafka-consumer-offset-checker.sh --zookeeper
> > > >> > broker1:2181,broker2:2181,broker3:2181 --group test-consumer-group
> > > >> >
> > > >> >  DEBUG Querying X.X.X.X:9092 to locate offset manager for
> > > >> > test-consumer-group. (kafka.client.ClientUtils$)
> > > >> > [2015-05-08 10:23:55,090] DEBUG Consumer metadata response:
> > > >> > ConsumerMetadataResponse(None,15,0) (kafka.client.ClientUtils$)
> > > >> > [2015-05-08 10:23:55,091] DEBUG Query to X.X.X.X:9092 to locate
> > offset
> > > >> > manager for test-consumer-group failed - will retry in 3000
> > > >

Re: How to set console consumer group ID

2015-04-22 Thread Rajasekar Elango
Yes, you can pass any consumer property, including group.id, by putting them
in a properties file and passing its path via the --consumer.config option of
the console consumer.
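
For example (the topic name is just an illustration):

    echo "group.id=myGroupId" > group.properties
    bin/kafka-console-consumer.sh --zookeeper localhost:2181 \
      --topic test --consumer.config group.properties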

Thanks,
Raja.

On Wed, Apr 22, 2015 at 1:45 AM, Lukáš Havrlant  wrote:

> Hi,
> is it possible to set group ID for console consumer on command line?
> Something like
>
> $ bin/kafka-console-consumer.sh --groupid myGroupId
>
> Lukáš
>



-- 
Thanks,
Raja.


Re: [ANNOUNCEMENT] Apache Kafka 0.8.2.0 Released

2015-02-04 Thread Rajasekar Elango
Yay! Thanks to Jun and everybody who contributed to this release. We have
been waiting for this release for a while.

Thanks,
Rajasekar Elango (Salesforce.com).

On Tue, Feb 3, 2015 at 8:37 PM, Jun Rao  wrote:

> The Apache Kafka community is pleased to announce the release for Apache
> Kafka 0.8.2.0.
>
> The 0.8.2.0 release introduces many new features, improvements and fixes
> including:
>  - A new Java producer for ease of implementation and enhanced performance.
>  - A Kafka-based offset storage.
>  - Delete topic support.
>  - Per topic configuration of preference for consistency over availability.
>  - Scala 2.11 support and dropping support for Scala 2.8.
>  - LZ4 Compression.
>
> All of the changes in this release can be found:
> https://archive.apache.org/dist/kafka/0.8.2.0/RELEASE_NOTES.html
>
> Apache Kafka is high-throughput, publish-subscribe messaging system
> rethought of as a distributed commit log.
>
> ** Fast => A single Kafka broker can handle hundreds of megabytes of reads
> and
> writes per second from thousands of clients.
>
> ** Scalable => Kafka is designed to allow a single cluster to serve as the
> central data backbone
> for a large organization. It can be elastically and transparently expanded
> without downtime.
> Data streams are partitioned and spread over a cluster of machines to allow
> data streams
> larger than the capability of any single machine and to allow clusters of
> co-ordinated consumers.
>
> ** Durable => Messages are persisted on disk and replicated within the
> cluster to prevent
> data loss. Each broker can handle terabytes of messages without performance
> impact.
>
> ** Distributed by Design => Kafka has a modern cluster-centric design that
> offers
> strong durability and fault-tolerance guarantees.
>
> You can download the release from: http://kafka.apache.org/downloads.html
>
> We welcome your help and feedback. For more information on how to
> report problems, and to get involved, visit the project website at
> http://kafka.apache.org/
>
> Thanks,
>
> Jun
>



-- 
Thanks,
Raja.


Re: kafka monitoring

2015-01-08 Thread Rajasekar Elango
Hi Sa Li,

You need to set the environment variable JMX_PORT to enable JMX when starting
kafka. See kafka-run-class.sh for how it is used. Then you can connect to
<host>:<port> using JConsole.
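
For example (the port number is just an illustration):

    JMX_PORT=9999 bin/kafka-server-start.sh config/server.properties
    # then, from your desktop:
    jconsole <broker-host>:9999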

Thanks,
Raja.

On Thu, Jan 8, 2015 at 2:08 PM, Sa Li  wrote:

> Hello, All
>
> I understand many of you are using jmxtrans along with graphite/ganglia to
> pull out metrics, according to https://kafka.apache.org/081/ops.html,  it
> says "The easiest way to see the available metrics is to fire up jconsole
> and point it at a running kafka client or server; this will allow browsing
> all metrics with JMX..."
>
> I tried to fire up jconsole on windows, attempting to access our dev and
> production clusters, which are running well.
> Here is the main node of my dev:
> 10.100.75.128, broker port:9092, zk port:2181
>
> Jconsole shows:
>
>  New Connection
> Remote Process:
>
> Usage: <hostname>:<port> OR service:jmx:<protocol>:<sap>
> Username: Password:
>
> Sorry for my naive question; I tried to connect based on the above IP but
> just can't connect. Do I need to do something on the dev server to make it
> work?
>
> thanks
>
> --
>
> Alec Li
>



-- 
Thanks,
Raja.


Re: kafka monitoring system

2014-12-22 Thread Rajasekar Elango
Hi Sa Li,

You can also try jmxtrans + graphite (for charting); jmxtrans has a graphite
output writer out of the box.
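
A rough jmxtrans config sketch (the GraphiteWriter class is jmxtrans's own;
the mbean pattern is illustrative and depends on your kafka version's mbean
naming):

    {"servers": [{"host": "broker1", "port": "9999",
      "queries": [{
        "obj": "kafka.server:type=BrokerTopicMetrics,name=*",
        "outputWriters": [{
          "@class": "com.googlecode.jmxtrans.model.output.GraphiteWriter",
          "settings": {"host": "graphite.example.com", "port": 2003}}]}]}]}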

Regards,
Raja.

On Mon, Dec 22, 2014 at 10:39 PM, YuanJia Li  wrote:

> Hi Sa Li,
> You can try to use jmxtrans + opentsdb to monitor kafka. Jmxtrans collects
> data via JMX and sends it to opentsdb. Opentsdb does the graphing and
> alerting.
>
>
>
>
> YuanJia Li
>
>
>
>
>
> From: Sa Li
> Date: 2014-12-23 08:41
> To: users
> Subject: kafka monitoring system
> Hi, all
>
> I am thinking to make a reliable monitoring system for our kafka production
> cluster. I read such from documents:
>
> "Kafka uses Yammer Metrics for metrics reporting in both the server and the
> client. This can be configured to report stats using pluggable stats
> reporters to hook up to your monitoring system.
>
> The easiest way to see the available metrics is to fire up jconsole and
> point it at a running kafka client or server; this will allow browsing all
> metrics with JMX.
>
> We pay particular attention to, and do graphing and alerting on, the
> following metrics:
>
> .."
>
>
> I am wondering if anyone has ever used JConsole to monitor kafka, or if
> anyone can recommend a good monitoring tool for a kafka production cluster.
>
>
> thanks
>
>
> --
>
> Alec Li




-- 
Thanks,
Raja.


Re: Announcing Confluent

2014-11-06 Thread Rajasekar Elango
Congrats. Wish you all the very best and success.

Thanks,
Raja.

On Thu, Nov 6, 2014 at 1:36 PM, Niek Sanders  wrote:

> Congrats!
>
> On Thu, Nov 6, 2014 at 10:28 AM, Jay Kreps  wrote:
> > Hey all,
> >
> > I’m happy to announce that Jun Rao, Neha Narkhede and I are creating a
> > company around Kafka called Confluent. We are planning on productizing
> the
> > kind of Kafka-based real-time data platform we built out at LinkedIn. We
> > are doing this because we think this is a really powerful idea and we
> felt
> > there was a lot to do to make this idea really take root. We wanted to
> make
> > that our full time mission and focus.
> >
> > There is a blog post that goes into a little more depth here:
> > http://blog.confluent.io/
> >
> > LinkedIn will remain a heavy Kafka user and contributor. Combined with
> our
> > additional resources from the funding of the company this should be a
> > really good thing for the Kafka development effort. Especially when
> > combined with the increasing contributions from the rest of the
> development
> > community. This is great news, as there is a lot of work to do. We'll
> need
> > to really focus on scaling this distributed development in a healthy way.
> >
> > One thing I do want to emphasize is that the addition of a company in the
> > Kafka ecosystem won’t mean meddling with open source. Kafka will remain
> > 100% open source and community focused, as of course is true of any
> Apache
> > project. I have been doing open source for a long time and strongly
> believe
> > it is the right model for infrastructure software development.
> >
> > Confluent is just getting off the ground now. We left LinkedIn, raised
> some
> > money, and we have an office (but no furniture yet!). Nonetheless, if
> you
> > are interested in finding out more about the company and either getting
> > help with your Kafka usage or joining us to help build all this, by all
> > means reach out to us, we’d love to talk.
> >
> > Wish us luck!
> >
> > -Jay
>



-- 
Thanks,
Raja.


Re: MBeans, dashes, underscores, and KAFKA-1481

2014-10-17 Thread Rajasekar Elango
+1 on getting rid of quotes in jmx mbeans.

Thanks,
Raja.

On Fri, Oct 17, 2014 at 4:48 PM, Neha Narkhede 
wrote:

> +1 on getting rid of the quotes.
>
> On Fri, Oct 17, 2014 at 12:31 PM, Magnus Spångdal <
> magnus.spang...@deltaprojects.com> wrote:
>
> > +1 to get rid of quotes, thanks!
> >
> >
> >
> >
> >
> >
> > —
> > Sent from Mailbox
> >
> > On Fri, Oct 17, 2014 at 8:54 PM, Jun Rao  wrote:
> >
> > > Hi, everyone,
> > > We are fixing the mbean names in kafka-1482, by adding separate
> explicit
> > > tags in the name for things like clientId and topic. Another thing that
> > > some people have complained before is that we use quotes in the jmx
> name.
> > > Should we also just get rid of the quotes as part of kafka-1482? So,
> > > instead of
> > >
> "kafka.server":type="BrokerTopicMetrics",name="topic-1-BytesInPerSec"
> > > we will have
> > >
> kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=topic-1
> > > Thanks,
> > > Jun
> > > On Thu, Oct 9, 2014 at 11:12 AM, Neha Narkhede <
> neha.narkh...@gmail.com>
> > > wrote:
> > >> I am going to vote for 1482 to be included in 0.8.2, if we have a
> patch
> > >> submitted in a week. I think we've had this JIRA opened for too long
> > and we
> > >> held people back so it's only fair to release this.
> > >>
> > >> On Wed, Oct 8, 2014 at 9:40 PM, Jun Rao  wrote:
> > >>
> > >> > Otis,
> > >> >
> > >> > Just have the patch ready asap. We can make a call then.
> > >> >
> > >> > Thanks,
> > >> >
> > >> > Jun
> > >> >
> > >> > On Wed, Oct 8, 2014 at 6:13 AM, Otis Gospodnetic <
> > >> > otis.gospodne...@gmail.com
> > >> > > wrote:
> > >> >
> > >> > > Hi Jun,
> > >> > >
> > >> > > Would by the end of next week be acceptable for 0.8.2?
> > >> > >
> > >> > > Thanks,
> > >> > > Otis
> > >> > > --
> > >> > > Monitoring * Alerting * Anomaly Detection * Centralized Log
> > Management
> > >> > > Solr & Elasticsearch Support * http://sematext.com/
> > >> > >
> > >> > >
> > >> > > On Tue, Oct 7, 2014 at 4:04 PM, Jun Rao  wrote:
> > >> > >
> > >> > > > Otis,
> > >> > > >
> > >> > > > Yes, if you guys can help provide a patch in a few days, we can
> > >> > probably
> > >> > > > get it to the 0.8.2 release.
> > >> > > >
> > >> > > > Thanks,
> > >> > > >
> > >> > > > Jun
> > >> > > >
> > >> > > > On Tue, Oct 7, 2014 at 12:10 PM, Otis Gospodnetic <
> > >> > > > otis.gospodne...@gmail.com> wrote:
> > >> > > >
> > >> > > > > Hi Jun,
> > >> > > > >
> > >> > > > > I think your MBean renaming approach will work.  I see
> > >> > > > > https://issues.apache.org/jira/browse/KAFKA-1481 has Fix
> > Version
> > >> > > 0.8.2,
> > >> > > > > but
> > >> > > > > is not marked as a Blocker.  We'd love to get the MBeans fixed
> > so
> > >> > this
> > >> > > > > makes it in 0.8.2 release.  Do you know if this is on anyone's
> > >> plate
> > >> > > (the
> > >> > > > > issue is currently Unassigned)?  If not, should we provide a
> new
> > >> > patch
> > >> > > > that
> > >> > > > > uses your approach?
> > >> > > > >
> > >> > > > > Thanks,
> > >> > > > > Otis
> > >> > > > > --
> > >> > > > > Monitoring * Alerting * Anomaly Detection * Centralized Log
> > >> > Management
> > >> > > > > Solr & Elasticsearch Support * http://sematext.com/
> > >> > > > >
> > >> > > > >
> > >> > > > > On Thu, Sep 18, 2014 at 4:49 PM, Jun Rao 
> > wrote:
> > >> > > > >
> > >> > > > > > Otis,
> > >> > > > > >
> > >> > > > > > In kafka-1481, we will have to change the mbean names (at
> > least
> > >> the
> > >> > > > ones
> > >> > > > > > with clientid and topic) anyway. Using the name/value pair
> in
> > the
> > >> > > mbean
> > >> > > > > > name allows us to do this in a cleaner way. Yes, "," is not
> > >> allowed
> > >> > > in
> > >> > > > > > clientid or topic.
> > >> > > > > >
> > >> > > > > > Bhavesh,
> > >> > > > > >
> > >> > > > > > Yes, I was thinking of making changes in the new metrics
> > package.
> > >> > > > > Something
> > >> > > > > > like allowing the sensor names to have name/value pairs. The
> > jmx
> > >> > > names
> > >> > > > > will
> > >> > > > > > just follow accordingly. This is probably cleaner than doing
> > the
> > >> > > > > escaping.
> > >> > > > > > Also, the metric names are more intuitive (otherwise, you
> > have to
> > >> > > know
> > >> > > > > > which part is the clientid and which part is the topic).
> > >> > > > > >
> > >> > > > > > Thanks,
> > >> > > > > >
> > >> > > > > > Jun
> > >> > > > > >
> > >> > > > > > On Wed, Sep 17, 2014 at 2:32 PM, Otis Gospodnetic <
> > >> > > > > > otis.gospodne...@gmail.com> wrote:
> > >> > > > > >
> > >> > > > > > > Hi Jun,
> > >> > > > > > >
> > >> > > > > > > On Wed, Sep 17, 2014 at 12:35 PM, Jun Rao <
> jun...@gmail.com
> > >
> > >> > > wrote:
> > >> > > > > > >
> > >> > > > > > > > Bhavesh,
> > >> > > > > > > >
> > >> > > > > > > > Yes, allowing dot in clientId and topic makes it a bit
> > harder
> > >> > to
> > >> > > > > define
> > >> > > > > > > the
> > >> > > > > > > > JMX bean names. I see a couple of solutions here.
> > >> > > > > > > >
>

Re: [DISCUSS] Kafka Security Specific Features

2014-07-31 Thread Rajasekar Elango
Can we get info on the targeted release dates for the 0.8.2 and 0.9 releases,
for our planning purposes?

Thanks.
Raja.


On Wed, Jul 30, 2014 at 7:27 PM, Joe Stein  wrote:

> The 0.8.2 release will not have the patch inside of it.  Trunk already has
> a lot inside of it as a point release.  The patch also doesn't account for
> all of the requirements that all of the stakeholders need/want for the
> feature.  Instead of releasing something that is useful but only for some
> it is better to spend the time to get it right for everyone.  We are going
> to have it in the 0.9 release (possibly also with authorization, encryption
> and more of the security features too) then.
>
> What we will do is keep the patch rebased against trunk and then the 0.8.2
> branch (once we get to that point) so that folks can apply it to the 0.8.2
> release and do a build from src.  When we get to that I can create a
> write-up or something if folks find problems doing it.
>
> /***
>  Joe Stein
>  Founder, Principal Consultant
>  Big Data Open Source Security LLC
>  http://www.stealth.ly
>  Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
> /
>
>
> On Wed, Jul 30, 2014 at 7:10 PM, Calvin Lei  wrote:
>
> > yeah i just saw that. Looking forward to the prod release of 0.8.2
> >
> >
> > On Wed, Jul 30, 2014 at 11:01 AM, Rajasekar Elango <
> rela...@salesforce.com
> > >
> > wrote:
> >
> > > We implemented the security features on an older snapshot version of 0.8
> kafka.
> > > But Joe Stein's organization rebased it to the latest version of kafka
> > > available at https://github.com/stealthly/kafka/tree/v0.8.2_KAFKA-1477
> .
> > >
> > > Thanks,
> > > Raja.
> > >
> > >
> > > On Tue, Jul 29, 2014 at 10:54 PM, Calvin Lei  wrote:
> > >
> > > > Raja,
> > > >Which Kafka version is your security enhancement based on?
> > > >
> > > > thanks,
> > > > Cal
> > > >
> > > >
> > > > On Wed, Jul 23, 2014 at 5:01 PM, Chris Neal 
> wrote:
> > > >
> > > > > Pramod,
> > > > >
> > > > > I got that same error when following the configuration from Raja's
> > > > > presentation earlier in this thread.  If you'll notice the usage
> for
> > > the
> > > > > console_producer.sh, it is slightly different, which is also
> slightly
> > > > > different than the scala code for the ConsoleProducer. :)
> > > > >
> > > > > When I changed this:
> > > > >
> > > > > bin/kafka-console-producer.sh --broker-list n5:9092:true --topic
> test
> > > > >
> > > > > to this:
> > > > >
> > > > > bin/kafka-console-producer.sh --broker-list n5:9092 --secure
> > > > > --client.security.file config/client.security.properties --topic
> test
> > > > >
> > > > > I was able to push messages to the topic, although I got a WARN
> about
> > > the
> > > > > property "topic" not being valid, even though it is required.
> > > > >
> > > > > Also, the Producer reported this warning to me:
> > > > >
> > > > > [2014-07-23 20:45:24,509] WARN Attempt to reinitialize auth context
> > > > > (kafka.network.security.SecureAuth$)
> > > > >
> > > > > and the broker gave me this:
> > > > > [2014-07-23 20:45:24,114] INFO begin ssl handshake for
> > > > > n5.example.com/192.168.1.144:48817//192.168.1.144:9092
> > > > > (kafka.network.security.SSLSocketChannel)
> > > > > [2014-07-23 20:45:24,374] INFO finished ssl handshake for
> > > > > n5.example.com/192.168.1.144:48817//192.168.1.144:9092
> > > > > (kafka.network.security.SSLSocketChannel)
> > > > > [2014-07-23 20:45:24,493] INFO Closing socket connection to
> > > > > n5.example.com/192.168.1.144. (kafka.network.Processor)
> > > > > [2014-07-23 20:45:24,555] INFO begin ssl handshake for
> > > > > n5.example.com/192.168.1.144:48818//192.168.1.144:9092
> > > > > (kafka.network.security.SSLSocketChannel)
> > > > > [2014-07-23 20:45:24,566] INFO finished ssl handshake for
> > > > > n5.example.com/192.168.1.144:48818//192.168.1.144:9092
> > > > > (kafka.network.security.SSLSocketChannel

Re: [DISCUSS] Kafka Security Specific Features

2014-07-30 Thread Rajasekar Elango
> at
> > > >
> > >
> >
> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> > > >
> > > > at kafka.network.BlockingChannel.receive(BlockingChannel.scala:102)
> > > >
> > > > at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:79)
> > > >
> > > > at
> > > >
> > >
> >
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:76)
> > > >
> > > > at kafka.producer.SyncProducer.send(SyncProducer.scala:117)
> > > >
> > > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
> > > >
> > > > ... 12 more
> > > > [2014-07-18 13:12:45,337] WARN Fetching topic metadata with
> correlation
> > > id
> > > > 1 for topics [Set(secureTopic)] from broker
> > > > [id:0,host:localhost,port:9092,secure:true] failed
> > > > (kafka.client.ClientUtils$)
> > > >
> > > > 2014-07-18 13:12:46,282] ERROR Failed to send requests for topics
> > > > secureTopic with correlation ids in [0,8]
> > > > (kafka.producer.async.DefaultEventHandler)
> > > >
> > > > [2014-07-18 13:12:46,283] ERROR Error in handling batch of 1 events
> > > > (kafka.producer.async.ProducerSendThread)
> > > >
> > > > kafka.common.FailedToSendMessageException: Failed to send messages
> > after
> > > 3
> > > > tries.
> > > >
> > > > at
> > > >
> > >
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
> > > >
> > > > at
> > > >
> > >
> >
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
> > > >
> > > > at
> > > >
> > >
> >
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
> > > >
> > > > at
> > > >
> > >
> >
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
> > > >
> > > > at scala.collection.immutable.Stream.foreach(Stream.scala:526)
> > > >
> > > > at
> > > >
> > >
> >
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
> > > >
> > > > at
> > >
> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
> > > >
> > > >
> > > >
> > > > On Fri, Jul 18, 2014 at 11:56 AM, Joe Stein 
> > > wrote:
> > > >
> > > >> Hi Pramod,
> > > >>
> > > >> Can you increase KAFKA_HEAP_OPTS to lets say -Xmx1G in the
> > > >> kafka-console-producer.sh to see if that gets you further along
> please
> > > in
> > > >> your testing?
> > > >>
> > > >> Thanks!
> > > >>
> > > >> /***
> > > >>  Joe Stein
> > > >>  Founder, Principal Consultant
> > > >>  Big Data Open Source Security LLC
> > > >>  http://www.stealth.ly
> > > >>  Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
> > > >> /
> > > >>
> > > >>
> > > >> On Fri, Jul 18, 2014 at 10:24 AM, Pramod Deshmukh <
> dpram...@gmail.com
> > >
> > > >> wrote:
> > > >>
> > > >> > Hello Raja/Joe,
> > > >> > When I turn on security, I still get an out of memory error on the
> > producer.
> > > Is
> > > >> > this something to do with keys? Is there any other way I can
> connect
> > > to
> > > >> > broker?
> > > >> >
> > > >> > *producer log*
> > > >> > [2014-07-17 15:38:14,186] ERROR OOME with size 352518400
> > > (kafka.network.
> > > >> > BoundedByteBufferReceive)
> > > >> > java.lang.OutOfMemoryError: Java heap space
> > > >> >
> > > >> > *broker log*
> > > >> >
> > > >> > INFO begin ssl handshake for localhost/
> > > 127.0.0.1:50199//127.0.0.1:9092
> > > >> >
> > > >> >
> > > >> >
> > > >> >
> > >

Re: KAFKA-1477 (authentication layer) and 0.8.2

2014-07-25 Thread Rajasekar Elango
Yes we are very much interested in getting this code merged to trunk. I can
also do testing once it's available on trunk.

Thanks,
Raja.


On Fri, Jul 25, 2014 at 12:11 PM, Joe Stein  wrote:

> Hi Chris, glad to hear that even more folks are going to (want to) use the
> feature.  I didn't author the patch (Raja and Ivan did) and created the
> fork so folks could test it without much fuss.
>
> I just commented on the ticket to address Jun's last comment and think it
> also answers your question too.
>
> I know folks are using this now and other folks are looking to use it out
> of the core project.
>
> As long as it has a way to cause no harm when it is off I believe it really
> adds to the value Kafka brings to a number of organizations that can't use
> Kafka just because of this one thing.
>
> I am looking forward to being able to commit it to trunk.
>
> /***
>  Joe Stein
>  Founder, Principal Consultant
>  Big Data Open Source Security LLC
>  http://www.stealth.ly
>  Twitter: @allthingshadoop 
> /
>
>
> On Fri, Jul 25, 2014 at 11:34 AM, Chris Neal  wrote:
>
> > Hi guys,
> >
> > This JIRA (https://issues.apache.org/jira/browse/KAFKA-1477) leads me to
> > believe that an authentication layer implementation is planned as part of
> > the 0.8.2 release.  I was wondering if this is still the case?
> >
> > There was an earlier thread talking about security, but there hasn't been
> > activity on it in awhile.
> >
> > I grabbed Joe's fork and it works, but I was wondering about it getting
> > merged back into the official 0.8.2 codebase, or is this more likely
> > something that will be in 0.9?
> >
> > Thanks!
> >
>



-- 
Thanks,
Raja.


Re: [DISCUSS] Kafka Security Specific Features

2014-07-17 Thread Rajasekar Elango
> [2014-07-17 15:37:17,137] INFO begin ssl handshake for
> 10.1.100.130/10.1.100.130:51696//10.1.100.130:9092
> (kafka.network.security.SSLSocketChannel)
>
> [2014-07-17 15:37:17,342] INFO begin ssl handshake for
> 10.1.100.130/10.1.100.130:51697//10.1.100.130:9092
> (kafka.network.security.SSLSocketChannel)
>
>
> *Start producer*
>
> *bin/kafka-console-producer.sh --broker-list 10.1.100.130:9092:true
> --topic
> secure.test*
>
>
> *producer.log:*
>
> bin/kafka-console-producer.sh --broker-list 10.1.100.130:9092:true --topic
> secure.test
>
> [2014-07-17 15:37:46,889] WARN Property topic is not valid
> (kafka.utils.VerifiableProperties)
>
> Hello Secure Kafka
>
> *[2014-07-17 15:38:14,186] ERROR OOME with size 352518400
> (kafka.network.BoundedByteBufferReceive)*
>
> *java.lang.OutOfMemoryError: Java heap space*
>
> at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
>
> at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
>
> at
>
> kafka.network.BoundedByteBufferReceive.byteBufferAllocate(BoundedByteBufferReceive.scala:80)
>
> at
>
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:63)
>
> at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>
> at
>
> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>
> at kafka.network.BlockingChannel.receive(BlockingChannel.scala:102)
>
> at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:79)
>
> at
>
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:76)
>
> at kafka.producer.SyncProducer.send(SyncProducer.scala:117)
>
> at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
>
> at
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
>
> at
>
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67)
>
> at kafka.utils.Utils$.swallow(Utils.scala:172)
>
> at kafka.utils.Logging$class.swallowError(Logging.scala:106)
>
> at kafka.utils.Utils$.swallowError(Utils.scala:45)
>
> at
>
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)
>
> at
>
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
>
> at
>
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
>
> at
>
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
>
> at scala.collection.immutable.Stream.foreach(Stream.scala:526)
>
> at
>
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
>
> at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
>
>
>
> On Wed, Jul 16, 2014 at 6:07 PM, Rajasekar Elango 
> wrote:
>
> > Pramod,
> >
> >
> > I presented secure kafka configuration and usage at the last meetup, so I
> > hope this
> > video recording <http://www.ustream.tv/recorded/48396701> would help. You
> > can skip to about 59 min to jump to the security talk.
> >
> > Thanks,
> > Raja.
> >
> >
> > On Wed, Jul 16, 2014 at 5:57 PM, Pramod Deshmukh 
> > wrote:
> >
> > > Hello Joe,
> > >
> > > Is there a configuration or example to test Kafka security piece?
> > >
> > > Thanks,
> > >
> > > Pramod
> > >
> > >
> > > On Wed, Jul 16, 2014 at 5:20 PM, Pramod Deshmukh 
> > > wrote:
> > >
> > > > Thanks Joe,
> > > >
> > > > This branch works. I was able to proceed. I still had to set scala
> > > version
> > > > to 2.9.2 in kafka-run-class.sh.
> > > >
> > > >
> > > >
> > > > On Wed, Jul 16, 2014 at 3:57 PM, Joe Stein 
> > wrote:
> > > >
> > > >> That is a very old branch.
> > > >>
> > > >> Here is a more up to date one
> > > >> https://github.com/stealthly/kafka/tree/v0.8.2_KAFKA-1477 (needs to
> > be
> > > >> updated to latest trunk might have a chance to-do that next week).
> > > >>
> > > >> You should be using gradle now as per the README.
> > > >>
> > > >> /***
> > > >>  Joe Stein
> > > >>  Founder, Principal Consultant
> > > >>  Big Data Open Source Security LLC
> > > >>  http://www.stealth.ly
> > > >>  Twitter: @allthingshadoop <http://www.twitte

Re: [DISCUSS] Kafka Security Specific Features

2014-07-16 Thread Rajasekar Elango
Pramod,


I presented secure kafka configuration and usage at the last meetup, so I hope
this video recording <http://www.ustream.tv/recorded/48396701> would help. You
can skip to about 59 min to jump to the security talk.

Thanks,
Raja.


On Wed, Jul 16, 2014 at 5:57 PM, Pramod Deshmukh  wrote:

> Hello Joe,
>
> Is there a configuration or example to test Kafka security piece?
>
> Thanks,
>
> Pramod
>
>
> On Wed, Jul 16, 2014 at 5:20 PM, Pramod Deshmukh 
> wrote:
>
> > Thanks Joe,
> >
> > This branch works. I was able to proceed. I still had to set scala
> version
> > to 2.9.2 in kafka-run-class.sh.
> >
> >
> >
> > On Wed, Jul 16, 2014 at 3:57 PM, Joe Stein  wrote:
> >
> >> That is a very old branch.
> >>
> >> Here is a more up to date one
> >> https://github.com/stealthly/kafka/tree/v0.8.2_KAFKA-1477 (needs to be
> >> updated to latest trunk might have a chance to-do that next week).
> >>
> >> You should be using gradle now as per the README.
> >>
> >> /***
> >>  Joe Stein
> >>  Founder, Principal Consultant
> >>  Big Data Open Source Security LLC
> >>  http://www.stealth.ly
> >>  Twitter: @allthingshadoop 
> >> /
> >>
> >>
> >> On Wed, Jul 16, 2014 at 3:49 PM, Pramod Deshmukh 
> >> wrote:
> >>
> >> > Thanks Joe for this,
> >> >
> >> > I cloned this branch and tried to run zookeeper but I get
> >> >
> >> > Error: Could not find or load main class
> >> > org.apache.zookeeper.server.quorum.QuorumPeerMain
> >> >
> >> >
> >> > I see scala version is still set to 2.8.0
> >> >
> >> > if [ -z "$SCALA_VERSION" ]; then
> >> >
> >> > SCALA_VERSION=2.8.0
> >> >
> >> > fi
> >> >
> >> >
> >> >
> >> > Then I installed sbt and scala and followed your instructions for
> >> different
> >> > scala versions. I was able to bring zookeeper up but brokers fail to
> >> start
> >> > with error
> >> >
> >> > Error: Could not find or load main class kafka.Kafka
> >> >
> >> > I think I am doing something wrong. Can you please help me?
> >> >
> >> > Our current production setup is with 2.8.0 and want to stick to it.
> >> >
> >> > Thanks,
> >> >
> >> > Pramod
> >> >
> >> >
> >> > On Tue, Jun 3, 2014 at 3:57 PM, Joe Stein 
> wrote:
> >> >
> >> > > Hi, I wanted to re-ignite the discussion around Apache Kafka
> Security.
> >> >  This
> >> > > is a huge bottleneck (non-starter in some cases) for a lot of
> >> > organizations
> >> > > (due to regulatory, compliance and other requirements). Below are my
> >> > > suggestions for specific changes in Kafka to accommodate security
> >> > > requirements.  This comes from what folks are doing "in the wild" to
> >> > > workaround and implement security with Kafka as it is today and also
> >> > what I
> >> > > have discovered from organizations about their blockers. It also
> >> picks up
> >> > > from the wiki (which I should have time to update later in the week
> >> based
> >> > > on the below and feedback from the thread).
> >> > >
> >> > > 1) Transport Layer Security (i.e. SSL)
> >> > >
> >> > > This also includes client authentication in addition to in-transit
> >> > security
> >> > > layer.  This work has been picked up here
> >> > > https://issues.apache.org/jira/browse/KAFKA-1477 and do appreciate
> >> any
> >> > > thoughts, comments, feedback, tomatoes, whatever for this patch.  It
> >> is a
> >> > > pickup from the fork of the work first done here
> >> > > https://github.com/relango/kafka/tree/kafka_security.
> >> > >
> >> > > 2) Data encryption at rest.
> >> > >
> >> > > This is very important and something that can be facilitated within
> >> the
> >> > > wire protocol. It requires an additional map data structure for the
> >> > > "encrypted [data encryption key]". With this map (either in your
> >> object
> >> > or
> >> > > in the wire protocol) you can store the dynamically generated
> >> symmetric
> >> > key
> >> > > (for each message) and then encrypt the data using that dynamically
> >> > > generated key.  You then encrypt the encryption key using each
> public
> >> key
> >> > > for whom is expected to be able to decrypt the encryption key to
> then
> >> > > decrypt the message.  For each public key encrypted symmetric key
> >> (which
> >> > is
> >> > > now the "encrypted [data encryption key]" along with which public
> key
> >> it
> >> > > was encrypted with for (so a map of [publicKey] =
> >> > > encryptedDataEncryptionKey) as a chain.   Other patterns can be
> >> > implemented
> >> > > but this is a pretty standard digital enveloping [0] pattern with
> >> only 1
> >> > > field added. Other patterns should be able to use that field to-do
> >> their
> >> > > implementation too.
> >> > >
> >> > > 3) Non-repudiation and long term non-repudiation.
> >> > >
> >> > > Non-repudiation is proving data hasn't changed.  This is often (if
> not
> >> > > always) done with x509 public certificates (chained to a certificate
> >> > > authority).
> >> > >
> >> > > Long term no

Re: [DISCUSS] Kafka Security Specific Features

2014-06-05 Thread Rajasekar Elango
Hi Jay,

Thanks for putting together a spec for security.

Joe,

Looks "Securing zookeeper.." part has been deleted from assumptions
section. communication with zookeeper need to be secured as well to make
entire kafka cluster secure. It may or may not require changes to kafka.
But it's good to have it in spec.

I could not find a link to edit the page after login into wiki. Do I need
any special permission to make edits?

Thanks,
Raja.


On Wed, Jun 4, 2014 at 8:57 PM, Joe Stein  wrote:

> I like the idea of working on the spec and prioritizing. I will update the
> wiki.
>
> - Joestein
>
>
> On Wed, Jun 4, 2014 at 1:11 PM, Jay Kreps  wrote:
>
> > Hey Joe,
> >
> > Thanks for kicking this discussion off! I totally agree that for
> something
> > that acts as a central message broker security is critical feature. I
> think
> > a number of people have been interested in this topic and several people
> > have put effort into special purpose security efforts.
> >
> > Since most the LinkedIn folks are working on the consumer right now I
> think
> > this would be a great project for any other interested people to take on.
> > There are some challenges in doing these things distributed but it can
> also
> > be a lot of fun.
> >
> > I think a good first step would be to get a written plan we can all agree
> > on for how things should work. Then we can break things down into chunks
> > that can be done independently while still aiming at a good end state.
> >
> > I had tried to write up some notes that summarized at least the thoughts
> I
> > had had on security:
> > https://cwiki.apache.org/confluence/display/KAFKA/Security
> >
> > What do you think of that?
> >
> > One assumption I had (which may be incorrect) is that although we want
> all
> > the things in your list, the two most pressing would be authentication
> and
> > authorization, and that was all that write up covered. You have more
> > experience in this domain, so I wonder how you would prioritize?
> >
> > Those notes are really sketchy, so I think the first goal I would have
> > would be to get to a real spec we can all agree on and discuss. A lot of
> > the security stuff has a high human interaction element and needs to work
> > in pretty different domains and different companies so getting this kind
> of
> > review is important.
> >
> > -Jay
> >
> >
> > On Tue, Jun 3, 2014 at 12:57 PM, Joe Stein  wrote:
> >
> > > Hi,I wanted to re-ignite the discussion around Apache Kafka Security.
> >  This
> > > is a huge bottleneck (non-starter in some cases) for a lot of
> > organizations
> > > (due to regulatory, compliance and other requirements). Below are my
> > > suggestions for specific changes in Kafka to accommodate security
> > > requirements.  This comes from what folks are doing "in the wild" to
> > > workaround and implement security with Kafka as it is today and also
> > what I
> > > have discovered from organizations about their blockers. It also picks
> up
> > > from the wiki (which I should have time to update later in the week
> based
> > > on the below and feedback from the thread).
> > >
> > > 1) Transport Layer Security (i.e. SSL)
> > >
> > > This also includes client authentication in addition to in-transit
> > security
> > > layer.  This work has been picked up here
> > > https://issues.apache.org/jira/browse/KAFKA-1477 and do appreciate any
> > > thoughts, comments, feedback, tomatoes, whatever for this patch.  It
> is a
> > > pickup from the fork of the work first done here
> > > https://github.com/relango/kafka/tree/kafka_security.
> > >
> > > 2) Data encryption at rest.
> > >
> > > This is very important and something that can be facilitated within the
> > > wire protocol. It requires an additional map data structure for the
> > > "encrypted [data encryption key]". With this map (either in your object
> > or
> > > in the wire protocol) you can store the dynamically generated symmetric
> > key
> > > (for each message) and then encrypt the data using that dynamically
> > > generated key.  You then encrypt the encryption key using each public
> key
> > > for whom is expected to be able to decrypt the encryption key to then
> > > decrypt the message.  For each public key encrypted symmetric key
> (which
> > is
> > > now the "encrypted [data encryption key]" along with which public key
> it
> > > was encrypted with for (so a map of [publicKey] =
> > > encryptedDataEncryptionKey) as a chain.   Other patterns can be
> > implemented
> > > but this is a pretty standard digital enveloping [0] pattern with only
> 1
> > > field added. Other patterns should be able to use that field to-do
> their
> > > implementation too.
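
(A quick illustration of the digital enveloping pattern described above,
using the openssl CLI. This is just the standard pattern, not part of the
proposal or of Kafka, and all file names below are hypothetical.)

# 1) generate a fresh symmetric data encryption key (DEK) for this message
openssl rand -hex 32 > dek.hex
# 2) encrypt the message payload with the DEK
openssl enc -aes-256-cbc -salt -in message.bin -out message.enc -pass file:dek.hex
# 3) encrypt the DEK under each intended reader's RSA public key, building
#    the map of [publicKey] = encryptedDataEncryptionKey that travels along
openssl rsautl -encrypt -pubin -inkey reader1_pub.pem -in dek.hex -out dek.reader1.enc
openssl rsautl -encrypt -pubin -inkey reader2_pub.pem -in dek.hex -out dek.reader2.enc
# a reader recovers the DEK with its private key, then decrypts the payload
openssl rsautl -decrypt -inkey reader1_priv.pem -in dek.reader1.enc -out dek.hex
openssl enc -d -aes-256-cbc -in message.enc -out message.bin -pass file:dek.hex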
> > >
> > > 3) Non-repudiation and long term non-repudiation.
> > >
> > > Non-repudiation is proving data hasn't changed.  This is often (if not
> > > always) done with x509 public certificates (chained to a certificate
> > > authority).
> > >
> > > Long term non-repudiation is what happens when the certificates of the
> > > certifi

Re: Spring integration?

2014-04-07 Thread Rajasekar Elango
Hi Michael,

We are using spring integration kafka in production and it has been
working fine. We also contributed some features (for eg: support for topic
filter) back to the project. The outbound adapter has a poller to make it
work with the enterprise integration pattern Message Channels. If you need
a more detailed answer you could try posting your question on stack
overflow.

I have also seen a camel component for kafka, but haven't tried it.

Thanks,
Raja.


On Mon, Apr 7, 2014 at 10:26 PM, Michael Campbell <
michael.campb...@gmail.com> wrote:

> Hello,
>
> My company is looking at Kafka for a backbone to microservices, and we were
> wondering if anyone had actually done anything with it and Spring
> Integration (which we are looking at also for additional things).
>
> The reason I ask is the one place I can find any code seems about a half
> year out of date, is targeting both an old version of Kafka (0.8-beta) and
> an older version of Scala, and the example seems ... odd to me (he has a
> poller in the outbound adapter, which I don't understand at all).
>
> Is anyone actually using this combination?
>
> Or, is there a better way to integrate with Kafka if I want to put a layer
> between my business code and the Kafka API?(Camel, perhaps?)
>



-- 
Thanks,
Raja.


Re: Kafka and authentication

2014-03-31 Thread Rajasekar Elango
Hi Vijay,

We implemented mutual ssl authentication in kafka for our internal use and
we have plans to contribute it back to the community. But we implemented
SSL over an older snapshot of the kafka 0.8 release. We have been busy with
other projects and haven't gotten a chance to merge our ssl changes to the
latest version of kafka. If you are interested in looking at the changes we
made, they are available in my github fork of apache kafka (
https://github.com/relango/kafka/tree/kafka_security)

Thanks,
Raja.


On Fri, Mar 28, 2014 at 10:06 PM, Neha Narkhede wrote:

> Hi Vijay,
>
> The document you pointed out has our initial thoughts on Kafka security.
> This work is still in design and discussion phase, no code has been written
> as such and we hope to pick it up in a couple months. However, if you have
> thoughts on how it should work and/or would like to contribute patches, we
> would be happy to collaborate with you.
>
> Thanks,
> Neha
>
>
> On Fri, Mar 28, 2014 at 4:05 PM, Vijay Ramachandran <
> vramachand...@apple.com
> > wrote:
>
> > Hi All,
> >
> > I was googling around for info on securing kafka. The best document I
> > could find was
> https://cwiki.apache.org/confluence/display/KAFKA/Security,
> > which is "kind of old". It is not clear if any steps were taken after
> this
> > doc was put together. Looking at the features / bug fixes in kafka also
> > does not paint a clear picture. Hence this set of questions :
> >
> > Is there a way to make kafka authenticate a producer sending messages /
> > consumer reading messages ?
> > Is there a way to make kafka authenticate itself to the ZooKeeper
> ensemble
> > ?
> >
> > Any info will be deeply appreciated
> >
> > Thanks
> >
> > Vijay
>



-- 
Thanks,
Raja.


Re: Spring Integration Kafka support

2014-01-13 Thread Rajasekar Elango
Hi Preetham,

We are able to successfully use spring-integration-kafka with non-zero
broker ids without any issues. Could you provide more details on exactly
what problem/error you are getting?

Also, you can try the examples under samples to get started quickly.

Thanks,
Raja.



On Fri, Jan 10, 2014 at 4:08 PM, Premchandra, Preetham Kukillaya <
preethampremchan...@fico.com> wrote:

> Hi,
> I was doing a poc using
> https://github.com/SpringSource/spring-integration-extensions/tree/master/spring-integration-kafka.
> I figured that code is expecting the brokerid=0 and ideally this will not
> be the case if multiple brokers are connecting to the same zookeeper.
>
> Regards
> Preetham
>
> This email and any files transmitted with it are confidential, proprietary
> and intended solely for the individual or entity to whom they are
> addressed. If you have received this email in error please delete it
> immediately.
>



-- 
Thanks,
Raja.


Re: Producer SSL?

2013-11-15 Thread Rajasekar Elango
Hi Jonathan

We forked kafka to add the SSL feature. It is not part of the official
kafka release.

Sent from my iPhone

On Nov 15, 2013, at 12:32 PM, Jonathan Hodges  wrote:

> Hi,
>
> While searching the user group messages I found the following thread -
> http://grokbase.com/t/kafka/users/138vqq1x07/getting-leadernotavailableexception-in-console-producer-after-increasing-partitions-from-4-to-16.
> It shows the following stack trace with 0.8.
>
> [2013-08-27 08:29:30,372] INFO Fetching metadata from broker
> id:0,host:localhost,port:6667,secure:true with correlation id 8 for 1
> topic(s) Set(test-41) (kafka.client.ClientUtils$)
> [2013-08-27 08:29:30,373] INFO begin ssl handshake for localhost/
> 127.0.0.1:6667//127.0.0.1:36640 (kafka.security.SSLSocketChannel)
> [2013-08-27 08:29:30,375] INFO finished ssl handshake for localhost/
> 127.0.0.1:6667//127.0.0.1:36640 (kafka.security.SSLSocketChannel)
> [2013-08-27 08:29:30,375] INFO Connected to localhost:6667:true for
> producing (kafka.producer.SyncProducer)
> [2013-08-27 08:29:30,380] INFO Disconnecting from localhost:6667:true
> (kafka.producer.SyncProducer)
> [2013-08-27 08:29:30,381] INFO Secure sockets for data transfer is enabled
> (kafka.producer.SyncProducerConfig)
>
> Is there a 'secure' boolean property on the broker that allows for SSL?  I
> didn't see it on http://kafka.apache.org/08/configuration.html but maybe I
> missed it?
>
> Thanks,
> Jonathan


Mirrormaker consumer looping to offset out of range and reset offset errors

2013-10-09 Thread Rajasekar Elango
We are seeing that the mirrormaker consumer has started looping through
offset out of range and reset offset errors for some of the partitions (2
out of 8). The ConsumerOffsetChecker reported a very high Lag for these 2
partitions. It looks like this problem started after a consumer rebalance.
Here are the log lines:

2013-10-06 06:09:59,993
[ConsumerFetcherThread-mirrormakerProd_ops-mmrs1-1-sjl.ops.sfdc.net-1380036300408-baa80a5a-0-4]
WARN  (kafka.consumer.ConsumerFetcherThread)  -
[ConsumerFetcherThread-mirrormakerProd_ops-mmrs1-1-sjl.ops.sfdc.net-1380036300408-baa80a5a-0-4],
current offset 2526006629 for partition [FunnelProto,1] out of range; reset
offset to 2526006629
2013-10-06 06:09:59,993
[ConsumerFetcherThread-mirrormakerProd_ops-mmrs1-1-sjl.ops.sfdc.net-1380036300408-baa80a5a-0-4]
WARN  (kafka.consumer.ConsumerFetcherThread)  -
[ConsumerFetcherThread-mirrormakerProd_ops-mmrs1-1-sjl.ops.sfdc.net-1380036300408-baa80a5a-0-4],
current offset 2363213504 for partition [FunnelProto,3] out of range; reset
offset to 2363213504
2013-10-06 06:09:59,993
[ConsumerFetcherThread-mirrormakerProd_ops-mmrs1-1-sjl.ops.sfdc.net-1380036300408-baa80a5a-0-4]
WARN  (kafka.consumer.ConsumerFetcherThread)  -
[ConsumerFetcherThread-mirrormakerProd_ops-mmrs1-1-sjl.ops.sfdc.net-1380036300408-baa80a5a-0-4],
current offset 2146256007 for partition [jmx,0] out of range; reset offset
to 2146256007
2013-10-06 06:09:59,992
[ConsumerFetcherThread-mirrormakerProd_ops-mmrs1-1-sjl.ops.sfdc.net-1380036300408-baa80a5a-0-4]
WARN  (kafka.consumer.ConsumerFetcherThread)  -
[ConsumerFetcherThread-mirrormakerProd_ops-mmrs1-1-sjl.ops.sfdc.net-1380036300408-baa80a5a-0-4],
current offset 2239688 for partition [tower_timing_metrics,3] out of range;
reset offset to 2239688
2013-10-06 06:09:59,889
[ConsumerFetcherThread-mirrormakerProd_ops-mmrs1-1-sjl.ops.sfdc.net-1380036300408-baa80a5a-0-4]
WARN  (kafka.consumer.ConsumerFetcherThread)  -
[ConsumerFetcherThread-mirrormakerProd_ops-mmrs1-1-sjl.ops.sfdc.net-1380036300408-baa80a5a-0-4],
current offset 1234239 for partition [agent,0] out of range; reset offset
to 1234239
2013-10-06 06:09:59,889
[ConsumerFetcherThread-mirrormakerProd_ops-mmrs1-1-sjl.ops.sfdc.net-1380036300408-baa80a5a-0-4]
WARN  (kafka.consumer.ConsumerFetcherThread)  -
[ConsumerFetcherThread-mirrormakerProd_ops-mmrs1-1-sjl.ops.sfdc.net-1380036300408-baa80a5a-0-4],
current offset 2526006629 for partition [FunnelProto,1] out of range; reset
offset to 2526006629
2013-10-06 06:09:59,889
[ConsumerFetcherThread-mirrormakerProd_ops-mmrs1-1-sjl.ops.sfdc.net-1380036300408-baa80a5a-0-4]
WARN  (kafka.consumer.ConsumerFetcherThread)  -
[ConsumerFetcherThread-mirrormakerProd_ops-mmrs1-1-sjl.ops.sfdc.net-1380036300408-baa80a5a-0-4],
current offset 2363213504 for partition [FunnelProto,3] out of range; reset
offset to 2363213504


Also, as you can see, it's resetting the offset to the same value, so it
keeps looping through these offset resets again and again. After we
restarted our mirrormaker process, it started consuming the topic from the
beginning for all partitions (we started receiving messages from 7 days
back) and it caught up in a couple of hours.

We have a couple of questions:

1) What might have caused this to end up in this bad state?
2) We had the offset out of range problem only for 2 out of 8 partitions,
but it started consuming from the beginning for all partitions in the topic
after we restarted mirrormaker. How did a problem with 2 partitions affect
all the other partitions?
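
For reference, this is roughly how we check the lag (a sketch; the
zookeeper host, group and topic below stand in for our actual values):

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker \
  --zkconnect zkhost:2181 \
  --group mirrormakerProd \
  --topic FunnelProto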


-- 
Thanks,
Raja.


Re: JMX connections timing out in EC2 in Kafka 0.8

2013-10-03 Thread Rajasekar Elango
Maybe this helps.

See last post on http://qnalist.com/questions/4522179/jmx
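
A common culprit in EC2 (general JVM/RMI behavior, not Kafka-specific):
JMX_PORT only pins the RMI registry port, while the RMI server advertises a
hostname of its own choosing and accepts data connections on a second,
randomly chosen port that firewalls often block. A sketch of at least
pinning the hostname, via the KAFKA_JMX_OPTS hook that kafka-run-class.sh
honors (the hostname below is a placeholder):

export JMX_PORT=1099
export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote \
  -Dcom.sun.management.jmxremote.authenticate=false \
  -Dcom.sun.management.jmxremote.ssl=false \
  -Djava.rmi.server.hostname=ip-10-0-1-12.ec2.internal"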

Thanks,
Raja.


On Thu, Oct 3, 2013 at 11:17 AM, Aniket Bhatnagar <
aniket.bhatna...@gmail.com> wrote:

> I don't think this issue is related to Kafka at all but I just wanted to
> try my luck in this user mailing list just in case people have run into
> similar issues. I am trying to enable JMX monitoring in Kafka running on
> EC2 by setting the JMX_PORT variable to 1099. Once Kafka is done booting, I
> am able to monitor Kafka from a client running on the same instance,
> however the JMS client gives java.net.ConnectException: Connection timed
> out exception when attempting to connect from another instance (also an EC2
> instance in the same VPC). I have checked the firewall rules and the port
> 1099 is enabled. Has anyone else ran into similar issues? Suggestions
> please.
>



-- 
Thanks,
Raja.


Re: Metadata API returns localhost.localdomain for one of the brokers in EC2

2013-10-03 Thread Rajasekar Elango
Hi Aniket,

We had the same issue; it turned out that the ip-to-hostname mapping needs
to be correctly configured in the /etc/hosts file.

For eg: If you had something like
127.0.0.1   localhost   localhost

as the first line in the /etc/hosts file, you will get this error. To fix
it, we need to add the correct ip-to-hostname mapping as the first line of
the /etc/hosts file.
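For example (the hostname and IP below are made up), the corrected file
would start with something like:

10.0.1.12    kafka-broker1.example.com    kafka-broker1
127.0.0.1    localhost    localhost.localdomain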

Hope this helps.

Thanks,
Raja.



On Thu, Oct 3, 2013 at 11:02 AM, Aniket Bhatnagar <
aniket.bhatna...@gmail.com> wrote:

> Thanks Jun and David.
>
> I think the FAQ mentions why it's not possible to connect to broker from
> outside. In my case, all servers (producers and brokers) are in the same
> VPC. Call to InetAddress.getLocalHost.getHostAddress should return an
> internal IP to which producers should be able to connect. The issue seems
> to be that the call to InetAddress.getLocalHost.getHostAddress returns
> localhost.localdomain
> and that too on just 1 broker. Any ideas on why that could be happening?
>
> I can configure host.name property in broker config but its slightly
> painful. I am curious to know what can cause
> InetAddress.getLocalHost.getHostAddress
> to return loopback addresses like localhost.localdomain so that other users
> know why they really have to setup host.name in EC2 even if both producers
> and consumers are in same VPC.
>
>
> On 3 October 2013 19:56, Jun Rao  wrote:
>
> > There is an FAQ too.
> >
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-OnEC2%2Cwhycan%27tmyhighlevelconsumersconnecttothebrokers%3F
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Thu, Oct 3, 2013 at 6:43 AM, David Arthur  wrote:
> >
> > > You can configure the hostname for the broker with the "host.name"
> > > property in the broker's config (server.properties?). If you don't
> > specify
> > > one here, then all interfaces will be bound to and one will be chosen
> to
> > > get published via ZooKeeper (what the metadata API is reading)
> > >
> > > See: http://kafka.apache.org/**documentation.html#**brokerconfigs<
> > http://kafka.apache.org/documentation.html#brokerconfigs>
> > >
> > > -David
> > >
> > >
> > > On 10/3/13 2:57 AM, Aniket Bhatnagar wrote:
> > >
> > >> I have installed 2 brokers on EC2. I also have a (scala) application
> > that
> > >> receives data stream and pushes to kafka cluster. By co-incidence, a
> > >> (slightly heavier) EC2 instance is running both a kafka broker and the
> > >> data
> > >> receiver application. I am noticing that all data receiver application
> > >> nodes that are not on the shared kafka + reciever app EC2 instance are
> > >> complaining for connect errors to localhost.localdomain:9092. Is this
> a
> > >> possible bug that results in Kafka detecting instance hostname
> > >> as localhost.localdomain instead of actual hostname?
> > >>
> > >> Also, how do I fix this temporarily until a permanent fix is
> available?
> > >>
> > >>
> > >
> >
>



-- 
Thanks,
Raja.


Re: Kafka consumer - Mbean for max lag

2013-09-24 Thread Rajasekar Elango
Thanks Neha. Looks like this mbean was added recently; the version we are
running is from early June and it doesn't have this Mbean.
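
For anyone on a newer build: the MaxLag mbean Neha mentions below can be
polled with the bundled JmxTool. A rough sketch (the jmx url/port and the
client id in the object name are placeholders):

bin/kafka-run-class.sh kafka.tools.JmxTool \
  --jmx-url service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi \
  --object-name '"kafka.consumer":type="ConsumerFetcherManager",name="mirrormakerProd-MaxLag"'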

Thanks,
Raja.


On Mon, Sep 23, 2013 at 9:15 PM, Neha Narkhede wrote:

> On the consumer side, look for
> "kafka.consumer":name="([-.\w]+)-MaxLag",type="ConsumerFetcherManager".
> Updated the website to reflect that.
>
> Thanks,
> Neha
>
>
> On Mon, Sep 23, 2013 at 12:48 PM, Rajasekar Elango
> wrote:
>
> > In kafka documentation for
> > monitoring<http://kafka.apache.org/documentation.html#operations>.
> >  I see "we should be looking at max in messages among all partitions.".
> All
> > I can see is mbeans
> > *kafka.server".FetcherLagMetrics.*ConsumerFetcherThread* and it's value
> is
> > pretty much 0. Is this the correct Mbean ? If not, Can you tell me which
> > MBean provides that info?
> >
> > Thanks in advance.
> > --
> > Thanks,
> > Raja.
> >
>



-- 
Thanks,
Raja.


Kafka consumer - Mbean for max lag

2013-09-23 Thread Rajasekar Elango
In the kafka documentation for monitoring
(http://kafka.apache.org/documentation.html#operations), I see "we should
be looking at the max lag in messages among all partitions". All I can see
is the mbeans *kafka.server*.FetcherLagMetrics.*ConsumerFetcherThread*, and
their value is pretty much 0. Is this the correct Mbean? If not, can you
tell me which MBean provides that info?

Thanks in advance.
-- 
Thanks,
Raja.


Re: Leader doesn't get assigned for new topics

2013-09-18 Thread Rajasekar Elango
From the output of the StateChangeLogMerger tool, I see only this error
repeated:

[2013-09-18 14:16:48,358] ERROR [KafkaApi-1] Error while fetching metadata
for partition [FunnelProto,0] (kafka.server.KafkaApis)
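
(For reference, the tool was invoked roughly like the sketch below; the
log path stands in for ours.)

bin/kafka-run-class.sh kafka.tools.StateChangeLogMerger \
  --logs /var/log/kafka/state-change.log \
  --topic FunnelProto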

On the state-change.log itself, I see this error:

[2013-09-18 14:22:48,954] ERROR Conditional update of path
/brokers/topics/test-1379439240191/partitions/2/state with data {
"controller_epoch":10, "isr":[ 1, 5, 4 ], "leader":1, "leader_epoch":4,
"version":1 } and expected version 8 fai
led due to org.apache.zookeeper.KeeperException$BadVersionException:
KeeperErrorCode = BadVersion for
/brokers/topics/test-1379439240191/partitions/2/state (kafka.utils.ZkUtils$)

Do you know the reason for the above error? Also, this problem seems to be
intermittent; it started working now without any changes. I will continue
to monitor.

Thanks,
Raja.


On Tue, Sep 17, 2013 at 7:59 PM, Neha Narkhede wrote:

> Raja,
>
> Could you run the StateChangeLogMerger tool and give it one topic-partition
> that has the above mentioned problem. This tool is documented here -
>
> https://cwiki.apache.org/confluence/display/KAFKA/Replication+tools#Replicationtools-7.StateChangeLogMergerTool
> .
>
> Let me know if you run into any issues while using it.
>
> Thanks,
> Neha
>
>
> On Tue, Sep 17, 2013 at 12:27 PM, Rajasekar Elango
> wrote:
>
> > Neha/Jun,
> >
> > The same problem started happening again although now our zookeeper
> cluster
> > is configured correctly. The produce always failed with
> > LeaderNotAvailableException and list topics shows topic is created with
> > leader "none". In the controller and stage-change log, I am seeing lot of
> > these failures..
> >
> >
> > [2013-09-17 19:21:36,531] WARN [KafkaApi-2] Produce request with
> > correlation id 622369865 from client  on partition [FunnelProto,6] failed
> > due to Partition [FunnelProto,6] doesn't exist on 2
> > (kafka.server.KafkaApis)
> > [2013-09-17 19:21:36,531] WARN [KafkaApi-2] Produce request with
> > correlation id 622369865 from client  on partition [internal_metrics,3]
> > failed due to Partition [internal_metrics,3] doesn't exist on 2
> > (kafka.server.KafkaApis)
> > [2013-09-17 19:21:36,531] WARN [KafkaApi-2] Produce request with
> > correlation id 622369865 from client  on partition [FunnelProto,0] failed
> > due to Partition [FunnelProto,0] doesn't exist on 2
> > (kafka.server.KafkaApis)
> > [2013-09-17 19:21:36,531] WARN [KafkaApi-2] Produce request with
> > correlation id 622369865 from client  on partition [jmx,3] failed due to
> > Partition [jmx,3] doesn't exist on 2 (kafka.server.KafkaApis)
> > [2013-09-17 19:21:36,531] WARN [KafkaApi-2] Produce request with
> > correlation id 622369865 from client  on partition [FunnelProto,5] failed
> > due to Partition [FunnelProto,5] doesn't exist on 2
> > (kafka.server.KafkaApis)
> >
> >
> > When I ran listTopics command for one of above topic, all partitions are
> > under replicated (we have replication factor set to 3). Any clues on what
> > could be issue and how can we get it back to working?
> >
> > Thanks,
> > Raja.
> >
> >
> >
> > On Fri, Sep 13, 2013 at 6:26 PM, Neha Narkhede  > >wrote:
> >
> > > Ah ok. Thanks for sharing that.
> > >
> > >
> > >
> > > On Fri, Sep 13, 2013 at 2:50 PM, Rajasekar Elango <
> > rela...@salesforce.com
> > > >wrote:
> > >
> > > > We have 3 zookeeper node in the cluster with a hardware load
> balancer .
> > >  In
> > > > one of the zookeeper, we did not configure ensemble correctly
> (server.n
> > > > property in zoo.cfg) . So it ended up as like 2 nodes in one cluster,
> > one
> > > > node in other cluster. The load balancer is randomly hitting one of 2
> > > > zookeepers in two different cluster.
> > > >
> > > > Thanks,
> > > > Raja.
> > > >
> > > >
> > > > On Fri, Sep 13, 2013 at 1:04 PM, Neha Narkhede <
> > neha.narkh...@gmail.com
> > > > >wrote:
> > > >
> > > > > Just curious to know, what was the misconfiguration?
> > > > >
> > > > >
> > > > > On Fri, Sep 13, 2013 at 10:02 AM, Rajasekar Elango
> > > > > wrote:
> > > > >
> > > > > > Thanks Neha and Jun, It turned out to be miss configuration in
> our
> > > > > > zookeeper cluster. After correcting it everything looks good.
> > > > &g

Re: Leader doesn't get assigned for new topics

2013-09-17 Thread Rajasekar Elango
Neha/Jun,

The same problem started happening again, although now our zookeeper
cluster is configured correctly. Producing always fails with
LeaderNotAvailableException, and list topics shows the topic is created
with leader "none". In the controller and state-change logs, I am seeing a
lot of these failures:


[2013-09-17 19:21:36,531] WARN [KafkaApi-2] Produce request with
correlation id 622369865 from client  on partition [FunnelProto,6] failed
due to Partition [FunnelProto,6] doesn't exist on 2 (kafka.server.KafkaApis)
[2013-09-17 19:21:36,531] WARN [KafkaApi-2] Produce request with
correlation id 622369865 from client  on partition [internal_metrics,3]
failed due to Partition [internal_metrics,3] doesn't exist on 2
(kafka.server.KafkaApis)
[2013-09-17 19:21:36,531] WARN [KafkaApi-2] Produce request with
correlation id 622369865 from client  on partition [FunnelProto,0] failed
due to Partition [FunnelProto,0] doesn't exist on 2 (kafka.server.KafkaApis)
[2013-09-17 19:21:36,531] WARN [KafkaApi-2] Produce request with
correlation id 622369865 from client  on partition [jmx,3] failed due to
Partition [jmx,3] doesn't exist on 2 (kafka.server.KafkaApis)
[2013-09-17 19:21:36,531] WARN [KafkaApi-2] Produce request with
correlation id 622369865 from client  on partition [FunnelProto,5] failed
due to Partition [FunnelProto,5] doesn't exist on 2 (kafka.server.KafkaApis)


When I ran the listTopics command for one of the above topics, all
partitions were under-replicated (we have the replication factor set to 3).
Any clues on what the issue could be and how we can get it back to working?

Thanks,
Raja.



On Fri, Sep 13, 2013 at 6:26 PM, Neha Narkhede wrote:

> Ah ok. Thanks for sharing that.
>
>
>
> On Fri, Sep 13, 2013 at 2:50 PM, Rajasekar Elango  >wrote:
>
> > We have 3 zookeeper node in the cluster with a hardware load balancer .
>  In
> > one of the zookeeper, we did not configure ensemble correctly (server.n
> > property in zoo.cfg) . So it ended up as like 2 nodes in one cluster, one
> > node in other cluster. The load balancer is randomly hitting one of 2
> > zookeepers in two different cluster.
> >
> > Thanks,
> > Raja.
> >
> >
> > On Fri, Sep 13, 2013 at 1:04 PM, Neha Narkhede  > >wrote:
> >
> > > Just curious to know, what was the misconfiguration?
> > >
> > >
> > > On Fri, Sep 13, 2013 at 10:02 AM, Rajasekar Elango
> > > wrote:
> > >
> > > > Thanks Neha and Jun, It turned out to be miss configuration in our
> > > > zookeeper cluster. After correcting it everything looks good.
> > > >
> > > > Thanks,
> > > > Raja.
> > > >
> > > >
> > > > On Fri, Sep 13, 2013 at 10:13 AM, Jun Rao  wrote:
> > > >
> > > > > Any error in the controller and the state-change log? Are brokers
> > 2,3,4
> > > > > alive?
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jun
> > > > >
> > > > >
> > > > > On Thu, Sep 12, 2013 at 4:56 PM, Rajasekar Elango <
> > > > rela...@salesforce.com
> > > > > >wrote:
> > > > >
> > > > > > We are seeing a problem that we we try to send messages to new
> > topic
> > > it
> > > > > > fails kafka.common.LeaderNotAvailableException. But usually this
> > > > problem
> > > > > > will be transient and if we re-send messages to same topic will
> > work.
> > > > But
> > > > > > now we tried rending message to same topic several time, but
> still
> > > > fails
> > > > > > with same error:
> > > > > >
> > > > > > In the server log I see ] Auto creation of topic test-sjl2 with 8
> > > > > > partitions and replication factor 3 is successful!. But
> listTopics
> > > > > command
> > > > > > shows leader "none" like below:
> > > > > >
> > > > > > topic: test-sjl2    partition: 0    leader: none    replicas: 2,4,3    isr:
> > > > > > topic: test-sjl2    partition: 1    leader: none    replicas: 3,2,4    isr:
> > > > > > topic: test-sjl2    partition: 2    leader: none    replicas: 4,3,2    isr:
> > > > > > topic: test-sjl2    partition: 3    leader: none    replicas: 2,3,4    isr:
> >

Re: Leader doesn't get assigned for new topics

2013-09-13 Thread Rajasekar Elango
We have 3 zookeeper nodes in the cluster behind a hardware load balancer.
In one of the zookeepers, we did not configure the ensemble correctly (the
server.n property in zoo.cfg). So it ended up as 2 nodes in one cluster and
one node in another cluster, and the load balancer was randomly hitting
zookeepers in two different clusters.
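
For anyone curious, the ensemble definition is the set of server.n lines
in zoo.cfg, and every node must carry the same list (the hosts below are
placeholders):

server.1=zk1.example.com:2888:3888
server.2=zk2.example.com:2888:3888
server.3=zk3.example.com:2888:3888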

Thanks,
Raja.


On Fri, Sep 13, 2013 at 1:04 PM, Neha Narkhede wrote:

> Just curious to know, what was the misconfiguration?
>
>
> On Fri, Sep 13, 2013 at 10:02 AM, Rajasekar Elango
> wrote:
>
> > Thanks Neha and Jun, It turned out to be miss configuration in our
> > zookeeper cluster. After correcting it everything looks good.
> >
> > Thanks,
> > Raja.
> >
> >
> > On Fri, Sep 13, 2013 at 10:13 AM, Jun Rao  wrote:
> >
> > > Any error in the controller and the state-change log? Are brokers 2,3,4
> > > alive?
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > >
> > > On Thu, Sep 12, 2013 at 4:56 PM, Rajasekar Elango <
> > rela...@salesforce.com
> > > >wrote:
> > >
> > > > We are seeing a problem that we we try to send messages to new topic
> it
> > > > fails kafka.common.LeaderNotAvailableException. But usually this
> > problem
> > > > will be transient and if we re-send messages to same topic will work.
> > But
> > > > now we tried rending message to same topic several time, but still
> > fails
> > > > with same error:
> > > >
> > > > In the server log I see ] Auto creation of topic test-sjl2 with 8
> > > > partitions and replication factor 3 is successful!. But listTopics
> > > command
> > > > shows leader "none" like below:
> > > >
> > > > topic: test-sjl2    partition: 0    leader: none    replicas: 2,4,3    isr:
> > > > topic: test-sjl2    partition: 1    leader: none    replicas: 3,2,4    isr:
> > > > topic: test-sjl2    partition: 2    leader: none    replicas: 4,3,2    isr:
> > > > topic: test-sjl2    partition: 3    leader: none    replicas: 2,3,4    isr:
> > > > topic: test-sjl2    partition: 4    leader: none    replicas: 3,4,2    isr:
> > > > topic: test-sjl2    partition: 5    leader: none    replicas: 4,2,3    isr:
> > > > topic: test-sjl2    partition: 6    leader: none    replicas: 2,4,3    isr:
> > > > topic: test-sjl2    partition: 7    leader: none    replicas: 3,2,4    isr:
> > > >
> > > > I also see following NotLeaderForPatritionExcetion and
> > ZookeeperExcetion
> > > in
> > > > logs
> > > >
> > > > kafka.common.NotLeaderForPartitionException
> > > > at
> > sun.reflect.GeneratedConstructorAccessor19.newInstance(Unknown
> > > > Source)
> > > > at
> > > >
> > > >
> > >
> >
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:27)
> > > > at
> > > java.lang.reflect.Constructor.newInstance(Constructor.java:513)
> > > > at java.lang.Class.newInstance0(Class.java:355)
> > > > at java.lang.Class.newInstance(Class.java:308)
> > > > at
> > kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:70)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$4$$anonfun$apply$5.apply(AbstractFetcherThread.scala:158)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$4$$anonfun$apply$5.apply(AbstractFetcherThread.scala:158)
> > > > at kafka.utils.Logging$class.warn(Logging.scala:88)
> > > > at
> > > kafka.utils.ShutdownableThread.warn(ShutdownableThread.scala:23)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$4.apply(AbstractFetcherThread.scala:157)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$4.apply(AbstractFetcherThread.scala:113)
> > > > at
> > > >
> scala.collection.immutable.HashMap$HashMap1.f

Re: Leader doesn't get assigned for new topics

2013-09-13 Thread Rajasekar Elango
Thanks Neha and Jun. It turned out to be a misconfiguration in our
zookeeper cluster. After correcting it, everything looks good.

Thanks,
Raja.


On Fri, Sep 13, 2013 at 10:13 AM, Jun Rao  wrote:

> Any error in the controller and the state-change log? Are brokers 2,3,4
> alive?
>
> Thanks,
>
> Jun
>
>
> On Thu, Sep 12, 2013 at 4:56 PM, Rajasekar Elango  >wrote:
>
> > We are seeing a problem that we we try to send messages to new topic it
> > fails kafka.common.LeaderNotAvailableException. But usually this problem
> > will be transient and if we re-send messages to same topic will work. But
> > now we tried rending message to same topic several time, but still fails
> > with same error:
> >
> > In the server log I see ] Auto creation of topic test-sjl2 with 8
> > partitions and replication factor 3 is successful!. But listTopics
> command
> > shows leader "none" like below:
> >
> > topic: test-sjl2    partition: 0    leader: none    replicas: 2,4,3    isr:
> > topic: test-sjl2    partition: 1    leader: none    replicas: 3,2,4    isr:
> > topic: test-sjl2    partition: 2    leader: none    replicas: 4,3,2    isr:
> > topic: test-sjl2    partition: 3    leader: none    replicas: 2,3,4    isr:
> > topic: test-sjl2    partition: 4    leader: none    replicas: 3,4,2    isr:
> > topic: test-sjl2    partition: 5    leader: none    replicas: 4,2,3    isr:
> > topic: test-sjl2    partition: 6    leader: none    replicas: 2,4,3    isr:
> > topic: test-sjl2    partition: 7    leader: none    replicas: 3,2,4    isr:
> >
> > I also see following NotLeaderForPatritionExcetion and ZookeeperExcetion
> in
> > logs
> >
> > kafka.common.NotLeaderForPartitionException
> > at sun.reflect.GeneratedConstructorAccessor19.newInstance(Unknown
> > Source)
> > at
> >
> >
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:27)
> > at
> java.lang.reflect.Constructor.newInstance(Constructor.java:513)
> > at java.lang.Class.newInstance0(Class.java:355)
> > at java.lang.Class.newInstance(Class.java:308)
> > at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:70)
> > at
> >
> >
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$4$$anonfun$apply$5.apply(AbstractFetcherThread.scala:158)
> > at
> >
> >
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$4$$anonfun$apply$5.apply(AbstractFetcherThread.scala:158)
> > at kafka.utils.Logging$class.warn(Logging.scala:88)
> > at
> kafka.utils.ShutdownableThread.warn(ShutdownableThread.scala:23)
> > at
> >
> >
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$4.apply(AbstractFetcherThread.scala:157)
> > at
> >
> >
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$4.apply(AbstractFetcherThread.scala:113)
> > at
> > scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:178)
> > at
> > scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:347)
> > at
> >
> >
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:113)
> > at
> > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:89)
> > at
> kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> > 2013-09-12 23:54:10,838 [kafka-request-handler-2] ERROR
> > (kafka.utils.ZkUtils$)  - Conditional update of path
> > /brokers/topics/FunnelProto/partitions/4/state with data {
> > "controller_epoch":3, "isr":[ 2, 5 ], "leader":2, "leader_epoch":2,
> > "version":1 } and expected version 14 failed due to
> > org.apache.zookeeper.KeeperException$BadVersionException:
> KeeperErrorCode =
> > BadVersion for /brokers/topics/FunnelProto/partitions/4/state
> > 2013-09-12 23:54:10,838 [kafka-request-handler-2] ERROR
> > (kafka.utils.ZkUtils$)  - Conditional update of path
> > /brokers/topics/FunnelProto/partitions/4/state with data {
> > "controller_epoch":3, "isr":[ 2, 5 ], "leader":2, "leader_epoch":2,
> > "version":1 } and expected version 14 failed due to
> > org.apache.zookeeper.KeeperException$BadVersionException:
> KeeperErrorCode =
> > BadVersion for /brokers/topics/FunnelProto/partitions/4/state
> >
> >
> > Any clues on what could be problem.. ?
> >
> > Any for your help.
> >
> > --
> > Thanks,
> > Raja.
> >
>



-- 
Thanks,
Raja.


Leader doesn't get assigned for new topics

2013-09-12 Thread Rajasekar Elango
We are seeing a problem where, when we try to send messages to a new
topic, it fails with kafka.common.LeaderNotAvailableException. Usually this
problem is transient and re-sending messages to the same topic will work.
But now we have tried re-sending messages to the same topic several times,
and it still fails with the same error.

In the server log I see "Auto creation of topic test-sjl2 with 8 partitions
and replication factor 3 is successful!". But the listTopics command shows
the leader as "none", like below:

topic: test-sjl2    partition: 0    leader: none    replicas: 2,4,3    isr:
topic: test-sjl2    partition: 1    leader: none    replicas: 3,2,4    isr:
topic: test-sjl2    partition: 2    leader: none    replicas: 4,3,2    isr:
topic: test-sjl2    partition: 3    leader: none    replicas: 2,3,4    isr:
topic: test-sjl2    partition: 4    leader: none    replicas: 3,4,2    isr:
topic: test-sjl2    partition: 5    leader: none    replicas: 4,2,3    isr:
topic: test-sjl2    partition: 6    leader: none    replicas: 2,4,3    isr:
topic: test-sjl2    partition: 7    leader: none    replicas: 3,2,4    isr:
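
(The listing above is from the 0.8 list-topic tool, run roughly as in the
sketch below; the zookeeper address is a placeholder.)

bin/kafka-list-topic.sh --zookeeper zkhost:2181 --topic test-sjl2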

I also see the following NotLeaderForPartitionException and
ZookeeperException in the logs:

kafka.common.NotLeaderForPartitionException
at sun.reflect.GeneratedConstructorAccessor19.newInstance(Unknown
Source)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:27)
at java.lang.reflect.Constructor.newInstance(Constructor.java:513)
at java.lang.Class.newInstance0(Class.java:355)
at java.lang.Class.newInstance(Class.java:308)
at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:70)
at
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$4$$anonfun$apply$5.apply(AbstractFetcherThread.scala:158)
at
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$4$$anonfun$apply$5.apply(AbstractFetcherThread.scala:158)
at kafka.utils.Logging$class.warn(Logging.scala:88)
at kafka.utils.ShutdownableThread.warn(ShutdownableThread.scala:23)
at
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$4.apply(AbstractFetcherThread.scala:157)
at
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$4.apply(AbstractFetcherThread.scala:113)
at
scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:178)
at
scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:347)
at
kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:113)
at
kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:89)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
2013-09-12 23:54:10,838 [kafka-request-handler-2] ERROR
(kafka.utils.ZkUtils$)  - Conditional update of path
/brokers/topics/FunnelProto/partitions/4/state with data {
"controller_epoch":3, "isr":[ 2, 5 ], "leader":2, "leader_epoch":2,
"version":1 } and expected version 14 failed due to
org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode =
BadVersion for /brokers/topics/FunnelProto/partitions/4/state
2013-09-12 23:54:10,838 [kafka-request-handler-2] ERROR
(kafka.utils.ZkUtils$)  - Conditional update of path
/brokers/topics/FunnelProto/partitions/4/state with data {
"controller_epoch":3, "isr":[ 2, 5 ], "leader":2, "leader_epoch":2,
"version":1 } and expected version 14 failed due to
org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode =
BadVersion for /brokers/topics/FunnelProto/partitions/4/state


Any clues on what could be the problem?

Thanks in advance for your help.

-- 
Thanks,
Raja.


Re: Mirror maker doesn't replicate new topics

2013-09-11 Thread Rajasekar Elango
Thanks Guozhang. Yes, we had it set to 'largest', and changing it to
'smallest' resolved the issue. So it was due to the jira
https://issues.apache.org/jira/browse/KAFKA-1006
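
For anyone hitting the same thing, the fix was a one-line change in the
mirror maker consumer config (a sketch of the relevant line):

# mirror maker consumer.properties
auto.offset.reset=smallest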

Thanks,
Raja.


On Tue, Sep 10, 2013 at 1:18 PM, Guozhang Wang  wrote:

> Oh got it. Did you set auto.offset.reset = smallest or largest? If it is
> largest it could be due to this bug:
>
> https://issues.apache.org/jira/browse/KAFKA-1006
>
> Guozhang
>
>
>
> On Tue, Sep 10, 2013 at 10:09 AM, Rajasekar Elango
> wrote:
>
> > Hi Guozhang ,
> >
> > 1) When I say "I send messages to new topic" -> yes I am sending new
> > messages to source cluster via console producer.
> > 2) The log message "Handling 0 events" doesn't output topic name. But I
> > would believe its for both old and new topics, because no other app is
> > sending messages to source cluster other than me trying to test using
> > console producer.
> >
> > Thanks,
> > Raja.
> >
> >
> > On Tue, Sep 10, 2013 at 1:03 PM, Guozhang Wang 
> wrote:
> >
> > > Hi Raja,
> > >
> > > When you say "I send messages to new topic" I guess you mean that you
> > send
> > > messages to the source cluster right? It may be due to the fact that
> > > producers of mirror make have not catched up with the mirror maker
> > > consumer.
> > >
> > > When you say "I always see Handling 0 events" do you mean that you see
> > this
> > > for both messages for the new topic and for the old topics, or it only
> > > shows this log for new topic?
> > >
> > > Guozhang
> > >
> > >
> > > On Tue, Sep 10, 2013 at 7:47 AM, Rajasekar Elango <
> > rela...@salesforce.com
> > > >wrote:
> > >
> > > > Thanks Guozhang,
> > > >
> > > > 1, 2, 3 all are true. We are using default value 200 for
> > > batch.num.messages
> > > > and 5000ms queue.buffering.max.ms. I believe it should batch either
> if
> > > > batch.num.messages is reached or queue.buffering.max.ms is reached.
> > > >
> > > > I see log message "5000ms elapsed , Queue time reached. Sending.  "
>  on
> > > > regular interval. But when I send messages to new topic, I always see
> > > > "Handling 0 events" and it doesn't produce to target cluster. But
> when
> > I
> > > > resend it second time, I see "Handling x events" and starts
> producing.
> > > Any
> > > > clues on how to debug further?
> > > >
> > > > Thanks,
> > > >
> > > > Raja.
> > > >
> > > >
> > > > On Mon, Sep 9, 2013 at 6:02 PM, Guozhang Wang 
> > > wrote:
> > > >
> > > > > Hi Raja,
> > > > >
> > > > > So just to summarize the scenario:
> > > > >
> > > > > 1) The consumer of mirror maker is successfully consuming all
> > > partitions
> > > > of
> > > > > the newly created topic.
> > > > > 2) The producer of mirror maker is not producing the new messages
> > > > > immediately when the topic is created (observed from
> > > ProducerSendThread's
> > > > > log).
> > > > > 3) The producer of mirror maker will start producing the new
> messages
> > > > when
> > > > > more messages are sent to the source cluster.
> > > > >
> > > > > If 1) is true then KAFKA-1030 is excluded, since the consumer
> > > > successfully
> > > > > recognize all the partitions and start consuming.
> > > > >
> > > > > If both 2) and 3) is true, I would wonder if the batch size of the
> > > mirror
> > > > > maker producer is large and hence will not send until enough
> messages
> > > are
> > > > > accumulated at the producer queue.
> > > > >
> > > > > Guozhang
> > > > >
> > > > >
> > > > > On Mon, Sep 9, 2013 at 2:36 PM, Rajasekar Elango <
> > > rela...@salesforce.com
> > > > > >wrote:
> > > > >
> > > > > > yes, the data exists in source cluster, but not in target
> cluster.
> > I
> > > > > can't
> > > > > > replicate this problem in dev environment and it happens only in
> > prod
> > > > > > environment. I turne

Re: Mirror maker doesn't replicate new topics

2013-09-10 Thread Rajasekar Elango
Hi Guozhang ,

1) When I say "I send messages to the new topic" -> yes, I am sending new
messages to the source cluster via the console producer.
2) The log message "Handling 0 events" doesn't output the topic name. But I
would believe it covers both old and new topics, because no other app is
sending messages to the source cluster other than me testing with the
console producer.

Thanks,
Raja.


On Tue, Sep 10, 2013 at 1:03 PM, Guozhang Wang  wrote:

> Hi Raja,
>
> When you say "I send messages to new topic" I guess you mean that you send
> messages to the source cluster right? It may be due to the fact that
> producers of mirror make have not catched up with the mirror maker
> consumer.
>
> When you say "I always see Handling 0 events" do you mean that you see this
> for both messages for the new topic and for the old topics, or it only
> shows this log for new topic?
>
> Guozhang
>
>
> On Tue, Sep 10, 2013 at 7:47 AM, Rajasekar Elango  >wrote:
>
> > Thanks Guozhang,
> >
> > 1, 2, 3 all are true. We are using default value 200 for
> batch.num.messages
> > and 5000ms queue.buffering.max.ms. I believe it should batch either if
> > batch.num.messages is reached or queue.buffering.max.ms is reached.
> >
> > I see log message "5000ms elapsed , Queue time reached. Sending.  "  on
> > regular interval. But when I send messages to new topic, I always see
> > "Handling 0 events" and it doesn't produce to target cluster. But when I
> > resend it second time, I see "Handling x events" and starts producing.
> Any
> > clues on how to debug further?
> >
> > Thanks,
> >
> > Raja.
> >
> >
> > On Mon, Sep 9, 2013 at 6:02 PM, Guozhang Wang 
> wrote:
> >
> > > Hi Raja,
> > >
> > > So just to summarize the scenario:
> > >
> > > 1) The consumer of mirror maker is successfully consuming all
> partitions
> > of
> > > the newly created topic.
> > > 2) The producer of mirror maker is not producing the new messages
> > > immediately when the topic is created (observed from
> ProducerSendThread's
> > > log).
> > > 3) The producer of mirror maker will start producing the new messages
> > when
> > > more messages are sent to the source cluster.
> > >
> > > If 1) is true then KAFKA-1030 is excluded, since the consumer
> > successfully
> > > recognize all the partitions and start consuming.
> > >
> > > If both 2) and 3) is true, I would wonder if the batch size of the
> mirror
> > > maker producer is large and hence will not send until enough messages
> are
> > > accumulated at the producer queue.
> > >
> > > Guozhang
> > >
> > >
> > > On Mon, Sep 9, 2013 at 2:36 PM, Rajasekar Elango <
> rela...@salesforce.com
> > > >wrote:
> > >
> > > > yes, the data exists in source cluster, but not in target cluster. I
> > > can't
> > > > replicate this problem in dev environment and it happens only in prod
> > > > environment. I turned on debug logging, but not able to identify  the
> > > > problem. Basically, whenever I send data to new topic, I don't see
> any
> > > log
> > > > messages from ProducerSendThread in mirrormaker log so they are not
> > > > produced to target cluster. If I send more messages to same topic,
> the
> > > > producer send thread kicks off and replicates the messages. But
> > whatever
> > > > messages send first time gets lost. How can I trouble shoot this
> > problem
> > > > further? Even this could be due to know issue
> > > > https://issues.apache.org/jira/browse/KAFKA-1030, how can I confirm
> > > that?
> > > > Is there config tweaking I can make to workaround this..?
> > > > ConsumerOffsetChecks helps to track consumers. Its there any other
> tool
> > > we
> > > > can use to track producers in mirrormaker. ?
> > > >
> > > > Thanks in advance for help.
> > > >
> > > > Thanks,
> > > > Raja.
> > > >
> > > >
> > > >
> > > >
> > > > On Fri, Sep 6, 2013 at 3:50 AM, Swapnil Ghike 
> > > wrote:
> > > >
> > > > > Hi Rajasekar,
> > > > >
> > > > > You said that ConsumerOffsetChecker shows that new topics are
> > > > successfully
> > > > > consumed and the lag is 0. If that's the case, can you ver

Re: Mirror maker doesn't replicate new topics

2013-09-10 Thread Rajasekar Elango
Thanks Guozhang,

1, 2, 3 are all true. We are using the default value of 200 for
batch.num.messages and 5000ms for queue.buffering.max.ms. I believe it
should send a batch when either batch.num.messages is reached or
queue.buffering.max.ms elapses.
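
For reference, the relevant async-producer settings in our mirror maker
producer config look roughly like this (a sketch; these are the 0.8
defaults mentioned above):

# mirror maker producer.properties
producer.type=async
batch.num.messages=200
queue.buffering.max.ms=5000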

I see the log message "5000ms elapsed , Queue time reached. Sending." at a
regular interval. But when I send messages to a new topic, I always see
"Handling 0 events" and nothing is produced to the target cluster. When I
resend a second time, I see "Handling x events" and it starts producing.
Any clues on how to debug further?

Thanks,

Raja.


On Mon, Sep 9, 2013 at 6:02 PM, Guozhang Wang  wrote:

> Hi Raja,
>
> So just to summarize the scenario:
>
> 1) The consumer of mirror maker is successfully consuming all partitions of
> the newly created topic.
> 2) The producer of mirror maker is not producing the new messages
> immediately when the topic is created (observed from ProducerSendThread's
> log).
> 3) The producer of mirror maker will start producing the new messages when
> more messages are sent to the source cluster.
>
> If 1) is true then KAFKA-1030 is excluded, since the consumer successfully
> recognize all the partitions and start consuming.
>
> If both 2) and 3) is true, I would wonder if the batch size of the mirror
> maker producer is large and hence will not send until enough messages are
> accumulated at the producer queue.
>
> Guozhang
>
>
> On Mon, Sep 9, 2013 at 2:36 PM, Rajasekar Elango  >wrote:
>
> > yes, the data exists in source cluster, but not in target cluster. I
> can't
> > replicate this problem in dev environment and it happens only in prod
> > environment. I turned on debug logging, but not able to identify  the
> > problem. Basically, whenever I send data to new topic, I don't see any
> log
> > messages from ProducerSendThread in mirrormaker log so they are not
> > produced to target cluster. If I send more messages to same topic, the
> > producer send thread kicks off and replicates the messages. But whatever
> > messages send first time gets lost. How can I trouble shoot this problem
> > further? Even this could be due to know issue
> > https://issues.apache.org/jira/browse/KAFKA-1030, how can I confirm
> that?
> > Is there config tweaking I can make to workaround this..?
> > ConsumerOffsetChecks helps to track consumers. Its there any other tool
> we
> > can use to track producers in mirrormaker. ?
> >
> > Thanks in advance for help.
> >
> > Thanks,
> > Raja.
> >
> >
> >
> >
> > On Fri, Sep 6, 2013 at 3:50 AM, Swapnil Ghike 
> wrote:
> >
> > > Hi Rajasekar,
> > >
> > > You said that ConsumerOffsetChecker shows that new topics are
> > successfully
> > > consumed and the lag is 0. If that's the case, can you verify that
> there
> > > is data on the source cluster for these new topics? If there is no data
> > at
> > > the source, MirrorMaker will only assign consumer streams to the new
> > > topic, but the lag will be 0.
> > >
> > > This could otherwise be related to
> > > https://issues.apache.org/jira/browse/KAFKA-1030.
> > >
> > > Swapnil
> > >
> > >
> > >
> > > On 9/5/13 8:38 PM, "Guozhang Wang"  wrote:
> > >
> > > >Could you let me know the process of reproducing this issue?
> > > >
> > > >Guozhang
> > > >
> > > >
> > > >On Thu, Sep 5, 2013 at 5:04 PM, Rajasekar Elango
> > > >wrote:
> > > >
> > > >> Yes guozhang
> > > >>
> > > >> Sent from my iPhone
> > > >>
> > > >> On Sep 5, 2013, at 7:53 PM, Guozhang Wang 
> wrote:
> > > >>
> > > >> > Hi Rajasekar,
> > > >> >
> > > >> > Is auto.create.topics.enable set to true in your target cluster?
> > > >> >
> > > >> > Guozhang
> > > >> >
> > > >> >
> > > >> > On Thu, Sep 5, 2013 at 4:39 PM, Rajasekar Elango
> > > >> > > >> >wrote:
> > > >> >
> > > >> >> We having issues that mirormaker not longer replicate newly
> created
> > > >> topics.
> > > >> >> It continues to replicate data for existing topics and but new
> > topics
> > > >> >> doesn't get created on target cluster. ConsumerOffsetTracker
> shows
> > > >>that
> > > >> new
> > > >> >> topics are successfully consumed and Lag is 0. But those topics
> > > >>doesn't
> > > >> get
> > > >> >> created in target cluster. I also don't see mbeans for this new
> > topic
> > > >> under
> > > >> >> kafka.producer.ProducerTopicMetrics.metric. In logs I
> > see
> > > >> >> warning for NotLeaderForPatition. but don't see major error. What
> > > >>else
> > > >> can
> > > >> >> we look to troubleshoot this further.
> > > >> >>
> > > >> >> --
> > > >> >> Thanks,
> > > >> >> Raja.
> > > >> >
> > > >> >
> > > >> >
> > > >> > --
> > > >> > -- Guozhang
> > > >>
> > > >
> > > >
> > > >
> > > >--
> > > >-- Guozhang
> > >
> > >
> >
> >
> > --
> > Thanks,
> > Raja.
> >
>
>
>
> --
> -- Guozhang
>



-- 
Thanks,
Raja.


Re: Mirror maker doesn't replicate new topics

2013-09-09 Thread Rajasekar Elango
Yes, the data exists in the source cluster, but not in the target cluster.
I can't replicate this problem in the dev environment; it happens only in
the prod environment. I turned on debug logging, but was not able to
identify the problem. Basically, whenever I send data to a new topic, I
don't see any log messages from ProducerSendThread in the mirrormaker log,
so the messages are not produced to the target cluster. If I send more
messages to the same topic, the producer send thread kicks off and
replicates the messages. But whatever messages were sent the first time get
lost. How can I troubleshoot this problem further? Even if this could be
due to the known issue https://issues.apache.org/jira/browse/KAFKA-1030,
how can I confirm that? Is there any config tweaking I can make to work
around this? ConsumerOffsetChecker helps to track consumers. Is there any
other tool we can use to track producers in mirrormaker?

Thanks in advance for help.

Thanks,
Raja.




On Fri, Sep 6, 2013 at 3:50 AM, Swapnil Ghike  wrote:

> Hi Rajasekar,
>
> You said that ConsumerOffsetChecker shows that new topics are successfully
> consumed and the lag is 0. If that's the case, can you verify that there
> is data on the source cluster for these new topics? If there is no data at
> the source, MirrorMaker will only assign consumer streams to the new
> topic, but the lag will be 0.
>
> This could otherwise be related to
> https://issues.apache.org/jira/browse/KAFKA-1030.
>
> Swapnil
>
>
>
> On 9/5/13 8:38 PM, "Guozhang Wang"  wrote:
>
> >Could you let me know the process of reproducing this issue?
> >
> >Guozhang
> >
> >
> >On Thu, Sep 5, 2013 at 5:04 PM, Rajasekar Elango
> >wrote:
> >
> >> Yes guozhang
> >>
> >> Sent from my iPhone
> >>
> >> On Sep 5, 2013, at 7:53 PM, Guozhang Wang  wrote:
> >>
> >> > Hi Rajasekar,
> >> >
> >> > Is auto.create.topics.enable set to true in your target cluster?
> >> >
> >> > Guozhang
> >> >
> >> >
> >> > On Thu, Sep 5, 2013 at 4:39 PM, Rajasekar Elango
> >> >> >wrote:
> >> >
> >> >> We having issues that mirormaker not longer replicate newly created
> >> topics.
> >> >> It continues to replicate data for existing topics and but new topics
> >> >> doesn't get created on target cluster. ConsumerOffsetTracker shows
> >>that
> >> new
> >> >> topics are successfully consumed and Lag is 0. But those topics
> >>doesn't
> >> get
> >> >> created in target cluster. I also don't see mbeans for this new topic
> >> under
> >> >> kafka.producer.ProducerTopicMetrics.metric. In logs I see
> >> >> warning for NotLeaderForPatition. but don't see major error. What
> >>else
> >> can
> >> >> we look to troubleshoot this further.
> >> >>
> >> >> --
> >> >> Thanks,
> >> >> Raja.
> >> >
> >> >
> >> >
> >> > --
> >> > -- Guozhang
> >>
> >
> >
> >
> >--
> >-- Guozhang
>
>


-- 
Thanks,
Raja.
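
For reference, the consumer-side check mentioned above is the stock offset
checker; the group name and ZooKeeper string below are placeholders, and the
flags are from the 0.8-era tooling:

    bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker \
      --zkconnect zk1:2181 --group mirrormakerProd

There is no equivalent producer-side tracker in the stock distribution; the
closest substitute, as this thread suggests, is watching the
kafka.producer.ProducerTopicMetrics MBeans over JMX.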


Re: Mirror maker doesn't replicate new topics

2013-09-05 Thread Rajasekar Elango
Yes guozhang

Sent from my iPhone

On Sep 5, 2013, at 7:53 PM, Guozhang Wang  wrote:

> Hi Rajasekar,
>
> Is auto.create.topics.enable set to true in your target cluster?
>
> Guozhang
>
>
> On Thu, Sep 5, 2013 at 4:39 PM, Rajasekar Elango 
> wrote:
>
>> We having issues that mirormaker not longer replicate newly created topics.
>> It continues to replicate data for existing topics and but new topics
>> doesn't get created on target cluster. ConsumerOffsetTracker shows that new
>> topics are successfully consumed and Lag is 0. But those topics doesn't get
>> created in target cluster. I also don't see mbeans for this new topic under
>> kafka.producer.ProducerTopicMetrics.metric. In logs I see
>> warning for NotLeaderForPatition. but don't see major error. What else can
>> we look to troubleshoot this further.
>>
>> --
>> Thanks,
>> Raja.
>
>
>
> --
> -- Guozhang


Mirror maker doesn't replicate new topics

2013-09-05 Thread Rajasekar Elango
We are having issues where mirrormaker no longer replicates newly created
topics. It continues to replicate data for existing topics, but new topics
don't get created on the target cluster. ConsumerOffsetTracker shows that
new topics are successfully consumed and the lag is 0, but those topics
don't get created in the target cluster. I also don't see MBeans for these
new topics under kafka.producer.ProducerTopicMetrics.metric. In the logs I
see warnings for NotLeaderForPartition, but no major errors. What else can
we look at to troubleshoot this further?

-- 
Thanks,
Raja.
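
A minimal sketch of the target-cluster server.properties entries behind
Guozhang's auto-creation question in this thread (values illustrative):

    # create topics on first produce/fetch instead of requiring explicit creation
    auto.create.topics.enable=true
    # partition count applied to auto-created topics
    num.partitions=8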


Re: Kafka Monitoring

2013-09-05 Thread Rajasekar Elango
Thanks a lot Jun. This is very helpful.

Thanks,
Raja.


On Thu, Sep 5, 2013 at 1:12 AM, Jun Rao  wrote:

> Updated the doc at http://kafka.apache.org/documentation.html#monitoring
>
> Hopefully that answers your questions.
>
> Thanks,
>
> Jun
>
>
> On Tue, Sep 3, 2013 at 11:16 PM, Vadim Keylis 
> wrote:
>
> > Good evening. I have read through section of monitoring. I tried to map
> > each section to corresponding JMX attribute. I will appreciate if you
> > answer a few questions bellow.
> >
> > Thanks so much in advance,
> > Vadim
> >
> > What this JMX
> > "kafka.controller":type="KafkaController",name="ActiveControllerCount"
> for?
> >
> > The rate of data in and out of the cluster and the number of messages
> > written
> >Which jmx attributes should I monitor? Since I should alert on this
> What
> > are acceptable changes? What are not?
> > The log flush rate and the time taken to flush the log
> > "kafka.log":type="LogFlushStats",name="LogFlushRateAndTimeMs"
> > Which attribute I should be watching and what acceptable deviation change
> > before I should alert
> > The number of partitions that have replicas that are down or have
> > fallen behind and are underreplicated.
> >Is this the JMX
> > "kafka.cluster":type="Partition",name="buypets-0-UnderReplicated" that
> will
> > show replicas that are down?
> >
> > Unclean leader elections. This shouldn't happen.
> >
> >
> >
>  
> "kafka.controller":type="ControllerStats",name="UncleanLeaderElectionsPerSec".
> > I assume that should always be 0 and if its not 0 we have problem.
> > Number of partitions each node is the leader for.
> >Which JMX attribute(s) monitors this?
> > Leader elections: we track each time this happens and how long it
> took:
> >
> >
> >
> "kafka.controller":type="ControllerStats",name="LeaderElectionRateAndTimeMs"
> > Any changes to the ISR
> > Which JMX attribute I should monitor for this? Should I alert on
> this?
> > What are reasonable changes? Which are not?
> > The number of produce requests waiting on replication to report back
> >Which JMX attribute I should monitor for this? Should I alert on this?
> > What are reasonable changes? Which are not?
> > The number of fetch requests waiting on data to arrive
> >Which JMX attribute I should monitor for this? Should I alert on this?
> > What are reasonable changes? Which are not?
> >
>



-- 
Thanks,
Raja.
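
To make the JMX side concrete, here is a minimal Java sketch that polls one
of the meters called out above. The host, port, and attribute name are
assumptions (the broker must be started with JMX_PORT set, and yammer meters
conventionally expose a Count attribute); the quoted object-name format is
taken from the names in this thread:

    import javax.management.MBeanServerConnection;
    import javax.management.ObjectName;
    import javax.management.remote.JMXConnector;
    import javax.management.remote.JMXConnectorFactory;
    import javax.management.remote.JMXServiceURL;

    public class UncleanElectionCheck {
        public static void main(String[] args) throws Exception {
            // Hypothetical broker host/port exposing JMX.
            JMXServiceURL url = new JMXServiceURL(
                "service:jmx:rmi:///jndi/rmi://broker1:9999/jmxrmi");
            try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
                MBeanServerConnection mbs = connector.getMBeanServerConnection();
                // In 0.8 the double quotes are literally part of the object name.
                ObjectName name = new ObjectName(
                    "\"kafka.controller\":type=\"ControllerStats\","
                    + "name=\"UncleanLeaderElectionsPerSec\"");
                // Per the advice above, alert if this ever rises past zero.
                Object count = mbs.getAttribute(name, "Count");
                System.out.println("Unclean leader elections so far: " + count);
            }
        }
    }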


Re: Number of file handles increases indefinitely in producer if broker host is unresolvable

2013-09-04 Thread Rajasekar Elango
Sure, thanks Neha. Created issue:
https://issues.apache.org/jira/browse/KAFKA-1041




On Wed, Sep 4, 2013 at 10:32 AM, Neha Narkhede wrote:

> Ideally if the producer runs into any error, it should close the previous
> socket and open a new one. Seems like that is not happening here. I will
> take a closer look at this today. Do you mind filing a bug?
>
> Thanks,
> Neha
> On Sep 4, 2013 7:23 AM, "Rajasekar Elango"  wrote:
>
> > I can easily reproduce this with console producer,  If I run console
> > producer with right hostname and if broker is not running, the console
> > producer will exit after three tries. But If I run console producer with
> > unresolvable broker, it throws below exception and continues to wait for
> > user input, every time I enter new message, it opens socket and file
> handle
> > count keeps increasing..
> >
> > Here is Exception in producer
> >
> > ERROR fetching topic metadata for topics [Set(test-1378245487417)] from
> > broker [ArrayBuffer(id:0,host:localhost1,port:6667)] failed
> > (kafka.utils.Utils$)
> > kafka.common.KafkaException: fetching topic metadata for topics
> > [Set(test-1378245487417)] from broker
> > [ArrayBuffer(id:0,host:localhost1,port:6667)] failed
> > at
> > kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:51)
> > at
> >
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
> > at kafka.utils.Utils$.swallow(Utils.scala:186)
> > at kafka.utils.Logging$class.swallowError(Logging.scala:105)
> > at kafka.utils.Utils$.swallowError(Utils.scala:45)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
> > at
> >
> >
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
> > at
> >
> >
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
> > at
> >
> >
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
> > at scala.collection.immutable.Stream.foreach(Stream.scala:526)
> > at
> >
> >
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
> > at
> > kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
> > Caused by: java.nio.channels.UnresolvedAddressException
> > at sun.nio.ch.Net.checkAddress(Net.java:30)
> > at
> sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:487)
> > at
> kafka.network.BlockingChannel.connect(BlockingChannel.scala:59)
> > at kafka.producer.SyncProducer.connect(SyncProducer.scala:151)
> > at
> > kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:166)
> > at
> >
> >
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:73)
> > at kafka.producer.SyncProducer.send(SyncProducer.scala:117)
> > at
> > kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:37)
> > ... 12 more
> >
> >
> >
> > On Tue, Sep 3, 2013 at 9:29 PM, Neha Narkhede  > >wrote:
> >
> > > Interesting. What errors/exceptions do you see in the producer logs?
> > >
> > > Thanks,
> > > Neha
> > >
> > >
> > > On Tue, Sep 3, 2013 at 3:28 PM, Rajasekar Elango <
> rela...@salesforce.com
> > > >wrote:
> > >
> > > > We found a issue that if broker host is un resolvable, the number of
> > file
> > > > handle keep increasing for every message we produce and eventually it
> > > uses
> > > > up all available files handles in operating system. If broker itself
> is
> > > not
> > > > running and broker host name is resolvable, open file handles count
> > stays
> > > > flat.
> > > >
> > > > lsof output shows number of these open file handles continue to grow
> > for
> > > > every message we produce.
> > > >
> > > >  java  19631relango   81u sock0,6
>  0t0
> > > >  196966526 can't identify protocol
> > > >
> > > > Is this a bug is producer API..? What is best way to self protect our
> > > self
> > > > ?
> > > >
> > > > --
> > > > Thanks,
> > > > Raja.
> > > >
> > >
> >
> >
> >
> > --
> > Thanks,
> > Raja.
> >
>



-- 
Thanks,
Raja.


Re: Kafka-0.8.0-beta1-src Has ObjectName starting with Double Quotes

2013-09-04 Thread Rajasekar Elango
We have the same problem; it doesn't work with the RESTful JMX console
jminix. Is it possible to change Kafka to expose MBeans without the quotes?

Thanks,
Raja.


On Wed, Sep 4, 2013 at 10:30 AM, Neha Narkhede wrote:

> I had the same problem and could not use jmxterm to inspect jmx beans due
> to this issue. But we tried escaping the quotes and it works with our
> internal monitoring system as well as JmxTool that ships with Kafka.
>
> Thanks,
> Neha
> On Sep 4, 2013 7:16 AM, "Monika Garg"  wrote:
>
> > Hi,
> >
> > Kafka-0.8.0-beta1-src is having doublequotes in the start of Objects name
> > obtained from jConsole.Due to this I am not able to use jmxTrans to
> monitor
> > my kafka-0.8.0 cluster.
> >
> > Please help in solving the issue.
> >
> >
> >
> > Regards
> > Monika Garg
> > Associate Software Engineer
> > Impetus Infotech (India) Pvt. Ltd.
> > D-40, Sector-59, Noida - 201307, UP
> > (O) +91-120-4363300 x 2858
> > (M) +91-8588075977
> > www.impetus.com
> >
> >
> > 
> >
> >
> >
> >
> >
> >
> > NOTE: This message may contain information that is confidential,
> > proprietary, privileged or otherwise protected by law. The message is
> > intended solely for the named addressee. If received in error, please
> > destroy and notify the sender. Any use of this email is prohibited when
> > received in error. Impetus does not represent, warrant and/or guarantee,
> > that the integrity of this communication has been maintained nor that the
> > communication is free of errors, virus, interception or interference.
> >
>



-- 
Thanks,
Raja.
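
For command-line access, the stock JmxTool copes with the quoted names as
long as the object name is single-quoted for the shell; a sketch with a
hypothetical host and port:

    bin/kafka-run-class.sh kafka.tools.JmxTool \
      --jmx-url service:jmx:rmi:///jndi/rmi://broker1:9999/jmxrmi \
      --object-name '"kafka.controller":type="ControllerStats",name="UncleanLeaderElectionsPerSec"'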


Re: Number of file handles increases indefinitely in producer if broker host is unresolvable

2013-09-04 Thread Rajasekar Elango
I can easily reproduce this with the console producer. If I run the console
producer with the right hostname while the broker is not running, the
console producer will exit after three tries. But if I run the console
producer with an unresolvable broker, it throws the exception below and
continues to wait for user input; every time I enter a new message, it
opens a socket, and the file handle count keeps increasing.

Here is the exception in the producer:

ERROR fetching topic metadata for topics [Set(test-1378245487417)] from
broker [ArrayBuffer(id:0,host:localhost1,port:6667)] failed
(kafka.utils.Utils$)
kafka.common.KafkaException: fetching topic metadata for topics
[Set(test-1378245487417)] from broker
[ArrayBuffer(id:0,host:localhost1,port:6667)] failed
at
kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:51)
at
kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
at
kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
at kafka.utils.Utils$.swallow(Utils.scala:186)
at kafka.utils.Logging$class.swallowError(Logging.scala:105)
at kafka.utils.Utils$.swallowError(Utils.scala:45)
at
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
at
kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
at
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
at
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
at scala.collection.immutable.Stream.foreach(Stream.scala:526)
at
kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
at
kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
Caused by: java.nio.channels.UnresolvedAddressException
at sun.nio.ch.Net.checkAddress(Net.java:30)
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:487)
at kafka.network.BlockingChannel.connect(BlockingChannel.scala:59)
at kafka.producer.SyncProducer.connect(SyncProducer.scala:151)
at
kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:166)
at
kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:73)
at kafka.producer.SyncProducer.send(SyncProducer.scala:117)
at
kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:37)
... 12 more



On Tue, Sep 3, 2013 at 9:29 PM, Neha Narkhede wrote:

> Interesting. What errors/exceptions do you see in the producer logs?
>
> Thanks,
> Neha
>
>
> On Tue, Sep 3, 2013 at 3:28 PM, Rajasekar Elango  >wrote:
>
> > We found a issue that if broker host is un resolvable, the number of file
> > handle keep increasing for every message we produce and eventually it
> uses
> > up all available files handles in operating system. If broker itself is
> not
> > running and broker host name is resolvable, open file handles count stays
> > flat.
> >
> > lsof output shows number of these open file handles continue to grow for
> > every message we produce.
> >
> >  java  19631relango   81u sock0,6  0t0
> >  196966526 can't identify protocol
> >
> > Is this a bug is producer API..? What is best way to self protect our
> self
> > ?
> >
> > --
> > Thanks,
> > Raja.
> >
>



-- 
Thanks,
Raja.


Number of file handles increases indefinitely in producer if broker host is unresolvable

2013-09-03 Thread Rajasekar Elango
We found an issue where, if the broker host is unresolvable, the number of
open file handles keeps increasing for every message we produce, eventually
using up all available file handles in the operating system. If the broker
itself is not running but the broker hostname is resolvable, the open file
handle count stays flat.

lsof output shows that the number of these open file handles continues to
grow for every message we produce:

 java  19631  relango  81u  sock  0,6  0t0  196966526  can't identify protocol

Is this a bug in the producer API? What is the best way to protect ourselves?

-- 
Thanks,
Raja.
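
A minimal Java sketch of the reproduction described in this thread, against
the 0.8 producer API. The unresolvable hostname is deliberate, and the topic
name is arbitrary:

    import java.util.Properties;
    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    public class FdLeakRepro {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            // "localhost1" is deliberately unresolvable, as in the report above.
            props.put("metadata.broker.list", "localhost1:6667");
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            props.put("producer.type", "sync");
            Producer<String, String> producer =
                new Producer<String, String>(new ProducerConfig(props));
            for (int i = 0; i < 1000; i++) {
                try {
                    producer.send(new KeyedMessage<String, String>("test", "msg-" + i));
                } catch (Exception e) {
                    // Each failed metadata fetch can leak a socket (KAFKA-1041);
                    // watch `lsof -p <pid> | wc -l` climb while this loop runs.
                }
                Thread.sleep(100);
            }
            producer.close();
        }
    }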


Re: Mirrormaker stopped consuming

2013-09-03 Thread Rajasekar Elango
Thanks Neha,

I did not take a thread dump before restarting; I will get one when it
happens again. We are using 16 GB of JVM heap. Do you have a recommendation
on JVM GC options?

Thanks,
Raja.


On Tue, Sep 3, 2013 at 12:26 PM, Neha Narkhede wrote:

> 2013-09-01 05:59:27,792 [main-EventThread] INFO
>  (org.I0Itec.zkclient.ZkClient)  - zookeeper state changed (Disconnected)
> 2013-09-01 05:59:27,692 [main-SendThread(
> mandm-zookeeper-asg.data.sfdc.net:2181)] INFO
>  (org.apache.zookeeper.
> ClientCnxn)  - Client session timed out, have not
> heard from server in 4002ms for sessionid 0x140c603da5b0032, closing socket
> connection and attempting reconnect
>
> This indicates that your mirror maker and/or your zookeeper cluster is
> GCing for long periods of time. I have observed that if "client session
> timed out" happens too many times, the client tends to lose zookeeper
> watches. This is a potential bug in zookeeper. If this happens, your mirror
> maker instance might not rebalance correctly and will start losing data.
>
> You mentioned consumption/production stopped on your mirror maker, could
> you please take a thread dump and point us to it? Meanwhile, you might want
> to fix the GC pauses.
>
> Thanks,
> Neha
>
>
> On Tue, Sep 3, 2013 at 8:59 AM, Rajasekar Elango  >wrote:
>
> > We found that mirrormaker stopped consuming and producing over the week
> end
> > (09/01). Just seeing "Client session timed out" messages in mirrormaker
> > log. I restarted to it today 09/03 to resume processing. Here is the logs
> > line in reverse order.
> >
> >
> > 2013-09-03 14:20:40,918
> >
> >
> [mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506_watcher_executor]
> > INFO  (kafka.utils.VerifiableProperties)  - Verifying properties
> > 2013-09-03 14:20:40,877
> >
> >
> [mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506_watcher_executor]
> > INFO  (kafka.consumer.ZookeeperConsumerConnector)  -
> > [mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506],
> > begin rebalancing consumer
> > mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506 try
> #1
> > 2013-09-03 14:20:38,877
> >
> >
> [mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506_watcher_executor]
> > INFO  (kafka.consumer.ZookeeperConsumerConnector)  -
> > [mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506],
> > Committing all offsets after clearing the fetcher queues
> > 2013-09-03 14:20:38,877
> >
> >
> [mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506_watcher_executor]
> > INFO  (kafka.consumer.ZookeeperConsumerConnector)  -
> > [mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506],
> > Cleared the data chunks in all the consumer message iterators
> > 2013-09-03 14:20:38,877
> >
> >
> [mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506_watcher_executor]
> > INFO  (kafka.consumer.ZookeeperConsumerConnector)  -
> > [mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506],
> > Cleared all relevant queues for this fetcher
> > 2013-09-03 14:20:38,877
> >
> >
> [mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506_watcher_executor]
> > INFO  (kafka.consumer.ConsumerFetcherManager)  -
> > [ConsumerFetcherManager-1378218012760] All connections stopped
> > 2013-09-03 14:20:38,877
> >
> >
> [mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506_watcher_executor]
> > INFO  (kafka.consumer.ConsumerFetcherManager)  -
> > [ConsumerFetcherManager-1378218012760] Stopping all fetchers
> > 2013-09-03 14:20:38,877
> >
> >
> [mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506_watcher_executor]
> > INFO  (kafka.consumer.ConsumerFetcherManager)  -
> > [ConsumerFetcherManager-1378218012760] Stopping leader finder thread
> > 2013-09-03 14:20:38,877
> >
> >
> [mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506_watcher_executor]
> > INFO  (kafka.consumer.ZookeeperConsumerConnector)  -
> > [mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506],
> > Rebalancing attempt failed. Clearing the cache before the next
> rebalancing
> > operation is triggered
> > 2013-09-03 14:20:38,876
> >
> >
> [mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506_watcher_executor]
> > INFO  (kafka.consumer.ZookeeperConsumerConnector)  -
> > [mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506],
> end
> > rebalancing consumer
>
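
The thread-dump suggestion above comes down to a couple of commands; the pid
and paths are placeholders, and diffing two dumps taken a few seconds apart
shows whether the producer threads are stuck in the same frames:

    jstack <kafka-pid> > /tmp/kafka-threads-1.txt
    sleep 5
    jstack <kafka-pid> > /tmp/kafka-threads-2.txt

On GC options: for a 16 GB heap, CMS flags along these lines were commonly
used with JVMs of this era; a sketch, not an official recommendation:

    -Xms16g -Xmx16g -XX:+UseConcMarkSweepGC \
    -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -Xloggc:/var/log/kafka-gc.log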

Mirrormaker stopped consuming

2013-09-03 Thread Rajasekar Elango
We found that mirrormaker stopped consuming and producing over the weekend
(09/01); we are just seeing "Client session timed out" messages in the
mirrormaker log. I restarted it today (09/03) to resume processing. Here are
the log lines in reverse order.


2013-09-03 14:20:40,918
[mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506_watcher_executor]
INFO  (kafka.utils.VerifiableProperties)  - Verifying properties
2013-09-03 14:20:40,877
[mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506_watcher_executor]
INFO  (kafka.consumer.ZookeeperConsumerConnector)  -
[mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506],
begin rebalancing consumer
mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506 try #1
2013-09-03 14:20:38,877
[mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506_watcher_executor]
INFO  (kafka.consumer.ZookeeperConsumerConnector)  -
[mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506],
Committing all offsets after clearing the fetcher queues
2013-09-03 14:20:38,877
[mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506_watcher_executor]
INFO  (kafka.consumer.ZookeeperConsumerConnector)  -
[mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506],
Cleared the data chunks in all the consumer message iterators
2013-09-03 14:20:38,877
[mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506_watcher_executor]
INFO  (kafka.consumer.ZookeeperConsumerConnector)  -
[mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506],
Cleared all relevant queues for this fetcher
2013-09-03 14:20:38,877
[mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506_watcher_executor]
INFO  (kafka.consumer.ConsumerFetcherManager)  -
[ConsumerFetcherManager-1378218012760] All connections stopped
2013-09-03 14:20:38,877
[mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506_watcher_executor]
INFO  (kafka.consumer.ConsumerFetcherManager)  -
[ConsumerFetcherManager-1378218012760] Stopping all fetchers
2013-09-03 14:20:38,877
[mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506_watcher_executor]
INFO  (kafka.consumer.ConsumerFetcherManager)  -
[ConsumerFetcherManager-1378218012760] Stopping leader finder thread
2013-09-03 14:20:38,877
[mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506_watcher_executor]
INFO  (kafka.consumer.ZookeeperConsumerConnector)  -
[mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506],
Rebalancing attempt failed. Clearing the cache before the next rebalancing
operation is triggered
2013-09-03 14:20:38,876
[mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506_watcher_executor]
INFO  (kafka.consumer.ZookeeperConsumerConnector)  -
[mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506], end
rebalancing consumer
mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1378218012575-6779d506 try #0
2013-09-01 05:59:29,069 [main-SendThread(
mandm-zookeeper-asg.data.sfdc.net:2181)] INFO
 (org.apache.zookeeper.ClientCnxn)  - Socket connection established to
mandm-zookeeper-asg.data.sfdc.net/10.228.48.38:2181, initiating session
2013-09-01 05:59:29,069 [main-SendThread(
mandm-zookeeper-asg.data.sfdc.net:2181)] INFO
 (org.apache.zookeeper.ClientCnxn)  - Opening socket connection to server
mandm-zookeeper-asg.data.sfdc.net/10.228.48.38:2181
2013-09-01 05:59:27,792 [main-EventThread] INFO
 (org.I0Itec.zkclient.ZkClient)  - zookeeper state changed (Disconnected)
2013-09-01 05:59:27,692 [main-SendThread(
mandm-zookeeper-asg.data.sfdc.net:2181)] INFO
 (org.apache.zookeeper.ClientCnxn)  - Client session timed out, have not
heard from server in 4002ms for sessionid 0x140c603da5b0032, closing socket
connection and attempting reconnect


As you can see, no log lines appeared after 2013-09-01 05:59:29. I checked
the lag using ConsumerOffsetChecker and observed that the log size and lag
are growing, but the mirrormaker's offset remains the same. We have two
mirrormaker processes running, and both of them had the same issue during
the same time frame. Any hint on what the problem could be? How do we go
about troubleshooting this?

Thanks in advance.

-- 
Thanks,
Raja.


Re: kafka produce API batching

2013-08-30 Thread Rajasekar Elango
Great. Thanks for reply Jun.

Thanks,
Raja.


On Fri, Aug 30, 2013 at 11:50 PM, Jun Rao  wrote:

> If you are using the async mode, there is no difference. If you use sync
> mode, the former api gives you a way to batch request in sync node, which
> can lead to better throughput.
>
> Thanks,
>
> Jun
>
>
> On Fri, Aug 30, 2013 at 10:28 AM, Rajasekar Elango
> wrote:
>
> > Kafka producer API supports sending a single message and as well sending
> > list of messages with two different methods. I believe irrespective of
> > either of send method used, producer internally batches
> batch.num.messages
> > and sends them in bulk. Is there any advantage in performance of using
> > send(List>
> > messages)to send multiple messages over sending messages one by one
> > using send(KeyedMessage
> > message) ?
> >
> > --
> > Thanks,
> > Raja.
> >
>



-- 
Thanks,
Raja.


kafka produce API batching

2013-08-30 Thread Rajasekar Elango
The Kafka producer API supports sending a single message as well as sending
a list of messages, via two different methods. I believe that, irrespective
of which send method is used, the producer internally batches
batch.num.messages messages and sends them in bulk. Is there any performance
advantage in using send(List<KeyedMessage<K,V>> messages) to send multiple
messages over sending messages one by one using send(KeyedMessage<K,V>
message)?

-- 
Thanks,
Raja.
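
A minimal sketch of the two send variants under discussion, using the 0.8
Java producer API; the broker address and topic are placeholders, and per
Jun's reply the difference only matters in sync mode:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Properties;
    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    public class BatchVsSingle {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("metadata.broker.list", "broker1:6667"); // hypothetical broker
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            props.put("producer.type", "sync"); // batching only differs in sync mode
            Producer<String, String> producer =
                new Producer<String, String>(new ProducerConfig(props));

            // One message per call: each send() is its own synchronous request.
            producer.send(new KeyedMessage<String, String>("test", "single message"));

            // List variant: the whole batch travels in one synchronous request.
            List<KeyedMessage<String, String>> batch =
                new ArrayList<KeyedMessage<String, String>>();
            for (int i = 0; i < 10; i++) {
                batch.add(new KeyedMessage<String, String>("test", "msg-" + i));
            }
            producer.send(batch);
            producer.close();
        }
    }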


Re: Num of streams for consumers using TopicFilter.

2013-08-30 Thread Rajasekar Elango
Yeah. The real bottleneck is the number of topics that match the topic
filter; the streams are shared across all topics the filter matches. I
thought about the following ideas to work around this (I am basically
referring to the mirrormaker consumer in these examples).

Option 1) Instead of running one mirrormaker process with topic filter ".+",
we can start multiple mirrormaker processes, each with a topic filter
matching one topic family (e.g. mirrormaker1 => whitelist topic1.*,
mirrormaker2 => whitelist topic2.*, etc.)

But this adds some operational overhead to start and manage multiple
processes on the host.

Option 2) Modify the mirrormaker code to support a list of whitelist
filters, creating message streams for each filter (i.e. call
createMessageStreamsByFilter once per filter; a sketch follows this
message).

What would be your recommendation? If adding this feature to mirrormaker is
worthwhile for Kafka, we can do option 2.

Thanks,
Raja.
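
A minimal sketch of option 2's core idea against the 0.8 high-level consumer
API; the ZooKeeper string, group id, regexes, and stream counts are all
placeholders:

    import java.util.List;
    import java.util.Properties;
    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.KafkaStream;
    import kafka.consumer.Whitelist;
    import kafka.javaapi.consumer.ConsumerConnector;

    public class MultiFilterConsumer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("zookeeper.connect", "zk1:2181"); // hypothetical ZooKeeper
            props.put("group.id", "mirror-group");
            ConsumerConnector connector =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
            // One createMessageStreamsByFilter call per whitelist, so each
            // topic family gets its own full set of streams.
            List<KafkaStream<byte[], byte[]>> streams1 =
                connector.createMessageStreamsByFilter(new Whitelist("topic1.*"), 8);
            List<KafkaStream<byte[], byte[]>> streams2 =
                connector.createMessageStreamsByFilter(new Whitelist("topic2.*"), 8);
            // ...hand each stream to its own worker thread as usual...
        }
    }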




On Fri, Aug 30, 2013 at 10:34 AM, Jun Rao  wrote:

> Right, but if you set #partitions in each topic to 16, you can use a total
> of 16 streams.
>
> Thanks,
>
> Jun
>
>
> On Thu, Aug 29, 2013 at 9:08 PM, Rajasekar Elango  >wrote:
>
> > With option 1) I can't really use 8 streams in each consumer, If I do
> only
> > one consumer seem to be doing all work. So I had to actually use total 8
> > streams with 4 for each consumer.
> >
> >
> >
> > On Fri, Aug 30, 2013 at 12:01 AM, Jun Rao  wrote:
> >
> > > The drawback of 2), as you said is no auto failover. I was suggesting
> > that
> > > you use 16 partitions. Then you can use option 1) with 8 streams in
> each
> > > consumer.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > >
> > > On Thu, Aug 29, 2013 at 8:51 PM, Rajasekar Elango <
> > rela...@salesforce.com
> > > >wrote:
> > >
> > > > Hi Jun,
> > > >
> > > > If you read my previous posts, based on current re balancing logic,
> if
> > we
> > > > consumer from topic filter, consumer actively use all streams. Can
> you
> > > > provide your recommendation of option 1 vs option 2 in my previous
> > post?
> > > >
> > > > Thanks,
> > > > Raja.
> > > >
> > > >
> > > > On Thu, Aug 29, 2013 at 11:42 PM, Jun Rao  wrote:
> > > >
> > > > > You can always use more partitions to get more parallelism in the
> > > > > consumers.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jun
> > > > >
> > > > >
> > > > > On Thu, Aug 29, 2013 at 12:44 PM, Rajasekar Elango
> > > > > wrote:
> > > > >
> > > > > > So what is best way to load balance multiple consumers consuming
> > from
> > > > > topic
> > > > > > filter.
> > > > > >
> > > > > > Let's say we have 4 topics with 8 partitions and 2 consumers.
> > > > > >
> > > > > > Option 1) To load balance consumers, we can set num.streams=4 so
> > that
> > > > > both
> > > > > > consumers split 8 partitions. but can only use half of consumer
> > > > streams.
> > > > > >
> > > > > > Option 2) Configure mutually exclusive topic filter regex such
> > that 2
> > > > > > topics will match consumer1 and 2 topics will match consumer2.
> Now
> > we
> > > > can
> > > > > > set num.streams=8 and fully utilize consumer streams. I believe
> > this
> > > > will
> > > > > > improve performance, but if consumer dies, we will not get any
> data
> > > > from
> > > > > > the topic used by that consumer.
> > > > > >
> > > > > > What would be your recommendation?
> > > > > >
> > > > > > Thanks,
> > > > > > Raja.
> > > > > >
> > > > > >
> > > > > > On Thu, Aug 29, 2013 at 12:42 PM, Neha Narkhede <
> > > > neha.narkh...@gmail.com
> > > > > > >wrote:
> > > > > >
> > > > > > > >> 2) When I started mirrormaker with num.streams=16, looks
> like
> > 16
> > > > > > > consumer
> > > > > > > threads were created, but only 8 are showing up as active as
> > owner
> > > in
> > > > > > > consumer offset tracker and all

Re: Num of streams for consumers using TopicFilter.

2013-08-29 Thread Rajasekar Elango
With option 1) I can't really use 8 streams in each consumer; if I do, only
one consumer seems to be doing all the work. So I actually had to use 8
streams in total, with 4 for each consumer.



On Fri, Aug 30, 2013 at 12:01 AM, Jun Rao  wrote:

> The drawback of 2), as you said is no auto failover. I was suggesting that
> you use 16 partitions. Then you can use option 1) with 8 streams in each
> consumer.
>
> Thanks,
>
> Jun
>
>
> On Thu, Aug 29, 2013 at 8:51 PM, Rajasekar Elango  >wrote:
>
> > Hi Jun,
> >
> > If you read my previous posts, based on current re balancing logic, if we
> > consumer from topic filter, consumer actively use all streams. Can you
> > provide your recommendation of option 1 vs option 2 in my previous post?
> >
> > Thanks,
> > Raja.
> >
> >
> > On Thu, Aug 29, 2013 at 11:42 PM, Jun Rao  wrote:
> >
> > > You can always use more partitions to get more parallelism in the
> > > consumers.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > >
> > > On Thu, Aug 29, 2013 at 12:44 PM, Rajasekar Elango
> > > wrote:
> > >
> > > > So what is best way to load balance multiple consumers consuming from
> > > topic
> > > > filter.
> > > >
> > > > Let's say we have 4 topics with 8 partitions and 2 consumers.
> > > >
> > > > Option 1) To load balance consumers, we can set num.streams=4 so that
> > > both
> > > > consumers split 8 partitions. but can only use half of consumer
> > streams.
> > > >
> > > > Option 2) Configure mutually exclusive topic filter regex such that 2
> > > > topics will match consumer1 and 2 topics will match consumer2. Now we
> > can
> > > > set num.streams=8 and fully utilize consumer streams. I believe this
> > will
> > > > improve performance, but if consumer dies, we will not get any data
> > from
> > > > the topic used by that consumer.
> > > >
> > > > What would be your recommendation?
> > > >
> > > > Thanks,
> > > > Raja.
> > > >
> > > >
> > > > On Thu, Aug 29, 2013 at 12:42 PM, Neha Narkhede <
> > neha.narkh...@gmail.com
> > > > >wrote:
> > > >
> > > > > >> 2) When I started mirrormaker with num.streams=16, looks like 16
> > > > > consumer
> > > > > threads were created, but only 8 are showing up as active as owner
> in
> > > > > consumer offset tracker and all topics/partitions are distributed
> > > > between 8
> > > > > consumer threads.
> > > > >
> > > > > This is because currently the consumer rebalancing process of
> > assigning
> > > > > partitions to consumer streams is at a per topic level. Unless you
> > have
> > > > at
> > > > > least one topic with 16 partitions, the remaining 8 threads will
> not
> > do
> > > > any
> > > > > work. This is not ideal and we want to look into a better
> rebalancing
> > > > > algorithm. Though it is a big change and we prefer doing it as part
> > of
> > > > the
> > > > > consumer client rewrite.
> > > > >
> > > > > Thanks,
> > > > > Neha
> > > > >
> > > > >
> > > > > On Thu, Aug 29, 2013 at 8:03 AM, Rajasekar Elango <
> > > > rela...@salesforce.com
> > > > > >wrote:
> > > > >
> > > > > > So my understanding is num of active streams that a consumer can
> > > > utilize
> > > > > is
> > > > > > number of partitions in topic. This is fine if we consumer from
> > > > specific
> > > > > > topic. But if we consumer from TopicFilter, I thought consumer
> > should
> > > > > able
> > > > > > to utilize (number of topics that match filter * number of
> > partitions
> > > > in
> > > > > > topic) . But looks like number of streams that consumer can use
> is
> > > > > limited
> > > > > > by just number if partitions in topic although it's consuming
> from
> > > > > multiple
> > > > > > topic.
> > > > > >
> > > > > > Here what I observed with 1 mirrormaker consuming from whitelist
> > > '.+'.
> > > > > >
> > > > > > The white list matches 5 topics and each topic has 8 partitions.
> I
> > > used
> > > > > > consumer offset checker to look at owner of each/topic partition.
> > > > > >
> > > > > > 1) When I started mirrormaker with num.streams=8, all
> > > topics/partitions
> > > > > are
> > > > > > distributed between 8 consumer threads.
> > > > > >
> > > > > > 2) When I started mirrormaker with num.streams=16, looks like 16
> > > > consumer
> > > > > > threads were created, but only 8 are showing up as active as
> owner
> > in
> > > > > > consumer offset tracker and all topics/partitions are distributed
> > > > > between 8
> > > > > > consumer threads.
> > > > > >
> > > > > > So this could be bottleneck for consumers as although we
> > partitioned
> > > > > topic,
> > > > > > if we are consuming from topic filter it can't utilize much of
> > > > > parallelism
> > > > > > with num of streams. Am i missing something, is there a way to
> make
> > > > > > cosumers/mirrormakers to utilize more number of active streams?
> > > > > >
> > > > > >
> > > > > > --
> > > > > > Thanks,
> > > > > > Raja.
> > > > > >
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > Thanks,
> > > > Raja.
> > > >
> > >
> >
> >
> >
> > --
> > Thanks,
> > Raja.
> >
>



-- 
Thanks,
Raja.


Re: Num of streams for consumers using TopicFilter.

2013-08-29 Thread Rajasekar Elango
Hi Jun,

If you read my previous posts: based on the current rebalancing logic, if we
consume from a topic filter, the consumer cannot actively use all streams.
Can you provide your recommendation on option 1 vs option 2 from my previous
post?

Thanks,
Raja.


On Thu, Aug 29, 2013 at 11:42 PM, Jun Rao  wrote:

> You can always use more partitions to get more parallelism in the
> consumers.
>
> Thanks,
>
> Jun
>
>
> On Thu, Aug 29, 2013 at 12:44 PM, Rajasekar Elango
> wrote:
>
> > So what is best way to load balance multiple consumers consuming from
> topic
> > filter.
> >
> > Let's say we have 4 topics with 8 partitions and 2 consumers.
> >
> > Option 1) To load balance consumers, we can set num.streams=4 so that
> both
> > consumers split 8 partitions. but can only use half of consumer streams.
> >
> > Option 2) Configure mutually exclusive topic filter regex such that 2
> > topics will match consumer1 and 2 topics will match consumer2. Now we can
> > set num.streams=8 and fully utilize consumer streams. I believe this will
> > improve performance, but if consumer dies, we will not get any data from
> > the topic used by that consumer.
> >
> > What would be your recommendation?
> >
> > Thanks,
> > Raja.
> >
> >
> > On Thu, Aug 29, 2013 at 12:42 PM, Neha Narkhede  > >wrote:
> >
> > > >> 2) When I started mirrormaker with num.streams=16, looks like 16
> > > consumer
> > > threads were created, but only 8 are showing up as active as owner in
> > > consumer offset tracker and all topics/partitions are distributed
> > between 8
> > > consumer threads.
> > >
> > > This is because currently the consumer rebalancing process of assigning
> > > partitions to consumer streams is at a per topic level. Unless you have
> > at
> > > least one topic with 16 partitions, the remaining 8 threads will not do
> > any
> > > work. This is not ideal and we want to look into a better rebalancing
> > > algorithm. Though it is a big change and we prefer doing it as part of
> > the
> > > consumer client rewrite.
> > >
> > > Thanks,
> > > Neha
> > >
> > >
> > > On Thu, Aug 29, 2013 at 8:03 AM, Rajasekar Elango <
> > rela...@salesforce.com
> > > >wrote:
> > >
> > > > So my understanding is num of active streams that a consumer can
> > utilize
> > > is
> > > > number of partitions in topic. This is fine if we consumer from
> > specific
> > > > topic. But if we consumer from TopicFilter, I thought consumer should
> > > able
> > > > to utilize (number of topics that match filter * number of partitions
> > in
> > > > topic) . But looks like number of streams that consumer can use is
> > > limited
> > > > by just number if partitions in topic although it's consuming from
> > > multiple
> > > > topic.
> > > >
> > > > Here what I observed with 1 mirrormaker consuming from whitelist
> '.+'.
> > > >
> > > > The white list matches 5 topics and each topic has 8 partitions. I
> used
> > > > consumer offset checker to look at owner of each/topic partition.
> > > >
> > > > 1) When I started mirrormaker with num.streams=8, all
> topics/partitions
> > > are
> > > > distributed between 8 consumer threads.
> > > >
> > > > 2) When I started mirrormaker with num.streams=16, looks like 16
> > consumer
> > > > threads were created, but only 8 are showing up as active as owner in
> > > > consumer offset tracker and all topics/partitions are distributed
> > > between 8
> > > > consumer threads.
> > > >
> > > > So this could be bottleneck for consumers as although we partitioned
> > > topic,
> > > > if we are consuming from topic filter it can't utilize much of
> > > parallelism
> > > > with num of streams. Am i missing something, is there a way to make
> > > > cosumers/mirrormakers to utilize more number of active streams?
> > > >
> > > >
> > > > --
> > > > Thanks,
> > > > Raja.
> > > >
> > >
> >
> >
> >
> > --
> > Thanks,
> > Raja.
> >
>



-- 
Thanks,
Raja.


Re: Securing kafka

2013-08-29 Thread Rajasekar Elango
No, certificates are not per topic; they are for the entire broker.

Thanks,
Raja.


On Thu, Aug 29, 2013 at 11:33 PM, Joe Stein  wrote:

> are the certificate stores by topic? very interesting!!! looking forward to
> trying it out and review it
>
> /***
>  Joe Stein
>  Founder, Principal Consultant
>  Big Data Open Source Security LLC
>  http://www.stealth.ly
>  Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
> /
>
>
> On Thu, Aug 29, 2013 at 11:22 PM, Rajasekar Elango
> wrote:
>
> > We have made changes to kafka code to support certificate based mutual
> SSL
> > authentication. So the clients and broker will exchange trusted
> > certificates for successful communication. This provides both
> > authentication and ssl encryption. Planning to contribute that code back
> to
> > kafka soon.
> >
> > Thanks,
> > Raja.
> >
> >
> > On Thu, Aug 29, 2013 at 11:16 PM, Joe Stein  wrote:
> >
> > > One use case I have been discussing recently with a few clients is
> > > verifying the digital signature of a message as part of the acceptance
> > > criteria of it being committed to the log and/or when it is consumed.
> > >
> > > I would be very interested in discussing different scenarios such as
> > Kafka
> > > as a service, privacy at rest as well as authorization and
> authentication
> > > (if required).
> > >
> > > Hit me up
> > >
> > > /***
> > >  Joe Stein
> > >  Founder, Principal Consultant
> > >  Big Data Open Source Security LLC
> > >  http://www.stealth.ly
> > >  Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
> > > /
> > >
> > >
> > > On Thu, Aug 29, 2013 at 8:13 PM, Jay Kreps 
> wrote:
> > >
> > > > +1
> > > >
> > > > We don't have any application-level security at this time so the
> answer
> > > is
> > > > whatever you can do at the network/system level.
> > > >
> > > > -Jay
> > > >
> > > >
> > > > On Thu, Aug 29, 2013 at 10:09 AM, Benjamin Black  wrote:
> > > >
> > > > > IP filters on the hosts.
> > > > > On Aug 29, 2013 10:03 AM, "Calvin Lei"  wrote:
> > > > >
> > > > > > Is there a way to stop a malicious user to connect directly to a
> > > kafka
> > > > > > broker and send any messages? Could we have the brokers to
> accept a
> > > > > message
> > > > > > to a list of know IPs?
> > > > > >
> > > > >
> > > >
> > >
> >
> >
> >
> > --
> > Thanks,
> > Raja.
> >
>



-- 
Thanks,
Raja.


Re: Securing kafka

2013-08-29 Thread Rajasekar Elango
We have made changes to the Kafka code to support certificate-based mutual
SSL authentication, so the clients and broker exchange trusted certificates
for successful communication. This provides both authentication and SSL
encryption. We are planning to contribute that code back to Kafka soon.

Thanks,
Raja.


On Thu, Aug 29, 2013 at 11:16 PM, Joe Stein  wrote:

> One use case I have been discussing recently with a few clients is
> verifying the digital signature of a message as part of the acceptance
> criteria of it being committed to the log and/or when it is consumed.
>
> I would be very interested in discussing different scenarios such as Kafka
> as a service, privacy at rest as well as authorization and authentication
> (if required).
>
> Hit me up
>
> /***
>  Joe Stein
>  Founder, Principal Consultant
>  Big Data Open Source Security LLC
>  http://www.stealth.ly
>  Twitter: @allthingshadoop 
> /
>
>
> On Thu, Aug 29, 2013 at 8:13 PM, Jay Kreps  wrote:
>
> > +1
> >
> > We don't have any application-level security at this time so the answer
> is
> > whatever you can do at the network/system level.
> >
> > -Jay
> >
> >
> > On Thu, Aug 29, 2013 at 10:09 AM, Benjamin Black  wrote:
> >
> > > IP filters on the hosts.
> > > On Aug 29, 2013 10:03 AM, "Calvin Lei"  wrote:
> > >
> > > > Is there a way to stop a malicious user to connect directly to a
> kafka
> > > > broker and send any messages? Could we have the brokers to accept a
> > > message
> > > > to a list of know IPs?
> > > >
> > >
> >
>



-- 
Thanks,
Raja.
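
A sketch of the host-level IP filtering suggested earlier in this thread,
assuming the broker listens on port 6667 (the port used elsewhere in this
archive) and trusted clients live in 10.228.0.0/16 (both illustrative):

    # allow known client networks to reach the broker port, drop everyone else
    iptables -A INPUT -p tcp --dport 6667 -s 10.228.0.0/16 -j ACCEPT
    iptables -A INPUT -p tcp --dport 6667 -j DROP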


Re: Num of streams for consumers using TopicFilter.

2013-08-29 Thread Rajasekar Elango
So what is the best way to load balance multiple consumers consuming from a
topic filter?

Let's say we have 4 topics with 8 partitions each and 2 consumers.

Option 1) To load balance the consumers, we can set num.streams=4 so that
both consumers split the 8 partitions, but then we can only use half of the
consumer streams.

Option 2) Configure mutually exclusive topic filter regexes such that 2
topics match consumer1 and 2 topics match consumer2. Now we can set
num.streams=8 and fully utilize the consumer streams. I believe this will
improve performance, but if a consumer dies, we will not get any data from
the topics owned by that consumer.

What would be your recommendation?

Thanks,
Raja.


On Thu, Aug 29, 2013 at 12:42 PM, Neha Narkhede wrote:

> >> 2) When I started mirrormaker with num.streams=16, looks like 16
> consumer
> threads were created, but only 8 are showing up as active as owner in
> consumer offset tracker and all topics/partitions are distributed between 8
> consumer threads.
>
> This is because currently the consumer rebalancing process of assigning
> partitions to consumer streams is at a per topic level. Unless you have at
> least one topic with 16 partitions, the remaining 8 threads will not do any
> work. This is not ideal and we want to look into a better rebalancing
> algorithm. Though it is a big change and we prefer doing it as part of the
> consumer client rewrite.
>
> Thanks,
> Neha
>
>
> On Thu, Aug 29, 2013 at 8:03 AM, Rajasekar Elango  >wrote:
>
> > So my understanding is num of active streams that a consumer can utilize
> is
> > number of partitions in topic. This is fine if we consumer from specific
> > topic. But if we consumer from TopicFilter, I thought consumer should
> able
> > to utilize (number of topics that match filter * number of partitions in
> > topic) . But looks like number of streams that consumer can use is
> limited
> > by just number if partitions in topic although it's consuming from
> multiple
> > topic.
> >
> > Here what I observed with 1 mirrormaker consuming from whitelist '.+'.
> >
> > The white list matches 5 topics and each topic has 8 partitions. I used
> > consumer offset checker to look at owner of each/topic partition.
> >
> > 1) When I started mirrormaker with num.streams=8, all topics/partitions
> are
> > distributed between 8 consumer threads.
> >
> > 2) When I started mirrormaker with num.streams=16, looks like 16 consumer
> > threads were created, but only 8 are showing up as active as owner in
> > consumer offset tracker and all topics/partitions are distributed
> between 8
> > consumer threads.
> >
> > So this could be bottleneck for consumers as although we partitioned
> topic,
> > if we are consuming from topic filter it can't utilize much of
> parallelism
> > with num of streams. Am i missing something, is there a way to make
> > cosumers/mirrormakers to utilize more number of active streams?
> >
> >
> > --
> > Thanks,
> > Raja.
> >
>



-- 
Thanks,
Raja.


Re: Getting LeaderNotAvailableException in console producer after increasing partitions from 4 to 16.

2013-08-29 Thread Rajasekar Elango
Created JIRA <https://issues.apache.org/jira/browse/KAFKA-1035> and attached
a patch to it. Please review.


On Wed, Aug 28, 2013 at 1:11 PM, Guozhang Wang  wrote:

> I think this patch can be made in trunk. You can mark it as 0.8.1
>
> Guozhang
>
> On Wednesday, August 28, 2013, Rajasekar Elango 
> wrote:
> > Guozhang ,
> >
> > *The documentation says I need to work off of trunk. Can you confirm If I
> > should be working in trunk or different branch.*
> > *
> > *
> > *Thanks,*
> > *Raja.*
> >
> >
> > On Tue, Aug 27, 2013 at 8:33 PM, Guozhang Wang 
> wrote:
> >
> >> Cool! You can follow the process of creating a JIRA here:
> >>
> >> http://kafka.apache.org/contributing.html
> >>
> >> And submit patch here:
> >>
> >> https://cwiki.apache.org/confluence/display/KAFKA/Git+Workflow
> >>
> >> It will be great if you can also add an entry for this issue in FAQ
> since I
> >> think this is a common question:
> >>
> >> https://cwiki.apache.org/confluence/display/KAFKA/FAQ
> >>
> >> Guozhang
> >>
> >>
> >> On Tue, Aug 27, 2013 at 2:38 PM, Rajasekar Elango <
> rela...@salesforce.com
> >> >wrote:
> >>
> >> > Thanks Guozhang, Changing max retry to 5 worked. Since I am changing
> >> > console producer code, I can also submit patch adding both
> >> > message.send.max.retries
> >> > and retry.backoff.ms to console producer. Can you let me know process
> >> for
> >> > submitting patch?
> >> >
> >> > Thanks,
> >> > Raja.
> >> >
> >> >
> >> > On Tue, Aug 27, 2013 at 4:03 PM, Guozhang Wang 
> >> wrote:
> >> >
> >> > > Hello Rajasekar,
> >> > >
> >> > > The remove fetcher log entry is normal under addition of partitions,
> >> > since
> >> > > they indicate that some leader changes have happened so brokers are
> >> > closing
> >> > > the fetchers to the old leaders.
> >> > >
> >> > > I just realized that the console Producer does not have the
> >> > > message.send.max.retries options yet. Could you file a JIRA for this
> >> and
> >> > I
> >> > > will followup to add this option? As for now you can hard modify the
> >> > > default value from 3 to a larger number.
> >> > >
> >> > > Guozhang
> >> > >
> >> > >
> >> > > On Tue, Aug 27, 2013 at 12:37 PM, Rajasekar Elango
> >> > > wrote:
> >> > >
> >> > > > Thanks Neha & Guozhang,
> >> > > >
> >> > > > When I ran StateChangeLogMerger, I am seeing this message repeated
> 16
> >> > > times
> >> > > > for each partition:
> >> > > >
> >> > > > [2013-08-27 12:30:02,535] INFO [ReplicaFetcherManager on broker 1]
> >> > > Removing
> >> > > > fetcher for partition [test-60,13]
> >> (kafka.server.ReplicaFetcherManager)
> >> > > > [2013-08-27 12:30:02,536] INFO [Log Manager on Broker 1] Created
> log
> >> > for
> >> > > > partition [test-60,13] in
> >> > > >
> >> /home/relango/dev/mandm/kafka/main/target/dist/mandm-kafka/kafka-data.
> >> > > > (kafka.log.LogManager)
> >> > > >
> >> > > > I am also seeing .log and .index files created for this topic in
> data
> >> > > dir.
> >> > > > Also list topic command shows leaders, replicas and isrs for all
> >> > > > partitions. Do you still think increasing num of retries would
> help
> >> or
> >> > is
> >> > > > it some other issue..? Also console Producer doesn't seem to  have
> >> > option
> >> > > > to set num of retries. Is there a way to configure num of retries
> for
> >> > > > console producer ?
> >> > > >
> >> > > > Thanks,
> >> > > > Raja.
> >> > > >
> >> > > >
> >> > > > On Tue, Aug 27, 2013 at 12:52 PM, Neha Narkhede <
> >> > neha.narkh...@gmail.com
> >> > > > >wrote:
> >> > > >
> >> > > > > As Guozhang said, your producer might give up sooner than the
> >> leader
> >> > > > > election completes for the new topic. To confirm if your
> producer
> >> > gave
> >> > > up
> >> > > > > too soon, you can run the state--
> > Thanks,
> > Raja.
> >
>
> --
> -- Guozhang
>



-- 
Thanks,
Raja.
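
For reference, the two settings being wired into the console producer by
KAFKA-1035 are ordinary 0.8 producer configs; in a producer.properties they
would look like this (values illustrative):

    message.send.max.retries=5
    retry.backoff.ms=100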


Num of streams for consumers using TopicFilter.

2013-08-29 Thread Rajasekar Elango
So my understanding is that the number of active streams a consumer can
utilize is the number of partitions in a topic. This is fine if we consume
from a specific topic. But if we consume from a TopicFilter, I thought the
consumer should be able to utilize (number of topics that match the filter *
number of partitions per topic). But it looks like the number of streams the
consumer can use is limited by just the number of partitions in a topic,
although it's consuming from multiple topics.

Here is what I observed with 1 mirrormaker consuming from whitelist '.+'.

The whitelist matches 5 topics and each topic has 8 partitions. I used the
consumer offset checker to look at the owner of each topic/partition.

1) When I started mirrormaker with num.streams=8, all topics/partitions were
distributed between the 8 consumer threads.

2) When I started mirrormaker with num.streams=16, it looks like 16 consumer
threads were created, but only 8 show up as active owners in the consumer
offset tracker, and all topics/partitions are distributed between those 8
consumer threads.

So this could be a bottleneck for consumers: although we partitioned the
topic, if we are consuming from a topic filter we can't utilize much
parallelism with the number of streams. Am I missing something? Is there a
way to make consumers/mirrormakers utilize a larger number of active
streams?


-- 
Thanks,
Raja.


Re: Changing the number of partitions after a topic is created

2013-08-28 Thread Rajasekar Elango
Thanks Neha, that helps.


On Wed, Aug 28, 2013 at 12:59 PM, Neha Narkhede wrote:

> The tool you are looking for is described here -
>
> https://cwiki.apache.org/confluence/display/KAFKA/Replication+tools#Replicationtools-5.AddPartitionCommandTool
> .
>
> Please note that this tool can only increase the number of partitions for a
> topic, there is no tool to reduce the number of partitions yet.
>
> Thanks,
> Neha
>
>
> On Wed, Aug 28, 2013 at 9:54 AM, Rajasekar Elango  >wrote:
>
> > Hi Neha,
> >
> > If we set auto.create.topics.enable = true and update num.partitions. New
> > topic get right number of partitions. But existing topics continue to use
> > whatever num.partitions set before. I am asking about the tool that allow
> > existing topic to use updated number of partitions.
> >
> > Thanks,
> > Raja.
> >
> >
> > On Wed, Aug 28, 2013 at 12:38 PM, Neha Narkhede  > >wrote:
> >
> > > Hi Rajasekar,
> > >
> > > I'm not sure I fully understood your question. Are you asking about a
> > tool
> > > that can increase the number of partitions for a topic?
> > >
> > > Thanks,
> > > Neha
> > >
> > >
> > > On Wed, Aug 28, 2013 at 8:47 AM, Rajasekar Elango <
> > rela...@salesforce.com
> > > >wrote:
> > >
> > > > Hi Jun,
> > > >
> > > > It's been some time since you last post, is this patch available now.
> > > Also
> > > > we are doing following manual steps to update existing topic to use
> new
> > > > partitions.
> > > >
> > > > 1) stop all zookeepers
> > > > 2) stop all kafka brokers
> > > > 3) clean data dir or zookeepers and kafka
> > > > 4) start zookeepers and kafka.
> > > >
> > > > This involves both downtime and loosing data. Is there a better way
> to
> > > > update existing topics to use new partitions until patch is
> > available..?
> > > >
> > > > --
> > > > Thanks,
> > > > Raja.
> > > >
> > >
> >
> >
> >
> > --
> > Thanks,
> > Raja.
> >
>



-- 
Thanks,
Raja.
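
For reference, the add-partitions tool behind the wiki link above is invoked
roughly as follows; the script and flag names here are a best-effort sketch
of the 0.8-era tooling, so verify them against the bin/ directory of your
release:

    bin/kafka-add-partitions.sh --zookeeper zk1:2181 --topic mytopic --partition 8

    # In later releases the same operation moved into kafka-topics.sh:
    bin/kafka-topics.sh --zookeeper zk1:2181 --alter --topic mytopic --partitions 16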


Re: Getting LeaderNotAvailableException in console producer after increasing partitions from 4 to 16.

2013-08-28 Thread Rajasekar Elango
Thanks. This is a small fix to ConsoleProducer.scala only; I will use the
0.8 branch.

Thanks,
Raja.


On Wed, Aug 28, 2013 at 12:49 PM, Neha Narkhede wrote:

> Rajasekar,
>
> We are trying to minimize the number of patches in 0.8 to critical bug
> fixes or broken tooling. If the patch involves significant code changes, we
> would encourage taking it on trunk. If you want to just fix the console
> producer to take the retry argument, I would think it is small enough to
> consider taking it on 0.8 branch since it affects the usability of the
> console producer.
>
> Thanks,
> Neha
>
>
> On Wed, Aug 28, 2013 at 8:36 AM, Rajasekar Elango  >wrote:
>
> > Guozhang ,
> >
> > *The documentation says I need to work off of trunk. Can you confirm If I
> > should be working in trunk or different branch.*
> > *
> > *
> > *Thanks,*
> > *Raja.*
> >
> >
> > On Tue, Aug 27, 2013 at 8:33 PM, Guozhang Wang 
> wrote:
> >
> > > Cool! You can follow the process of creating a JIRA here:
> > >
> > > http://kafka.apache.org/contributing.html
> > >
> > > And submit patch here:
> > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/Git+Workflow
> > >
> > > It will be great if you can also add an entry for this issue in FAQ
> > since I
> > > think this is a common question:
> > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/FAQ
> > >
> > > Guozhang
> > >
> > >
> > > On Tue, Aug 27, 2013 at 2:38 PM, Rajasekar Elango <
> > rela...@salesforce.com
> > > >wrote:
> > >
> > > > Thanks Guozhang, Changing max retry to 5 worked. Since I am changing
> > > > console producer code, I can also submit patch adding both
> > > > message.send.max.retries
> > > > and retry.backoff.ms to console producer. Can you let me know
> process
> > > for
> > > > submitting patch?
> > > >
> > > > Thanks,
> > > > Raja.
> > > >
> > > >
> > > > On Tue, Aug 27, 2013 at 4:03 PM, Guozhang Wang 
> > > wrote:
> > > >
> > > > > Hello Rajasekar,
> > > > >
> > > > > The remove fetcher log entry is normal under addition of
> partitions,
> > > > since
> > > > > they indicate that some leader changes have happened so brokers are
> > > > closing
> > > > > the fetchers to the old leaders.
> > > > >
> > > > > I just realized that the console Producer does not have the
> > > > > message.send.max.retries options yet. Could you file a JIRA for
> this
> > > and
> > > > I
> > > > > will followup to add this option? As for now you can hard modify
> the
> > > > > default value from 3 to a larger number.
> > > > >
> > > > > Guozhang
> > > > >
> > > > >
> > > > > On Tue, Aug 27, 2013 at 12:37 PM, Rajasekar Elango
> > > > > wrote:
> > > > >
> > > > > > Thanks Neha & Guozhang,
> > > > > >
> > > > > > When I ran StateChangeLogMerger, I am seeing this message
> repeated
> > 16
> > > > > times
> > > > > > for each partition:
> > > > > >
> > > > > > [2013-08-27 12:30:02,535] INFO [ReplicaFetcherManager on broker
> 1]
> > > > > Removing
> > > > > > fetcher for partition [test-60,13]
> > > (kafka.server.ReplicaFetcherManager)
> > > > > > [2013-08-27 12:30:02,536] INFO [Log Manager on Broker 1] Created
> > log
> > > > for
> > > > > > partition [test-60,13] in
> > > > > >
> > > /home/relango/dev/mandm/kafka/main/target/dist/mandm-kafka/kafka-data.
> > > > > > (kafka.log.LogManager)
> > > > > >
> > > > > > I am also seeing .log and .index files created for this topic in
> > data
> > > > > dir.
> > > > > > Also list topic command shows leaders, replicas and isrs for all
> > > > > > partitions. Do you still think increasing num of retries would
> help
> > > or
> > > > is
> > > > > > it some other issue..? Also console Producer doesn't seem to
>  have
> > > > option
> > > > > > to set num of retries. Is there a way to configure num of retries
> > for
> > > >

Re: Changing the number of partitions after a topic is created

2013-08-28 Thread Rajasekar Elango
Hi Neha,

If we set auto.create.topics.enable = true and update num.partitions, new
topics get the right number of partitions, but existing topics continue to
use whatever num.partitions was set before. I am asking about a tool that
allows existing topics to use the updated number of partitions.

Thanks,
Raja.


On Wed, Aug 28, 2013 at 12:38 PM, Neha Narkhede wrote:

> Hi Rajasekar,
>
> I'm not sure I fully understood your question. Are you asking about a tool
> that can increase the number of partitions for a topic?
>
> Thanks,
> Neha
>
>
> On Wed, Aug 28, 2013 at 8:47 AM, Rajasekar Elango  >wrote:
>
> > Hi Jun,
> >
> > It's been some time since your last post; is this patch available now?
> > Also, we are doing the following manual steps to update existing topics
> > to use new partitions.
> >
> > 1) stop all zookeepers
> > 2) stop all kafka brokers
> > 3) clean data dirs of zookeepers and kafka
> > 4) start zookeepers and kafka.
> >
> > This involves both downtime and losing data. Is there a better way to
> > update existing topics to use new partitions until the patch is available?
> >
> > --
> > Thanks,
> > Raja.
> >
>



-- 
Thanks,
Raja.


Re: Changing the number of partitions after a topic is created

2013-08-28 Thread Rajasekar Elango
Hi Jun,

It's been some time since your last post; is this patch available now? Also,
we are doing the following manual steps to update existing topics to use new
partitions.

1) stop all zookeepers
2) stop all kafka brokers
3) clean data dirs of zookeepers and kafka
4) start zookeepers and kafka.

This involves both downtime and losing data. Is there a better way to update
existing topics to use new partitions until the patch is available?
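For concreteness, the steps look roughly like this on a stock install (an
illustrative sketch; paths are placeholders, our wrapper scripts differ, and
the rm step destroys all messages and offsets):

  # 1-2) stop all zookeepers, then all brokers
  bin/zookeeper-server-stop.sh
  bin/kafka-server-stop.sh
  # 3) wipe the data dirs (destructive!)
  rm -rf /path/to/zookeeper-data/* /path/to/kafka-data/*
  # 4) restart; topics are then auto-created with the new num.partitions
  bin/zookeeper-server-start.sh config/zookeeper.properties
  bin/kafka-server-start.sh config/server.properties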

-- 
Thanks,
Raja.


Re: Getting LeaderNotAvailableException in console producer after increasing partitions from 4 to 16.

2013-08-28 Thread Rajasekar Elango
Guozhang ,

The documentation says I need to work off of trunk. Can you confirm if I
should be working in trunk or a different branch?

Thanks,
Raja.


On Tue, Aug 27, 2013 at 8:33 PM, Guozhang Wang  wrote:

> Cool! You can follow the process of creating a JIRA here:
>
> http://kafka.apache.org/contributing.html
>
> And submit patch here:
>
> https://cwiki.apache.org/confluence/display/KAFKA/Git+Workflow
>
> It will be great if you can also add an entry for this issue in FAQ since I
> think this is a common question:
>
> https://cwiki.apache.org/confluence/display/KAFKA/FAQ
>
> Guozhang
>
>
> On Tue, Aug 27, 2013 at 2:38 PM, Rajasekar Elango  >wrote:
>
> > Thanks Guozhang, Changing max retry to 5 worked. Since I am changing
> > console producer code, I can also submit patch adding both
> > message.send.max.retries
> > and retry.backoff.ms to the console producer. Can you let me know the
> > process for submitting a patch?
> >
> > Thanks,
> > Raja.
> >
> >
> > On Tue, Aug 27, 2013 at 4:03 PM, Guozhang Wang 
> wrote:
> >
> > > Hello Rajasekar,
> > >
> > > The remove fetcher log entry is normal under addition of partitions,
> > since
> > > they indicate that some leader changes have happened so brokers are
> > closing
> > > the fetchers to the old leaders.
> > >
> > > I just realized that the console Producer does not have the
> > > message.send.max.retries options yet. Could you file a JIRA for this
> and
> > I
> > > will followup to add this option? As for now you can hard modify the
> > > default value from 3 to a larger number.
> > >
> > > Guozhang
> > >
> > >
> > > On Tue, Aug 27, 2013 at 12:37 PM, Rajasekar Elango
> > > wrote:
> > >
> > > > Thanks Neha & Guozhang,
> > > >
> > > > When I ran StateChangeLogMerger, I am seeing this message repeated 16
> > > times
> > > > for each partition:
> > > >
> > > > [2013-08-27 12:30:02,535] INFO [ReplicaFetcherManager on broker 1]
> > > Removing
> > > > fetcher for partition [test-60,13]
> (kafka.server.ReplicaFetcherManager)
> > > > [2013-08-27 12:30:02,536] INFO [Log Manager on Broker 1] Created log
> > for
> > > > partition [test-60,13] in
> > > >
> /home/relango/dev/mandm/kafka/main/target/dist/mandm-kafka/kafka-data.
> > > > (kafka.log.LogManager)
> > > >
> > > > I am also seeing .log and .index files created for this topic in data
> > > dir.
> > > > Also list topic command shows leaders, replicas and isrs for all
> > > > partitions. Do you still think increasing num of retries would help
> or
> > is
> > > > it some other issue..? Also console Producer doesn't seem to  have
> > option
> > > > to set num of retries. Is there a way to configure num of retries for
> > > > console producer ?
> > > >
> > > > Thanks,
> > > > Raja.
> > > >
> > > >
> > > > On Tue, Aug 27, 2013 at 12:52 PM, Neha Narkhede <
> > neha.narkh...@gmail.com
> > > > >wrote:
> > > >
> > > > > As Guozhang said, your producer might give up sooner than the
> leader
> > > > > election completes for the new topic. To confirm if your producer
> > gave
> > > up
> > > > > too soon, you can run the state change log merge tool for this
> topic
> > > and
> > > > > see when the leader election finished for all partitions
> > > > >
> > > > > ./bin/kafka-run-class.sh kafka.tools.StateChangeLogMerger --logs
> > > > > <path to all state change logs> --topic <topic>
> > > > >
> > > > > Note that this tool requires you to give the state change logs for
> > all
> > > > > brokers in the cluster.
> > > > >
> > > > >
> > > > > Thanks,
> > > > > Neha
> > > > >
> > > > >
> > > > > On Tue, Aug 27, 2013 at 9:45 AM, Guozhang Wang  >
> > > > wrote:
> > > > >
> > > > > > Hello Rajasekar,
> > > > > >
> > > > > > In 0.8 producers keep a cache of the partition ->
> leader_broker_id
> > > map
> > > > > > which is used to determine to which brokers should the messages
> be
> > > > sent.

Re: Getting LeaderNotAvailableException in console producer after increasing partitions from 4 to 16.

2013-08-27 Thread Rajasekar Elango
Thanks Guozhang, changing the max retries to 5 worked. Since I am changing the
console producer code, I can also submit a patch adding both
message.send.max.retries and retry.backoff.ms to the console producer. Can you
let me know the process for submitting a patch?
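For anyone following along, these are the two producer properties in question
(0.8 producer config names; the exact values below are just what we happened
to use, not a recommendation):

  message.send.max.retries=5   # default is 3; gives leader election time to finish
  retry.backoff.ms=200         # pause between retries so fresh metadata can propagate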

Thanks,
Raja.


On Tue, Aug 27, 2013 at 4:03 PM, Guozhang Wang  wrote:

> Hello Rajasekar,
>
> The "Removing fetcher" log entries are normal when partitions are added,
> since they indicate that some leader changes have happened and the brokers
> are closing their fetchers to the old leaders.
>
> I just realized that the console producer does not have the
> message.send.max.retries option yet. Could you file a JIRA for this and I
> will follow up to add this option? For now, you can manually change the
> default value from 3 to a larger number.
>
> Guozhang
>
>
> On Tue, Aug 27, 2013 at 12:37 PM, Rajasekar Elango
> wrote:
>
> > Thanks Neha & Guozhang,
> >
> > When I ran StateChangeLogMerger, I am seeing this message repeated 16
> times
> > for each partition:
> >
> > [2013-08-27 12:30:02,535] INFO [ReplicaFetcherManager on broker 1]
> Removing
> > fetcher for partition [test-60,13] (kafka.server.ReplicaFetcherManager)
> > [2013-08-27 12:30:02,536] INFO [Log Manager on Broker 1] Created log for
> > partition [test-60,13] in
> > /home/relango/dev/mandm/kafka/main/target/dist/mandm-kafka/kafka-data.
> > (kafka.log.LogManager)
> >
> > I am also seeing .log and .index files created for this topic in data
> dir.
> > Also list topic command shows leaders, replicas and isrs for all
> > partitions. Do you still think increasing num of retries would help or is
> > it some other issue..? Also console Producer doesn't seem to have option
> > to set num of retries. Is there a way to configure num of retries for
> > console producer ?
> >
> > Thanks,
> > Raja.
> >
> >
> > On Tue, Aug 27, 2013 at 12:52 PM, Neha Narkhede  > >wrote:
> >
> > > As Guozhang said, your producer might give up sooner than the leader
> > > election completes for the new topic. To confirm if your producer gave
> up
> > > too soon, you can run the state change log merge tool for this topic
> and
> > > see when the leader election finished for all partitions
> > >
> > > ./bin/kafka-run-class.sh kafka.tools.StateChangeLogMerger --logs
> > > <path to all state change logs> --topic <topic>
> > >
> > > Note that this tool requires you to give the state change logs for all
> > > brokers in the cluster.
> > >
> > >
> > > Thanks,
> > > Neha
> > >
> > >
> > > On Tue, Aug 27, 2013 at 9:45 AM, Guozhang Wang 
> > wrote:
> > >
> > > > Hello Rajasekar,
> > > >
> > > > In 0.8, producers keep a cache of the partition -> leader_broker_id
> > > > map, which is used to determine which brokers the messages should be
> > > > sent to. After new partitions are added, this cache on the producer
> > > > has not been populated yet, hence it will throw this exception. The
> > > > producer will then try to refresh its cache by asking the brokers
> > > > "who are the leaders of these new partitions that I do not know of
> > > > before". The brokers at the beginning also do not know this
> > > > information, and will only get it from the controller, which will
> > > > only propagate the leader information after the leader elections have
> > > > all finished.
> > > >
> > > > If you set num.retries to 3, it is possible that the producer gives
> > > > up too soon, before the leader info has propagated to the brokers,
> > > > and hence to the producers. Could you try to increase
> > > > producer.num.retries and see if the producer can eventually succeed
> > > > in retrying?
> > > >
> > > > Guozhang
> > > >
> > > >
> > > > On Tue, Aug 27, 2013 at 8:53 AM, Rajasekar Elango <
> > > rela...@salesforce.com
> > > > >wrote:
> > > >
> > > > > Hello everyone,
> > > > >
> > > > > We recently increased number of partitions from 4 to 16 and after
> > that
> > > > > console producer mostly fails with LeaderNotAvailableException and
> > > exits
> > > > > after 3 tries:
> > > > >
> > > > > Here is last few lines of console producer log:
> > > > >

Re: Getting LeaderNotAvailableException in console producer after increasing partitions from 4 to 16.

2013-08-27 Thread Rajasekar Elango
Thanks Neha & Guozhang,

When I ran StateChangeLogMerger, I saw this message repeated 16 times for
each partition:

[2013-08-27 12:30:02,535] INFO [ReplicaFetcherManager on broker 1] Removing
fetcher for partition [test-60,13] (kafka.server.ReplicaFetcherManager)
[2013-08-27 12:30:02,536] INFO [Log Manager on Broker 1] Created log for
partition [test-60,13] in
/home/relango/dev/mandm/kafka/main/target/dist/mandm-kafka/kafka-data.
(kafka.log.LogManager)

I am also seeing .log and .index files created for this topic in the data
dir, and the list-topic command shows leaders, replicas, and ISRs for all
partitions. Do you still think increasing the number of retries would help,
or is it some other issue? Also, the console producer doesn't seem to have an
option to set the number of retries. Is there a way to configure the number
of retries for the console producer?
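(If the option gets added, I would expect the invocation to look roughly like
the following -- the two flag names are hypothetical until the patch lands:)

  bin/kafka-console-producer.sh --broker-list localhost:6667 --topic test-41 \
    --message-send-max-retries 5 --retry-backoff-ms 200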

Thanks,
Raja.


On Tue, Aug 27, 2013 at 12:52 PM, Neha Narkhede wrote:

> As Guozhang said, your producer might give up sooner than the leader
> election completes for the new topic. To confirm if your producer gave up
> too soon, you can run the state change log merge tool for this topic and
> see when the leader election finished for all partitions
>
> ./bin/kafka-run-class.sh kafka.tools.StateChangeLogMerger --logs
> <path to all state change logs> --topic <topic>
>
> Note that this tool requires you to give the state change logs for all
> brokers in the cluster.
>
>
> Thanks,
> Neha
>
>
> On Tue, Aug 27, 2013 at 9:45 AM, Guozhang Wang  wrote:
>
> > Hello Rajasekar,
> >
> > In 0.8, producers keep a cache of the partition -> leader_broker_id map,
> > which is used to determine which brokers the messages should be sent to.
> > After new partitions are added, this cache on the producer has not been
> > populated yet, hence it will throw this exception. The producer will then
> > try to refresh its cache by asking the brokers "who are the leaders of
> > these new partitions that I do not know of before". The brokers at the
> > beginning also do not know this information, and will only get it from
> > the controller, which will only propagate the leader information after
> > the leader elections have all finished.
> >
> > If you set num.retries to 3, it is possible that the producer gives up
> > too soon, before the leader info has propagated to the brokers, and hence
> > to the producers. Could you try to increase producer.num.retries and see
> > if the producer can eventually succeed in retrying?
> >
> > Guozhang
> >
> >
> > On Tue, Aug 27, 2013 at 8:53 AM, Rajasekar Elango <
> rela...@salesforce.com
> > >wrote:
> >
> > > Hello everyone,
> > >
> > > We recently increased number of partitions from 4 to 16 and after that
> > > console producer mostly fails with LeaderNotAvailableException and
> exits
> > > after 3 tries:
> > >
> > > Here is last few lines of console producer log:
> > >
> > > No partition metadata for topic test-41 due to
> > > kafka.common.LeaderNotAvailableException}] for topic [test-41]: class
> > > kafka.common.LeaderNotAvailableException
> > >  (kafka.producer.BrokerPartitionInfo)
> > > [2013-08-27 08:29:30,271] ERROR Failed to collate messages by topic,
> > > partition due to: Failed to fetch topic metadata for topic: test-41
> > > (kafka.producer.async.DefaultEventHandler)
> > > [2013-08-27 08:29:30,271] INFO Back off for 100 ms before retrying
> send.
> > > Remaining retries = 0 (kafka.producer.async.DefaultEventHandler)
> > > [2013-08-27 08:29:30,372] INFO Secure sockets for data transfer is
> > enabled
> > > (kafka.producer.SyncProducerConfig)
> > > [2013-08-27 08:29:30,372] INFO Fetching metadata from broker
> > > id:0,host:localhost,port:6667,secure:true with correlation id 8 for 1
> > > topic(s) Set(test-41) (kafka.client.ClientUtils$)
> > > [2013-08-27 08:29:30,373] INFO begin ssl handshake for localhost/
> > > 127.0.0.1:6667//127.0.0.1:36640 (kafka.security.SSLSocketChannel)
> > > [2013-08-27 08:29:30,375] INFO finished ssl handshake for localhost/
> > > 127.0.0.1:6667//127.0.0.1:36640 (kafka.security.SSLSocketChannel)
> > > [2013-08-27 08:29:30,375] INFO Connected to localhost:6667:true for
> > > producing (kafka.producer.SyncProducer)
> > > [2013-08-27 08:29:30,380] INFO Disconnecting from localhost:6667:true
> > > (kafka.producer.SyncProducer)
> > > [2013-08-27 08:29:30,381] INFO Secure sockets for data transfer is
> > enabled
> > > (kafka.producer.SyncProducerConfig)
> > > [2013-08-27 08:29:30,381] ERROR Failed to send requests for topics test-41

Getting LeaderNotAvailableException in console producer after increasing partitions from 4 to 16.

2013-08-27 Thread Rajasekar Elango
Hello everyone,

We recently increased the number of partitions from 4 to 16, and after that
the console producer mostly fails with LeaderNotAvailableException and exits
after 3 tries.

Here are the last few lines of the console producer log:

No partition metadata for topic test-41 due to
kafka.common.LeaderNotAvailableException}] for topic [test-41]: class
kafka.common.LeaderNotAvailableException
 (kafka.producer.BrokerPartitionInfo)
[2013-08-27 08:29:30,271] ERROR Failed to collate messages by topic,
partition due to: Failed to fetch topic metadata for topic: test-41
(kafka.producer.async.DefaultEventHandler)
[2013-08-27 08:29:30,271] INFO Back off for 100 ms before retrying send.
Remaining retries = 0 (kafka.producer.async.DefaultEventHandler)
[2013-08-27 08:29:30,372] INFO Secure sockets for data transfer is enabled
(kafka.producer.SyncProducerConfig)
[2013-08-27 08:29:30,372] INFO Fetching metadata from broker
id:0,host:localhost,port:6667,secure:true with correlation id 8 for 1
topic(s) Set(test-41) (kafka.client.ClientUtils$)
[2013-08-27 08:29:30,373] INFO begin ssl handshake for localhost/
127.0.0.1:6667//127.0.0.1:36640 (kafka.security.SSLSocketChannel)
[2013-08-27 08:29:30,375] INFO finished ssl handshake for localhost/
127.0.0.1:6667//127.0.0.1:36640 (kafka.security.SSLSocketChannel)
[2013-08-27 08:29:30,375] INFO Connected to localhost:6667:true for
producing (kafka.producer.SyncProducer)
[2013-08-27 08:29:30,380] INFO Disconnecting from localhost:6667:true
(kafka.producer.SyncProducer)
[2013-08-27 08:29:30,381] INFO Secure sockets for data transfer is enabled
(kafka.producer.SyncProducerConfig)
[2013-08-27 08:29:30,381] ERROR Failed to send requests for topics test-41
with correlation ids in [0,8] (kafka.producer.async.DefaultEventHandler)
kafka.common.FailedToSendMessageException: Failed to send messages after 3
tries.
at
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
at kafka.producer.Producer.send(Producer.scala:74)
at kafka.producer.ConsoleProducer$.main(ConsoleProducer.scala:168)
at kafka.producer.ConsoleProducer.main(ConsoleProducer.scala)
[2013-08-27 08:29:30,383] INFO Shutting down producer
(kafka.producer.Producer)
[2013-08-27 08:29:30,384] INFO Closing all sync producers
(kafka.producer.ProducerPool)


Also, this happens only for new topics (we have auto.create.topics.enable set
to true); if I retry sending a message to an existing topic, it works fine.
Is there any tweaking I need to do on the broker or the producer to scale
with the number of partitions?
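For context, these are the relevant broker settings on our side (0.8
server.properties names; 16 is the new value we rolled out):

  auto.create.topics.enable=true
  num.partitions=16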

-- 
Thanks in advance for help,
Raja.


Re: Differences in size of data replicated by mirror maker

2013-08-23 Thread Rajasekar Elango
Thanks Guazhang, Jun,

Yes, we are doing gzip compression, and that should be the reason for the
difference in disk usage. I had a typo: the size is actually 91G in the
source cluster, so a 25G/91G ratio makes sense for compression.
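For reference, compression on the mirror maker is driven by its producer
properties, roughly like this (0.8 producer property names; values
illustrative):

  compression.codec=gzip
  # leaving compressed.topics empty applies the codec to all topics
  compressed.topics=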

Thanks,
Raja.


On Thu, Aug 22, 2013 at 7:00 PM, Guozhang Wang  wrote:

> When you state the numbers, are they the same across instances in the
> cluster, meaning that Topic-0 would have 910*5 GB in source cluster and
> 25*5 GB in target cluster?
>
> Another possibility is that MirrorMaker uses compression on the producer
> side, but I would be surprised if the compression rate could be 25/910.
>
> Guozhang
>
>
> On Thu, Aug 22, 2013 at 3:48 PM, Rajasekar Elango  >wrote:
>
> > Yes, both source and target clusters have 5 brokers in cluster.
> >
> > Sent from my iPhone
> >
> > On Aug 22, 2013, at 6:11 PM, Guozhang Wang  wrote:
> >
> > > Hello Rajasekar,
> > >
> > > Are the size of the source cluster and target cluster the same?
> > >
> > > Guozhang
> > >
> > >
> > > On Thu, Aug 22, 2013 at 2:14 PM, Rajasekar Elango <
> > rela...@salesforce.com>wrote:
> > >
> > >> Hi,
> > >>
> > >> We are using mirrormaker to replicate data between two kafka clusters.
> > I am
> > >> seeing huge difference in size of log in data dir between the broker
> in
> > >> source cluster vs broker in destination cluster:
> > >>
> > >> For eg: Size of ~/data/Topic-0/ is about 910 G in source broker, but
> > >> only 25G in destination broker. I see segmented log files (~500 M)
> > is
> > >> created for about every 2 or 3 mins in source brokers, but I see
> > segmented
> > >> log files is created for about every 25 mins in destination broker.
> > >>
> > >> I verified mirrormaker is doing fine using consumer offset checker,
> not
> > >> much lag, offsets are incrementing. I also verified that
> > topics/partitions
> > >> are not under replicated in both source and target cluster. What is
> the
> > >> reason for this difference in disk usage?
> > >>
> > >>
> > >> --
> > >> Thanks,
> > >> Raja.
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> >
>
>
>
> --
> -- Guozhang
>



-- 
Thanks,
Raja.


Fwd: Tuning mirror maker performance

2013-08-23 Thread Rajasekar Elango
Thanks Jun,

What troubleshooting steps can we take to identify whether the bottleneck is
with consuming or producing? Would anything in the log4j configuration or the
JMX MBeans provide insight into it? Does the metadata refresh interval affect
picking up new partitions only for existing topics, or does it also affect
picking up new topics?
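One illustrative way to tell the two apart would be a couple of thread dumps
taken a few seconds apart, comparing what the consumer fetcher threads and
the producer send threads are doing in between:

  jstack <mirrormaker-pid> > dump1.txt
  sleep 5
  jstack <mirrormaker-pid> > dump2.txt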

Thanks,
Raja.

-- Forwarded message --
From: Jun Rao 
Date: Fri, Aug 23, 2013 at 12:08 AM
Subject: Re: Tuning mirror maker performance
To: "users@kafka.apache.org" 


You have to determine whether the bottleneck is in the consumer or the
producer.

To improve the performance of the former (the consumer), you can increase the
total # of consumer streams. The # of streams is capped by the total # of
partitions, so you may need to increase the # of partitions.

To improve the performance of the latter, you can (a) increase the batch
size in async mode and/or (b) run more instances of producers.

Metadata refresh interval is configurable. It's mainly for the producer to
pick up newly available partitions.

Thanks,

Jun


On Thu, Aug 22, 2013 at 1:44 PM, Rajasekar Elango wrote:

> I am trying to tune mirrormaker configurations based on this doc
> <https://cwiki.apache.org/confluence/display/KAFKA/Kafka+mirroring+(MirrorMaker)#Kafkamirroring%28MirrorMaker%29-Consumerandsourceclustersocketbuffersizes>
> and would like to know your recommendations.
>
> Our configuration: We are doing inter-datacenter replication with 5 brokers
> in the source and destination DCs and 2 mirror makers doing the
> replication. We have about 4 topics with 4 partitions each.
> I have been using ConsumerOffsetChecker to analyze lag while tuning.
>
>
> 1. num.streams: We have set num.streams=2 so that the 4 partitions will be
> shared between the 2 mirror makers. Increasing num.streams beyond this did
> not improve performance; is this correct?
> 2. num.producers: We initially set num.producers=4 (assuming one producer
> thread per topic), then we bumped it to num.producers=16, but did not see
> any improvement in performance. Is this correct? How do we determine the
> optimum value for num.producers?
> 3. socket buffer sizes: We initially had default values for these; then I
> changed socket.send.buffer.bytes on the source brokers,
> socket.receive.buffer.bytes and fetch.message.max.bytes in the mirror maker
> consumer properties, and socket.receive.buffer.bytes and
> socket.request.max.bytes on the destination brokers, all to 1024*1024*1024
> (1073741824). This did improve performance, but I could not get the lag
> below 100.
>
> Here is how our lag looks after the above changes:
>
> Group            Topic             Pid  Offset      logSize     Lag    Owner
> mirrormakerProd  FunnelProto       0    554704539   554717088   12549  mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1377192412490-38a53dc9-0
> mirrormakerProd  FunnelProto       1    547370573   547383136   12563  mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1377192412490-38a53dc9-1
> mirrormakerProd  FunnelProto       2    553124930   553125742   812    mirrormakerProd_ops-mmrs1-2-asg.ops.sfdc.net-1377193322178-7262ed87-0
> mirrormakerProd  FunnelProto       3    552990834   552991650   816    mirrormakerProd_ops-mmrs1-2-asg.ops.sfdc.net-1377193322178-7262ed87-1
> mirrormakerProd  agent             0    35438       35440       2      mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1377192412490-38a53dc9-0
> mirrormakerProd  agent             1    35447       35448       1      mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1377192412490-38a53dc9-1
> mirrormakerProd  agent             2    35375       35375       0      mirrormakerProd_ops-mmrs1-2-asg.ops.sfdc.net-1377193322178-7262ed87-0
> mirrormakerProd  agent             3    35336       35336       0      mirrormakerProd_ops-mmrs1-2-asg.ops.sfdc.net-1377193322178-7262ed87-1
> mirrormakerProd  internal_metrics  0    1930852823  1930917418  64595  mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1377192412490-38a53dc9-0
> mirrormakerProd  internal_metrics  1    1937237324  1937301841  64517  mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1377192412490-38a53dc9-1
> mirrormakerProd  internal_metrics  2    1945894901  1945904067  9166   mirrormakerProd_ops-mmrs1-2-asg.ops.sfdc.net-1377193322178-7262ed87-0
> mirrormakerProd  internal_metrics  3    1946906932  1946915928  8996   mirrormakerProd_ops-mmrs1-2-asg.ops.sfdc.net-1377193322178-7262ed87-1
> mirrormakerProd  jmx               0    485270038   485280882   10844  mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1377192412490-38a53dc9-0

Re: Differences in size of data replicated by mirror maker

2013-08-22 Thread Rajasekar Elango
Yes, both the source and target clusters have 5 brokers.

Sent from my iPhone

On Aug 22, 2013, at 6:11 PM, Guozhang Wang  wrote:

> Hello Rajasekar,
>
> Are the size of the source cluster and target cluster the same?
>
> Guozhang
>
>
> On Thu, Aug 22, 2013 at 2:14 PM, Rajasekar Elango 
> wrote:
>
>> Hi,
>>
>> We are using mirrormaker to replicate data between two kafka clusters. I am
>> seeing huge difference in size of log in data dir between the broker in
>> source cluster vs broker in destination cluster:
>>
>> For eg: Size of ~/data/Topic-0/ is about 910 G in source broker, but only
>> 25G in destination broker. I see segmented log files (~500 M) is
>> created for about every 2 or 3 mins in source brokers, but I see segmented
>> log files is created for about every 25 mins in destination broker.
>>
>> I verified mirrormaker is doing fine using consumer offset checker, not
>> much lag, offsets are incrementing. I also verified that topics/partitions
>> are not under replicated in both source and target cluster. What is the
>> reason for this difference in disk usage?
>>
>>
>> --
>> Thanks,
>> Raja.
>
>
>
> --
> -- Guozhang


Differences in size of data replicated by mirror maker

2013-08-22 Thread Rajasekar Elango
Hi,

We are using mirror maker to replicate data between two kafka clusters, and I
am seeing a huge difference in the size of the logs in the data dirs between
a broker in the source cluster and a broker in the destination cluster.

For example, ~/data/Topic-0/ is about 910G on a source broker, but only 25G
on a destination broker. I see segment log files (~500M) created roughly
every 2 or 3 minutes on the source brokers, but only about every 25 minutes
on the destination broker.

I verified with ConsumerOffsetChecker that the mirror maker is doing fine:
not much lag, and offsets are incrementing. I also verified that
topics/partitions are not under-replicated in either the source or the target
cluster. What is the reason for this difference in disk usage?
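(Trivial, but for completeness, this is roughly how we compare the on-disk
sizes on each broker:)

  du -sh ~/data/Topic-0/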


-- 
Thanks,
Raja.


Re: Tuning mirror maker performance

2013-08-22 Thread Rajasekar Elango
Hi,

I am trying to tune mirrormaker configurations based on this doc
<https://cwiki.apache.org/confluence/display/KAFKA/Kafka+mirroring+(MirrorMaker)#Kafkamirroring%28MirrorMaker%29-Consumerandsourceclustersocketbuffersizes>
and would like to know your recommendations.

Our configuration: We are doing inter-datacenter replication with 5 brokers
in the source and destination DCs and 2 mirror makers doing the replication.
We have about 4 topics with 4 partitions each.
I have been using ConsumerOffsetChecker to analyze lag while tuning.


   1. num.streams: We have set num.streams=2 so that the 4 partitions will
   be shared between the 2 mirror makers. Increasing num.streams beyond this
   did not improve performance; is this correct?
   2. num.producers: We initially set num.producers=4 (assuming one producer
   thread per topic), then we bumped it to num.producers=16, but did not see
   any improvement in performance. Is this correct? How do we determine the
   optimum value for num.producers?
   3. socket buffer sizes: We initially had default values for these; then I
   changed socket.send.buffer.bytes on the source brokers,
   socket.receive.buffer.bytes and fetch.message.max.bytes in the mirror
   maker consumer properties, and socket.receive.buffer.bytes and
   socket.request.max.bytes on the destination brokers, all to 1024*1024*1024
   (1073741824). This did improve performance, but I could not get the lag
   below 100.

   Here is how our lag looks after the above changes:

Group            Topic             Pid  Offset      logSize     Lag    Owner
mirrormakerProd  FunnelProto       0    554704539   554717088   12549  mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1377192412490-38a53dc9-0
mirrormakerProd  FunnelProto       1    547370573   547383136   12563  mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1377192412490-38a53dc9-1
mirrormakerProd  FunnelProto       2    553124930   553125742   812    mirrormakerProd_ops-mmrs1-2-asg.ops.sfdc.net-1377193322178-7262ed87-0
mirrormakerProd  FunnelProto       3    552990834   552991650   816    mirrormakerProd_ops-mmrs1-2-asg.ops.sfdc.net-1377193322178-7262ed87-1
mirrormakerProd  agent             0    35438       35440       2      mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1377192412490-38a53dc9-0
mirrormakerProd  agent             1    35447       35448       1      mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1377192412490-38a53dc9-1
mirrormakerProd  agent             2    35375       35375       0      mirrormakerProd_ops-mmrs1-2-asg.ops.sfdc.net-1377193322178-7262ed87-0
mirrormakerProd  agent             3    35336       35336       0      mirrormakerProd_ops-mmrs1-2-asg.ops.sfdc.net-1377193322178-7262ed87-1
mirrormakerProd  internal_metrics  0    1930852823  1930917418  64595  mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1377192412490-38a53dc9-0
mirrormakerProd  internal_metrics  1    1937237324  1937301841  64517  mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1377192412490-38a53dc9-1
mirrormakerProd  internal_metrics  2    1945894901  1945904067  9166   mirrormakerProd_ops-mmrs1-2-asg.ops.sfdc.net-1377193322178-7262ed87-0
mirrormakerProd  internal_metrics  3    1946906932  1946915928  8996   mirrormakerProd_ops-mmrs1-2-asg.ops.sfdc.net-1377193322178-7262ed87-1
mirrormakerProd  jmx               0    485270038   485280882   10844  mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1377192412490-38a53dc9-0
mirrormakerProd  jmx               1    486363914   486374759   10845  mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1377192412490-38a53dc9-1
mirrormakerProd  jmx               2    491783842   491784826   984    mirrormakerProd_ops-mmrs1-2-asg.ops.sfdc.net-1377193322178-7262ed87-0
mirrormakerProd  jmx               3    485675629   485676643   1014   mirrormakerProd_ops-mmrs1-2-asg.ops.sfdc.net-1377193322178-7262ed87-1

In the mirror maker logs, I see that topic metadata is fetched every 10
minutes and the producer connections are re-established. Is this normal? If
it is continuously producing, why does it need to reconnect to the
destination brokers?
What else can we tune to bring the lag below 100? This is just a small set of
data we are currently testing; the real production traffic will be much
larger. How can we compute the optimum configuration as traffic increases?

Thanks for help,

Thanks,
Raja.


On Thu, Aug 22, 2013 at 4:44 PM, Rajasekar Elango wrote:

> I am trying to tune mirrormaker configurations based on this doc 
> <https://cwiki.apache.org/confluence/display/KAFKA/Kafka+mirroring+(MirrorMaker)#Kafkamirroring%28MirrorMaker%29-Consumerandsourceclustersocketbuffersizes>
> and would like to know your recommendations.
>
> Our configuration: We are doing inter datacenter replication with 5
> brokers in source

Tuning mirror maker performance

2013-08-22 Thread Rajasekar Elango
I am trying to tune mirrormaker configurations based on this doc
<https://cwiki.apache.org/confluence/display/KAFKA/Kafka+mirroring+(MirrorMaker)#Kafkamirroring%28MirrorMaker%29-Consumerandsourceclustersocketbuffersizes>
and would like to know your recommendations.

Our configuration: We are doing inter-datacenter replication with 5 brokers
in the source and destination DCs and 2 mirror makers doing the replication.
We have about 4 topics with 4 partitions each.
I have been using ConsumerOffsetChecker to analyze lag while tuning.
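(The check itself looks roughly like this -- the zookeeper connect string is
a placeholder for ours:)

  bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker \
    --zkconnect <zk-host>:2181 --group mirrormakerProd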


   1. num.streams: We have set num.streams=2 so that the 4 partitions will
   be shared between the 2 mirror makers. Increasing num.streams beyond this
   did not improve performance; is this correct?
   2. num.producers: We initially set num.producers=4 (assuming one producer
   thread per topic), then we bumped it to num.producers=16, but did not see
   any improvement in performance. Is this correct? How do we determine the
   optimum value for num.producers?
   3. socket buffer sizes: We initially had default values for these; then I
   changed socket.send.buffer.bytes on the source brokers,
   socket.receive.buffer.bytes and fetch.message.max.bytes in the mirror
   maker consumer properties, and socket.receive.buffer.bytes and
   socket.request.max.bytes on the destination brokers, all to 1024*1024*1024
   (1073741824). This did improve performance, but I could not get the lag
   below 100 (the exact properties are sketched after my questions below).

   Here is how our lag looks after the above changes:

Group            Topic             Pid  Offset      logSize     Lag    Owner
mirrormakerProd  FunnelProto       0    554704539   554717088   12549  mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1377192412490-38a53dc9-0
mirrormakerProd  FunnelProto       1    547370573   547383136   12563  mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1377192412490-38a53dc9-1
mirrormakerProd  FunnelProto       2    553124930   553125742   812    mirrormakerProd_ops-mmrs1-2-asg.ops.sfdc.net-1377193322178-7262ed87-0
mirrormakerProd  FunnelProto       3    552990834   552991650   816    mirrormakerProd_ops-mmrs1-2-asg.ops.sfdc.net-1377193322178-7262ed87-1
mirrormakerProd  agent             0    35438       35440       2      mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1377192412490-38a53dc9-0
mirrormakerProd  agent             1    35447       35448       1      mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1377192412490-38a53dc9-1
mirrormakerProd  agent             2    35375       35375       0      mirrormakerProd_ops-mmrs1-2-asg.ops.sfdc.net-1377193322178-7262ed87-0
mirrormakerProd  agent             3    35336       35336       0      mirrormakerProd_ops-mmrs1-2-asg.ops.sfdc.net-1377193322178-7262ed87-1
mirrormakerProd  internal_metrics  0    1930852823  1930917418  64595  mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1377192412490-38a53dc9-0
mirrormakerProd  internal_metrics  1    1937237324  1937301841  64517  mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1377192412490-38a53dc9-1
mirrormakerProd  internal_metrics  2    1945894901  1945904067  9166   mirrormakerProd_ops-mmrs1-2-asg.ops.sfdc.net-1377193322178-7262ed87-0
mirrormakerProd  internal_metrics  3    1946906932  1946915928  8996   mirrormakerProd_ops-mmrs1-2-asg.ops.sfdc.net-1377193322178-7262ed87-1
mirrormakerProd  jmx               0    485270038   485280882   10844  mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1377192412490-38a53dc9-0
mirrormakerProd  jmx               1    486363914   486374759   10845  mirrormakerProd_ops-mmrs1-1-asg.ops.sfdc.net-1377192412490-38a53dc9-1
mirrormakerProd  jmx               2    491783842   491784826   984    mirrormakerProd_ops-mmrs1-2-asg.ops.sfdc.net-1377193322178-7262ed87-0
mirrormakerProd  jmx               3    485675629   485676643   1014   mirrormakerProd_ops-mmrs1-2-asg.ops.sfdc.net-1377193322178-7262ed87-1

In the mirror maker logs, I see that topic metadata is fetched every 10
minutes and the producer connections are re-established. Is this normal? If
it is continuously producing, why does it need to reconnect to the
destination brokers?
What else can we tune to bring the lag below 100? This is just a small set of
data we are currently testing; the real production traffic will be much
larger. How can we compute the optimum configuration as traffic increases?
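For completeness, here is a sketch of exactly which properties we set to
1073741824, per the description in point 3 above (whether 1 GB buffers are
sensible is part of what I am asking):

  # source broker server.properties
  socket.send.buffer.bytes=1073741824
  # mirror maker consumer.properties
  socket.receive.buffer.bytes=1073741824
  fetch.message.max.bytes=1073741824
  # destination broker server.properties
  socket.receive.buffer.bytes=1073741824
  socket.request.max.bytes=1073741824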

Thanks for help,

Thanks,
Raja.