Replica manager exception in broker

2015-05-22 Thread tao xiao
Hi team,

One of the brokers keeps getting the exception below.

[2015-05-21 23:56:52,687] ERROR [Replica Manager on Broker 15]: Error when
processing fetch request for partition [test1,0] offset 206845418 from
consumer with correlation id 93748260. Possible cause: Request for offset
206845418 but we only have log segments in the range 207804287 to
207804287. (kafka.server.ReplicaManager)
This is the follower broker for topic test1, and the ISR of that topic has only 1
broker left right now. I just wanted to know what causes this issue and how I
can prevent it.

-- 
Regards,
Tao


Re: Mirrormaker stops consuming

2015-05-22 Thread tao xiao
It is possible that the rate at which messages are produced (to the target
cluster) is slower than the rate at which they are consumed (from the source),
which leaves insufficient space in the internal data channel mirror maker uses
to buffer data from consumer to producer. You can check the histogram
MirrorMaker-DataChannel-Size to see whether any space is left.
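
A minimal sketch of checking that histogram over JMX, assuming the mirror maker
JVM was started with remote JMX enabled (the port 9999 and the attribute name
"Mean" are assumptions; the exact MBean ObjectName varies, so the sketch just
matches on "DataChannel"):

import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class DataChannelSizeCheck {
    public static void main(String[] args) throws Exception {
        // Assumed JMX endpoint of the mirror maker process.
        String url = "service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi";
        JMXConnector connector = JMXConnectorFactory.connect(new JMXServiceURL(url));
        try {
            MBeanServerConnection conn = connector.getMBeanServerConnection();
            // Match any MBean whose name mentions the data channel rather than
            // hard-coding the full ObjectName, which differs across versions.
            for (ObjectName name : conn.queryNames(null, null)) {
                if (name.toString().contains("DataChannel")) {
                    try {
                        System.out.println(name + " Mean=" + conn.getAttribute(name, "Mean"));
                    } catch (Exception e) {
                        // Not a histogram/timer MBean; skip it.
                    }
                }
            }
        } finally {
            connector.close();
        }
    }
}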

On Fri, May 22, 2015 at 11:35 PM, Rajasekar Elango rela...@salesforce.com
wrote:

 We recently upgraded to kafka 0.8.2.1 and found an issue where mirrormaker
 randomly stops consuming. We had to restart the mirrormaker process to
 resolve the problem. This problem has occurred several times in the past two
 weeks.

 Here is what I found in analysis:

 When this problem happens:

 Mirrormaker log stopped rolling (i.e., nothing in the logs). The last couple of
 messages in the mirrormaker log are ProducerSendThread producing to the
 destination. No errors or exceptions.

 Mirrormaker consumer offset doesn't increase. ConsumerOffsetChecker shows
 mirrormaker consumer offset stops incrementing.

 Mirrormaker consumer MinFetch rate jmx metric drops to zero.
 ConsumerTopicMetric.BytesPerSec drops to zero.

 So it seems the mirrormaker consumer has stopped accepting new data.

 Can someone provide input on how to troubleshoot this problem further and
 identify the root cause?

 Got a thread dump before restarting; it looks OK to me, no blocked threads.
 Here is the thread dump output:

 2015-05-21 18:59:09
 Full thread dump Java HotSpot(TM) 64-Bit Server VM (24.76-b04 mixed mode):

 Attach Listener daemon prio=10 tid=0x7f7248002000 nid=0x2d53 waiting
 on condition [0x]
java.lang.Thread.State: RUNNABLE

Locked ownable synchronizers:
 - None


 ConsumerFetcherThread-mirrormakerProd_ops-mmrs1-2-tyo.ops.sfdc.net-1431458688650-fb15f395-0-2
 prio=10 tid=0x7f71e407e000 nid=0x3425 waiting on condition
 [0x7f72833f2000]
java.lang.Thread.State: WAITING (parking)
 at sun.misc.Unsafe.park(Native Method)
 - parking to wait for  0x00042cd15cc8 (a
 java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
 at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
 at

 java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
 at
 java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:349)
 at
 kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
 at

 kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
 at

 kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:129)
 at

 kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:110)
 at
 scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
 at
 scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
 at
 scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
 at

 kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:110)
 at

 kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:110)
 at

 kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:110)
 at kafka.utils.Utils$.inLock(Utils.scala:535)
 at

 kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:109)
 at
 kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:87)
 at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)

Locked ownable synchronizers:
 - 0x00042ea62eb0 (a
 java.util.concurrent.locks.ReentrantLock$NonfairSync)


 ConsumerFetcherThread-mirrormakerProd_ops-mmrs1-2-tyo.ops.sfdc.net-1431458688650-fb15f395-0-3
 prio=10 tid=0x7f71e407b000 nid=0x3424 waiting on condition
 [0x7f7281f99000]
java.lang.Thread.State: WAITING (parking)
 at sun.misc.Unsafe.park(Native Method)
 - parking to wait for  0x00042ccece80 (a
 java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
 at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
 at

 java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
 at
 java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:349)
 at
 kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
 at

 kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
 at

 kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:129)
 at

 kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:110)
 at
 

Re: Replica manager exception in broker

2015-05-22 Thread tao xiao
Hi Joel,

The error offset 206845418 didn't change. The only thing that changed is
the correlation id, and it kept incrementing.

The broker is the follower, and I saw similar error messages for other
topics the broker is a follower for. As indicated by the log, this is a
request coming from a consumer, not a follower. One thing I don't quite
understand is that consumer requests for the topic (test1) should go to the
leader, not the follower, so why were there consumer requests hitting this
broker? The other issue I noticed is that the replica fetcher threads on
the follower didn't fetch any data at all from the leader; the log file size
on the follower didn't grow for several hours.

On Sat, May 23, 2015 at 12:40 AM, Joel Koshy jjkosh...@gmail.com wrote:

 When you say "keeps getting below exception" I'm assuming that the
 error offset (206845418) keeps changing - right? We saw a similar
 issue in the past and it turned out to be due to a NIC issue - i.e.,
 it negotiated at a low speed, so the replica fetcher couldn't keep up
 with the leader: by the time it caught up within the first segment, the
 leader's log would roll (i.e., the segment would get deleted) and we
 would see the out-of-range error. Is this broker a follower for other
 partitions? Do those partitions show up in these error messages?

 On Fri, May 22, 2015 at 03:11:09PM +0800, tao xiao wrote:
  Hi team,
 
  One of the brokers keeps getting below exception.
 
  [2015-05-21 23:56:52,687] ERROR [Replica Manager on Broker 15]: Error
 when
  processing fetch request for partition [test1,0] offset 206845418 from
  consumer with correlation id 93748260. Possible cause: Request for offset
  206845418 but we only have log segments in the range 207804287 to
  207804287. (kafka.server.ReplicaManager)
  This is the follower broker of topic test1 and ISR of that topic has
 only 1
  broker left right now. Just wanted to know what cause this issue and how
 I
  can prevent it?
 
  --
  Regards,
  Tao




-- 
Regards,
Tao


Re: unclean.leader.election.enable question

2015-05-20 Thread tao xiao
Thank you Mayuresh for the quick reply. If my producer has acks=all set,
would the producer get a callback indicating unsuccessful delivery of the
missing 2000 messages, assuming the new Java producer is used?
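
A minimal sketch, assuming the new Java producer shipped with 0.8.2: with
acks=all, a failed delivery surfaces through the send callback (or the returned
Future). The broker address and topic are placeholders.

import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class AcksAllCallbackSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");   // placeholder
        props.put("acks", "all");                          // wait for all in-sync replicas
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        producer.send(new ProducerRecord<String, String>("test1", "some message"), new Callback() {
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null) {
                    // The broker did not acknowledge the write; treat the message as undelivered.
                    System.err.println("Delivery failed: " + exception);
                } else {
                    System.out.println("Acked at offset " + metadata.offset());
                }
            }
        });
        producer.close();
    }
}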

On Wednesday, May 20, 2015, gharatmayures...@gmail.com wrote:

 This is not unclean leader election since the follower is still in the ISR.
 Yes, we will lose those 2000 messages.

 Mayuresh

 Sent from my iPhone

  On May 20, 2015, at 8:31 AM, tao xiao xiaotao...@gmail.com
 javascript:; wrote:
 
  Hi team,
 
  I know that if a broker is behind the leader by no more than
  replica.lag.max.messages
  the broker is considered in sync with the leader. Considering a situation
  where I have unclean.leader.election.enable=true set in brokers and the
  follower is now 2000 messages behind (the default
 replica.lag.max.messages
  is 4000), will the follower be elected as the leader if the current
 leader
  is down? if yes do we lose that 2000 messages?
 
  --
  Regards,
  Tao



-- 
Regards,
Tao


unclean.leader.election.enable question

2015-05-20 Thread tao xiao
Hi team,

I know that if a broker is behind the leader by no more than
replica.lag.max.messages, the broker is considered in sync with the leader.
Considering a situation where I have unclean.leader.election.enable=true set
on the brokers and the follower is now 2000 messages behind (the default
replica.lag.max.messages is 4000), will the follower be elected as the
leader if the current leader is down? If yes, do we lose those 2000
messages?

-- 
Regards,
Tao


Re: Experiences testing new producer performance across multiple threads/producer counts

2015-05-18 Thread tao xiao
Garry,

Do you mind sharing the source code you used for the profiling?

On Sun, May 17, 2015 at 4:59 PM, Garry Turkington 
g.turking...@improvedigital.com wrote:

 Hi Guozhang/Jay/Becket,

 Thanks for the responses.

 Regarding my point on performance dropping when the number of partitions
 was increased, that surprised me too as on another cluster I had done just
 this to help with the issue of lots of ISR churn and it had been a straight
 win.

 I mentioned in my last mail that I had simplified the code that generates
 test messages, which greatly reduced the CPU load per thread. After doing
 this the performance on the higher partition-count topic was consistent
 with the lower partition-count one and showed no degradation. So the sender
 threads were becoming CPU bound, I'm assuming possibly due to the additional
 locks involved with more partitions, but that needs validation.

 I've been running my clients with acks=1 and linger.ms floating between 0
 and 1, because I want to convince myself it makes a difference, but so far
 I've not really seen it. Similar to Jay's experience, I settled on 64K for
 batch.size because I just didn't see any benefit beyond that, and even the
 jump from 32K wasn't provably beneficial. For this particular application
 I've already hit the needed performance (around 700K/sec at peak), but my
 workload can be quite a sawtooth, moving from peak to idle and back again.
 So peak becomes the new norm, and understanding the head-room in the current
 setup and how to grow beyond that is important.

 I've had a few more test boxes put on the same 10Gb network as the cluster
 in question so I'll re-visit this and do deeper profiling over the next
 week and will revert here with findings.

 Regards
 Garry

 -Original Message-
 From: Guozhang Wang [mailto:wangg...@gmail.com]
 Sent: 14 May 2015 18:57
 To: users@kafka.apache.org
 Subject: Re: Experiences testing new producer performance across multiple
 threads/producer counts

 Regarding the issue that adding more partitions kills the performance: I
 would suspect it may be due to insufficient batching. Note that in the new
 producer batching is done per partition, and if linger.ms is set low,
 partition data may not be batched enough before it gets sent to the
 brokers. Also, since the new producer drains all partitions that belong to
 the same broker once one of them hits either the linger time or the batch
 size, having only one or a few brokers will further exaggerate the
 insufficient-batching issue. So monitoring the average batch size would be
 a good idea.
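
A minimal sketch along those lines, assuming the new Java producer (0.8.2 API):
raise linger.ms and batch.size, then read the producer's own batch-size-avg
metric to confirm batching is effective. Broker address, topic, and message
count are placeholders.

import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;

public class BatchSizeCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");   // placeholder
        props.put("acks", "1");
        props.put("linger.ms", "5");                      // give batches a few ms to fill
        props.put("batch.size", "65536");                 // 64K per-partition batch limit
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(props);
        byte[] payload = new byte[100];
        for (int i = 0; i < 100000; i++) {
            producer.send(new ProducerRecord<byte[], byte[]>("test1", payload));
        }

        // Print the average batch size the producer actually achieved.
        for (Map.Entry<MetricName, ? extends Metric> entry : producer.metrics().entrySet()) {
            if (entry.getKey().name().equals("batch-size-avg")) {
                System.out.println("batch-size-avg = " + entry.getValue().value());
            }
        }
        producer.close();
    }
}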

 Guozhang

 On Wed, May 13, 2015 at 7:47 PM, Jay Kreps jay.kr...@gmail.com wrote:

  Hey Garry,
 
  Super interesting. We honestly never did a ton of performance tuning
  on the producer. I checked the profiles early on in development and we
  fixed a few issues that popped up in deployment, but I don't think
  anyone has done a really scientific look. If you (or anyone else) want
  to dive into things I suspect it could be improved.
 
  Becket is exactly right. There are two possible bottlenecks you can
  hit in the producer--the single background sender thread and the
  per-partition lock. You can check utilization on the background thread
  with jvisualvm (it's named something like
  kafka-producer-network-thread). The locking is fairly hard to improve.
 
  It's a little surprising that adding partitions caused a large
  decrease in performance. Generally this is only the case if you
  override the flush settings on the broker to force fsync more frequently.
 
  The ISR issues under heavy load are probably fixable, the issue is
  discussed a bit here:
 
  http://blog.confluent.io/2015/04/07/hands-free-kafka-replication-a-les
  son-in-operational-simplicity/
 
  The producer settings that may matter for performance are:
  - acks
  - batch.size (though beyond 32k I didn't see much improvement)
  - linger.ms (setting = 1 may help a bit)
  - send.buffer.bytes (maybe, but probably not)
 
  Cheers,
 
  -Jay
 
  On Wed, May 13, 2015 at 3:42 PM, Jiangjie Qin
  j...@linkedin.com.invalid
  wrote:
 
   Thanks for sharing this, Garry. I actually did similar tests before
   but unfortunately lost the test data because my laptop rebooted and
    I forgot to save the data.
  
   Anyway, several things to verify:
  
    1. Remember KafkaProducer holds a lock per partition. So if you have
    only one partition in the target topic and many application threads,
    lock contention could be an issue.
  
    2. It matters how frequently the sender thread wakes up and runs.
    You can take a look at the following sensors to further verify
    whether the sender thread really becomes a bottleneck or not:
   Select-rate
   Io-wait-time-ns-avg
   Io-time-ns-avg
  
   3. Batch size matters, so take a look at the sensor batch-size-avg
   and
  see
   if the average batch size makes sense or not.
  
   Looking forward to your further profiling. My thinking is that
 

Re: Getting NotLeaderForPartitionException in kafka broker

2015-05-15 Thread tao xiao
I will try to reproduce this problem later this week.

Bouncing the broker fixed the issue, but it surfaced again after a period of
time. A little more context: the cluster was deployed on VMs, and I discovered
that the issue appeared whenever CPU wait time was extremely high, e.g. 90% of
CPU time spent on I/O wait. I am more interested in understanding under what
circumstances this issue happens so that I can take appropriate action.

On Fri, May 15, 2015 at 8:04 AM, Jiangjie Qin j...@linkedin.com.invalid
wrote:


 If you can reproduce this problem reliably, then once you see the issue, can
 you grep the controller log for the topic partition in question and see if
 there is anything interesting?

 Thanks.

 Jiangjie (Becket) Qin

 On 5/14/15, 3:43 AM, tao xiao xiaotao...@gmail.com wrote:

 Yes, it does exist in ZK and the node that had the
 NotLeaderForPartitionException
 is the leader of the topic
 
 On Thu, May 14, 2015 at 6:12 AM, Jiangjie Qin j...@linkedin.com.invalid
 wrote:
 
  Does this topic exist in Zookeeper?
 
  On 5/12/15, 11:35 PM, tao xiao xiaotao...@gmail.com wrote:
 
  Hi,
  
  Any updates on this issue? I keep seeing this issue happening over and
  over
  again
  
  On Thu, May 7, 2015 at 7:28 PM, tao xiao xiaotao...@gmail.com wrote:
  
   Hi team,
  
   I have a 12 nodes cluster that has 800 topics and each of which has
  only 1
   partition. I observed that one of the node keeps generating
   NotLeaderForPartitionException that causes the node to be
 unresponsive
  to
   all requests. Below is the exception
  
   [2015-05-07 04:16:01,014] ERROR [ReplicaFetcherThread-1-12], Error
 for
   partition [topic1,0] to broker 12:class
   kafka.common.NotLeaderForPartitionException
   (kafka.server.ReplicaFetcherThread)
  
   All other nodes in the cluster generate lots of replication error
 too as
   shown below due to unresponsiveness of above node.
  
   [2015-05-07 04:17:34,917] WARN [Replica Manager on Broker 1]: Fetch
   request with correlation id 3630911 from client
  ReplicaFetcherThread-0-1 on
   partition [topic1,0] failed due to Leader not local for partition
   [cg22_user.item_attr_info.lcr,0] on broker 1
  (kafka.server.ReplicaManager)
  
   Any suggestion why the node runs into the unstable stage and any
   configuration I can set to prevent this?
  
   I use kafka 0.8.2.1
  
   And here is the server.properties
  
  
   broker.id=5
   port=9092
   num.network.threads=3
   num.io.threads=8
   socket.send.buffer.bytes=1048576
   socket.receive.buffer.bytes=1048576
   socket.request.max.bytes=104857600
   log.dirs=/mnt/kafka
   num.partitions=1
   num.recovery.threads.per.data.dir=1
   log.retention.hours=1
   log.segment.bytes=1073741824
   log.retention.check.interval.ms=30
   log.cleaner.enable=false
   zookeeper.connect=ip:2181
   zookeeper.connection.timeout.ms=6000
   unclean.leader.election.enable=false
   delete.topic.enable=true
   default.replication.factor=3
   num.replica.fetchers=3
   delete.topic.enable=true
   kafka.metrics.reporters=report.KafkaMetricsCollector
   straas.hubble.conf.file=/etc/kafka/report.conf
  
  
  
  
   --
   Regards,
   Tao
  
  
  
  
  --
  Regards,
  Tao
 
 
 
 
 --
 Regards,
 Tao




-- 
Regards,
Tao


Re: Getting NotLeaderForPartitionException in kafka broker

2015-05-14 Thread tao xiao
Yes, it does exist in ZK, and the node that had the
NotLeaderForPartitionException is the leader of the topic.

On Thu, May 14, 2015 at 6:12 AM, Jiangjie Qin j...@linkedin.com.invalid
wrote:

 Does this topic exist in Zookeeper?

 On 5/12/15, 11:35 PM, tao xiao xiaotao...@gmail.com wrote:

 Hi,
 
 Any updates on this issue? I keep seeing this issue happening over and
 over
 again
 
 On Thu, May 7, 2015 at 7:28 PM, tao xiao xiaotao...@gmail.com wrote:
 
  Hi team,
 
  I have a 12 nodes cluster that has 800 topics and each of which has
 only 1
  partition. I observed that one of the node keeps generating
  NotLeaderForPartitionException that causes the node to be unresponsive
 to
  all requests. Below is the exception
 
  [2015-05-07 04:16:01,014] ERROR [ReplicaFetcherThread-1-12], Error for
  partition [topic1,0] to broker 12:class
  kafka.common.NotLeaderForPartitionException
  (kafka.server.ReplicaFetcherThread)
 
  All other nodes in the cluster generate lots of replication error too as
  shown below due to unresponsiveness of above node.
 
  [2015-05-07 04:17:34,917] WARN [Replica Manager on Broker 1]: Fetch
  request with correlation id 3630911 from client
 ReplicaFetcherThread-0-1 on
  partition [topic1,0] failed due to Leader not local for partition
  [cg22_user.item_attr_info.lcr,0] on broker 1
 (kafka.server.ReplicaManager)
 
  Any suggestion why the node runs into the unstable stage and any
  configuration I can set to prevent this?
 
  I use kafka 0.8.2.1
 
  And here is the server.properties
 
 
  broker.id=5
  port=9092
  num.network.threads=3
  num.io.threads=8
  socket.send.buffer.bytes=1048576
  socket.receive.buffer.bytes=1048576
  socket.request.max.bytes=104857600
  log.dirs=/mnt/kafka
  num.partitions=1
  num.recovery.threads.per.data.dir=1
  log.retention.hours=1
  log.segment.bytes=1073741824
  log.retention.check.interval.ms=30
  log.cleaner.enable=false
  zookeeper.connect=ip:2181
  zookeeper.connection.timeout.ms=6000
  unclean.leader.election.enable=false
  delete.topic.enable=true
  default.replication.factor=3
  num.replica.fetchers=3
  delete.topic.enable=true
  kafka.metrics.reporters=report.KafkaMetricsCollector
  straas.hubble.conf.file=/etc/kafka/report.conf
 
 
 
 
  --
  Regards,
  Tao
 
 
 
 
 --
 Regards,
 Tao




-- 
Regards,
Tao


Re: Getting NotLeaderForPartitionException in kafka broker

2015-05-13 Thread tao xiao
Hi,

Any updates on this issue? I keep seeing this issue happening over and over
again

On Thu, May 7, 2015 at 7:28 PM, tao xiao xiaotao...@gmail.com wrote:

 Hi team,

 I have a 12 nodes cluster that has 800 topics and each of which has only 1
 partition. I observed that one of the node keeps generating
 NotLeaderForPartitionException that causes the node to be unresponsive to
 all requests. Below is the exception

 [2015-05-07 04:16:01,014] ERROR [ReplicaFetcherThread-1-12], Error for
 partition [topic1,0] to broker 12:class
 kafka.common.NotLeaderForPartitionException
 (kafka.server.ReplicaFetcherThread)

 All other nodes in the cluster generate lots of replication error too as
 shown below due to unresponsiveness of above node.

 [2015-05-07 04:17:34,917] WARN [Replica Manager on Broker 1]: Fetch
 request with correlation id 3630911 from client ReplicaFetcherThread-0-1 on
 partition [topic1,0] failed due to Leader not local for partition
 [cg22_user.item_attr_info.lcr,0] on broker 1 (kafka.server.ReplicaManager)

 Any suggestion why the node runs into the unstable stage and any
 configuration I can set to prevent this?

 I use kafka 0.8.2.1

 And here is the server.properties


 broker.id=5
 port=9092
 num.network.threads=3
 num.io.threads=8
 socket.send.buffer.bytes=1048576
 socket.receive.buffer.bytes=1048576
 socket.request.max.bytes=104857600
 log.dirs=/mnt/kafka
 num.partitions=1
 num.recovery.threads.per.data.dir=1
 log.retention.hours=1
 log.segment.bytes=1073741824
 log.retention.check.interval.ms=30
 log.cleaner.enable=false
 zookeeper.connect=ip:2181
 zookeeper.connection.timeout.ms=6000
 unclean.leader.election.enable=false
 delete.topic.enable=true
 default.replication.factor=3
 num.replica.fetchers=3
 delete.topic.enable=true
 kafka.metrics.reporters=report.KafkaMetricsCollector
 straas.hubble.conf.file=/etc/kafka/report.conf




 --
 Regards,
 Tao




-- 
Regards,
Tao


Getting NotLeaderForPartitionException in kafka broker

2015-05-07 Thread tao xiao
Hi team,

I have a 12-node cluster with 800 topics, each of which has only 1
partition. I observed that one of the nodes keeps generating
NotLeaderForPartitionException, which causes the node to be unresponsive to
all requests. Below is the exception:

[2015-05-07 04:16:01,014] ERROR [ReplicaFetcherThread-1-12], Error for
partition [topic1,0] to broker 12:class
kafka.common.NotLeaderForPartitionException
(kafka.server.ReplicaFetcherThread)

All other nodes in the cluster generate lots of replication errors too, as
shown below, due to the unresponsiveness of the above node.

[2015-05-07 04:17:34,917] WARN [Replica Manager on Broker 1]: Fetch request
with correlation id 3630911 from client ReplicaFetcherThread-0-1 on
partition [topic1,0] failed due to Leader not local for partition
[cg22_user.item_attr_info.lcr,0] on broker 1 (kafka.server.ReplicaManager)

Any suggestions on why the node runs into this unstable state, and any
configuration I can set to prevent it?

I use kafka 0.8.2.1

And here is the server.properties


broker.id=5
port=9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
log.dirs=/mnt/kafka
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=1
log.segment.bytes=1073741824
log.retention.check.interval.ms=30
log.cleaner.enable=false
zookeeper.connect=ip:2181
zookeeper.connection.timeout.ms=6000
unclean.leader.election.enable=false
delete.topic.enable=true
default.replication.factor=3
num.replica.fetchers=3
delete.topic.enable=true
kafka.metrics.reporters=report.KafkaMetricsCollector
straas.hubble.conf.file=/etc/kafka/report.conf




-- 
Regards,
Tao


Re: MultiThreaded HLConsumer Exits before events are all consumed

2015-04-29 Thread tao xiao
The log suggests that the shutdown method was still called:

Thread 0: 2015-04-29
12:55:54.292|3|13|Normal|-74.1892627|41.33900999753

Last Shutdown via example.shutDown called!

15/04/29 13:09:38 INFO consumer.ZookeeperConsumerConnector:,
ZKConsumerConnector shutting down

Please ensure consumer.shutdown() and executor.shutdown() are not called
during the course of your program.

On Thu, Apr 30, 2015 at 2:23 AM, christopher palm cpa...@gmail.com wrote:

 Commenting out the example shutdown did not seem to make a difference; I added
 the print statement below to highlight the fact.

 The other threads still shut down, and only one thread lives on; eventually
 that one dies after a few minutes as well.

 Could it be that the producer's default partitioner isn't balancing data
 across all partitions?

 Thanks,
 Chris

 Thread 0: 2015-04-29
 12:55:54.292|3|13|Normal|-74.1892627|41.33900999753

 Last Shutdown via example.shutDown called!

 15/04/29 13:09:38 INFO consumer.ZookeeperConsumerConnector:,
 ZKConsumerConnector shutting down

 15/04/29 13:09:38 INFO utils.KafkaScheduler: Forcing shutdown of Kafka
 scheduler

 15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager:
 [ConsumerFetcherManager-1430330968420] Stopping leader finder thread

 15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager$LeaderFinderThread:
 -leader-finder-thread], Shutting down

 15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager$LeaderFinderThread:
 -leader-finder-thread], Stopped

 15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager$LeaderFinderThread:
 -leader-finder-thread], Shutdown completed

 15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager:
 [ConsumerFetcherManager-1430330968420] Stopping all fetchers

 15/04/29 13:09:38 INFO consumer.ConsumerFetcherThread:
 [ConsumerFetcherThread-consumergroup], Shutting down

 15/04/29 13:09:38 INFO consumer.ConsumerFetcherThread:
 [ConsumerFetcherThread-], Stopped

 15/04/29 13:09:38 INFO consumer.ConsumerFetcherThread:
 [ConsumerFetcherThread-], Shutdown completed

 15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager:
 [ConsumerFetcherManager-] All connections stopped

 15/04/29 13:09:38 INFO zkclient.ZkEventThread: Terminate ZkClient event
 thread.

 Shutting down Thread: 2

 Shutting down Thread: 1

 Shutting down Thread: 3

 15/04/29 13:09:38 INFO consumer.ZookeeperConsumerConnector:
 [consumergroup], ZKConsumerConnector shut down completed

 Thread 0: 2015-04-29 12:55:55.302|4|14|Unsafe tail
 distance|-73.99021500035|40.6636611

 15/04/29 13:09:39 INFO consumer.ZookeeperConsumerConnector:
 [consumergroup], stopping watcher executor thread for consumer
 consumergroup

 Thread 0: 2015-04-29
 12:55:56.313|1|11|Normal|-79.74165300042|42.1304580009

 On Wed, Apr 29, 2015 at 10:11 AM, tao xiao xiaotao...@gmail.com wrote:

  example.shutdown(); in ConsumerGroupExample closes all consumer
 connections
  to Kafka. remove this line the consumer threads will run forever
 
  On Wed, Apr 29, 2015 at 9:42 PM, christopher palm cpa...@gmail.com
  wrote:
 
   Hi All,
  
   I am trying to get a multi threaded HL consumer working against a 2
  broker
   Kafka cluster with a 4 partition 2 replica  topic.
  
   The consumer code is set to run with 4 threads, one for each partition.
  
   The producer code uses the default partitioner and loops indefinitely
   feeding events into the topic.(I excluded the while loop in the paste
   below)
  
   What I see is the threads eventually all exit, even thought the
 producer
  is
   still sending events into the topic.
  
   My understanding is that the consumer thread per partition is the
 correct
   setup.
  
   Any ideas why this code doesn't continue to consume events at they are
   pushed to the topic?
  
   I suspect I am configuring something wrong here, but am not sure what.
  
   Thanks,
  
   Chris
  
  
   *T**opic Configuration*
  
   Topic:kafkatopicRep4Part4 PartitionCount:4 ReplicationFactor:2 Configs:
  
   Topic: kafkatopicRep4Part4 Partition: 0 Leader: 1 Replicas: 2,1 Isr:
 1,2
  
   Topic: kafkatopicRep4Part4 Partition: 1 Leader: 1 Replicas: 1,2 Isr:
 1,2
  
   Topic: kafkatopicRep4Part4 Partition: 2 Leader: 1 Replicas: 2,1 Isr:
 1,2
  
Topic: kafkatopicRep4Part4 Partition: 3 Leader: 1 Replicas: 1,2 Isr:
 1,2
  
   *Producer Code:*
  
Properties props = new Properties();
  
   props.put(metadata.broker.list, args[0]);
  
   props.put(zk.connect, args[1]);
  
   props.put(serializer.class,
 kafka.serializer.StringEncoder);
  
   props.put(request.required.acks, 1);
  
   String TOPIC = args[2];
  
   ProducerConfig config = new ProducerConfig(props);
  
   ProducerString, String producer = new ProducerString,
 String(
   config);
  
   finalEvent = new Timestamp(new Date().getTime()) + |
  
   + truckIds[0] + | + driverIds[0] + | +
   events[random
   .nextInt(evtCnt

Re: MultiThreaded HLConsumer Exits before events are all consumed

2015-04-29 Thread tao xiao
example.shutdown() in ConsumerGroupExample closes all consumer connections
to Kafka. Remove this line and the consumer threads will run forever.
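
A minimal sketch of a main() for the ConsumerGroupExample quoted below with the
premature shutdown removed; blocking the main thread is just one way to keep
the consumer threads alive (argument parsing and error handling omitted):

public static void main(String[] args) throws InterruptedException {
    String zookeeper = args[0];
    String groupId = args[1];
    String topic = args[2];
    int threads = Integer.parseInt(args[3]);

    ConsumerGroupExample example = new ConsumerGroupExample(zookeeper, groupId, topic);
    example.run(threads);

    // No example.shutdown() here: block the main thread so the consumer
    // threads keep reading until the process is killed (or add your own
    // signal handling to shut down cleanly).
    Thread.currentThread().join();
}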

On Wed, Apr 29, 2015 at 9:42 PM, christopher palm cpa...@gmail.com wrote:

 Hi All,

 I am trying to get a multi-threaded HL consumer working against a 2-broker
 Kafka cluster with a 4-partition, 2-replica topic.

 The consumer code is set to run with 4 threads, one for each partition.

 The producer code uses the default partitioner and loops indefinitely,
 feeding events into the topic. (I excluded the while loop in the paste
 below.)

 What I see is that the threads eventually all exit, even though the producer is
 still sending events into the topic.

 My understanding is that the consumer thread per partition is the correct
 setup.

 Any ideas why this code doesn't continue to consume events as they are
 pushed to the topic?

 I suspect I am configuring something wrong here, but am not sure what.

 Thanks,

 Chris


 *Topic Configuration*

 Topic:kafkatopicRep4Part4 PartitionCount:4 ReplicationFactor:2 Configs:

 Topic: kafkatopicRep4Part4 Partition: 0 Leader: 1 Replicas: 2,1 Isr: 1,2

 Topic: kafkatopicRep4Part4 Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2

 Topic: kafkatopicRep4Part4 Partition: 2 Leader: 1 Replicas: 2,1 Isr: 1,2

  Topic: kafkatopicRep4Part4 Partition: 3 Leader: 1 Replicas: 1,2 Isr: 1,2

 *Producer Code:*

  Properties props = new Properties();

  props.put("metadata.broker.list", args[0]);
  props.put("zk.connect", args[1]);
  props.put("serializer.class", "kafka.serializer.StringEncoder");
  props.put("request.required.acks", "1");

  String TOPIC = args[2];

  ProducerConfig config = new ProducerConfig(props);
  Producer<String, String> producer = new Producer<String, String>(config);

  finalEvent = new Timestamp(new Date().getTime()) + "|"
          + truckIds[0] + "|" + driverIds[0] + "|"
          + events[random.nextInt(evtCnt)]
          + "|" + getLatLong(arrayroute17[i]);

  try {
      KeyedMessage<String, String> data =
              new KeyedMessage<String, String>(TOPIC, finalEvent);
      LOG.info("Sending Messge #: " + routeName[0] + ": " + i + ", msg:" + finalEvent);
      producer.send(data);
      Thread.sleep(1000);
  } catch (Exception e) {
      e.printStackTrace();
  }


 *Consumer Code:*

 public class ConsumerTest implements Runnable {

     private KafkaStream m_stream;
     private int m_threadNumber;

     public ConsumerTest(KafkaStream a_stream, int a_threadNumber) {
         m_threadNumber = a_threadNumber;
         m_stream = a_stream;
     }

     public void run() {
         ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
         while (it.hasNext()) {
             System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message()));
             try {
                 Thread.sleep(1000);
             } catch (InterruptedException e) {
                 e.printStackTrace();
             }
         }
         System.out.println("Shutting down Thread: " + m_threadNumber);
     }
 }

 public class ConsumerGroupExample {

     private final ConsumerConnector consumer;
     private final String topic;
     private ExecutorService executor;

     public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic) {
         consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
                 createConsumerConfig(a_zookeeper, a_groupId));
         this.topic = a_topic;
     }

     public void shutdown() {
         if (consumer != null) consumer.shutdown();
         if (executor != null) executor.shutdown();
         try {
             if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
                 System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly");
             }
         } catch (InterruptedException e) {
             System.out.println("Interrupted during shutdown, exiting uncleanly");
         }
     }

     public void run(int a_numThreads) {
         Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
         topicCountMap.put(topic, new Integer(a_numThreads));
         Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
                 consumer.createMessageStreams(topicCountMap);
         List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

         executor = Executors.newFixedThreadPool(a_numThreads);

         int threadNumber = 0;
         for (final KafkaStream stream : streams) {
             executor.submit(new ConsumerTest(stream, threadNumber));
             threadNumber++;
         }
     }

     private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
         Properties props = new Properties();
         props.put("zookeeper.connect", a_zookeeper);
         props.put("group.id", a_groupId);

 

Getting java.lang.IllegalMonitorStateException in mirror maker when building fetch request

2015-04-24 Thread tao xiao
Hi team,

I observed a java.lang.IllegalMonitorStateException thrown
from AbstractFetcherThread in mirror maker when it is trying to build the
fetch request. Below is the error:

[2015-04-23 16:16:02,049] ERROR
[ConsumerFetcherThread-group_id_localhost-1429830778627-4519368f-0-7],
Error due to  (kafka.consumer.ConsumerFetcherThread)

java.lang.IllegalMonitorStateException

at
java.util.concurrent.locks.ReentrantLock$Sync.tryRelease(ReentrantLock.java:155)

at
java.util.concurrent.locks.AbstractQueuedSynchronizer.release(AbstractQueuedSynchronizer.java:1260)

at
java.util.concurrent.locks.AbstractQueuedSynchronizer.fullyRelease(AbstractQueuedSynchronizer.java:1723)

at
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2166)

at
kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:95)

at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)

I believe this is due to partitionMapCond.await(fetchBackOffMs,
TimeUnit.MILLISECONDS) being called while the lock is not held.

The code below should fix the issue:

inLock(partitionMapLock) {
  partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
}
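
A self-contained Java illustration of the failure mode (not the Kafka code
itself): awaiting on a Condition without holding its lock throws
IllegalMonitorStateException from tryRelease, exactly as in the stack trace
above, while awaiting inside lock()/unlock() works.

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class ConditionAwaitDemo {
    public static void main(String[] args) throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();

        try {
            // Not holding the lock: this is the bug pattern.
            cond.await(100, TimeUnit.MILLISECONDS);
        } catch (IllegalMonitorStateException e) {
            System.out.println("await without lock: " + e);
        }

        // Correct pattern, equivalent to wrapping the await in inLock(partitionMapLock).
        lock.lock();
        try {
            cond.await(100, TimeUnit.MILLISECONDS);
        } finally {
            lock.unlock();
        }
        System.out.println("await inside lock succeeded");
    }
}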

Should I file a jira ticket and submit the patch?

I use the latest version of mirror maker in trunk.


-- 
Regards,
Tao


Re: Got NPE during partition rebalancing in high level consumer

2015-04-15 Thread tao xiao
Guozhang,

No, I don't think the patch for KAFKA-2056 would fix this problem. The NPE
is thrown at a line that is reached before the fix executes. But I do
notice that the code in trunk did fix the issue by ensuring the size of the
map returned from ctx.consumersForTopic is > 0. So the code in trunk is safe.

On Wed, Apr 15, 2015 at 3:45 PM, Guozhang Wang wangg...@gmail.com wrote:

 Hello Tao,

 Do you think the solution to KAFKA-2056 will resolve this issue? It will be
 included in 0.8.3 release.

 Guozhang

 On Wed, Apr 15, 2015 at 2:21 PM, tao xiao xiaotao...@gmail.com wrote:

  Hi team,
 
  I discovered an issue that when a high level consumer with roundrobin
  assignment strategy consumes a topic that hasn't been created on broker a
  NPE exception is thrown during partition rebalancing phase. I use Kafka
  0.8.2.1
 
  Here is the step to reproduce:
 
  1. create a high level consumer with roundrobin
  2. use connector.createMessageStreamsByFilter to create a message stream
 in
  the consumer to a topic that yet to be created on broker
 
  Below is the exception.
 
  2015-04-15 14:16:46 INFO  kafka.utils.Logging$class:68 -
  [test12345667f_localhost], Committing all offsets after clearing the
  fetcher queues
  2015-04-15 14:16:46 INFO  kafka.utils.Logging$class:68 -
  [test12345667f_localhost], Releasing partition ownership
  2015-04-15 14:16:46 INFO  kafka.utils.Logging$class:76 -
  [test12345667f_localhost], exception during rebalance
  java.lang.NullPointerException
  at scala.collection.mutable.HashTable$$anon$1.next(HashTable.scala:210)
  at scala.collection.mutable.HashTable$$anon$1.next(HashTable.scala:202)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
  at scala.collection.IterableLike$class.head(IterableLike.scala:91)
  at scala.collection.AbstractIterable.head(Iterable.scala:54)
  at kafka.consumer.RoundRobinAssignor.assign(PartitionAssignor.scala:75)
  at kafka.consumer.RoundRobinAssignor.assign(PartitionAssignor.scala:69)
  at
 
 
 kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:660)
  at
 
 
 kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1$$anonfun$apply$mcV$sp$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:608)
  at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
  at
 
 
 kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcV$sp(ZookeeperConsumerConnector.scala:602)
  at
 
 
 kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply(ZookeeperConsumerConnector.scala:599)
  at
 
 
 kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply(ZookeeperConsumerConnector.scala:599)
  at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
  at
 
 
 kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:598)
  at
 
 
 kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:905)
  at
 
 
 kafka.consumer.ZookeeperConsumerConnector$WildcardStreamsHandler.init(ZookeeperConsumerConnector.scala:939)
  at
 
 
 kafka.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:160)
  at
 
 
 kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:101)
  at
 
 
 kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:105)
  at com.ebay.kafka.demo.Consumer.main(Consumer.java:61)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at
 
 
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
  at
 
 
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:606)
  at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)
 
  --
  Regards,
  Tao
 



 --
 -- Guozhang




-- 
Regards,
Tao


Got NPE during partition rebalancing in high level consumer

2015-04-15 Thread tao xiao
Hi team,

I discovered an issue where a high-level consumer with the roundrobin
assignment strategy consuming a topic that hasn't been created on the broker
gets an NPE during the partition rebalancing phase. I use Kafka 0.8.2.1.

Here are the steps to reproduce (a sketch follows the list):

1. Create a high-level consumer with the roundrobin assignment strategy.
2. Use connector.createMessageStreamsByFilter to create a message stream in
the consumer for a topic that has yet to be created on the broker.
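
A minimal repro sketch against the 0.8.2.1 high-level consumer API; the
ZooKeeper address, group id, and topic name are placeholders, and whether the
NPE appears depends on the rebalance path discussed in this thread.

import java.util.List;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.consumer.Whitelist;
import kafka.javaapi.consumer.ConsumerConnector;

public class RoundRobinNpeRepro {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");          // placeholder
        props.put("group.id", "npe-repro");                        // placeholder
        props.put("partition.assignment.strategy", "roundrobin");

        ConsumerConnector connector =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        // Subscribing via a whitelist to a not-yet-created topic triggers the
        // rebalance path described above.
        List<KafkaStream<byte[], byte[]>> streams =
                connector.createMessageStreamsByFilter(new Whitelist("topic-that-does-not-exist"));
        System.out.println("streams: " + streams.size());
    }
}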

Below is the exception.

2015-04-15 14:16:46 INFO  kafka.utils.Logging$class:68 -
[test12345667f_localhost], Committing all offsets after clearing the
fetcher queues
2015-04-15 14:16:46 INFO  kafka.utils.Logging$class:68 -
[test12345667f_localhost], Releasing partition ownership
2015-04-15 14:16:46 INFO  kafka.utils.Logging$class:76 -
[test12345667f_localhost], exception during rebalance
java.lang.NullPointerException
at scala.collection.mutable.HashTable$$anon$1.next(HashTable.scala:210)
at scala.collection.mutable.HashTable$$anon$1.next(HashTable.scala:202)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.IterableLike$class.head(IterableLike.scala:91)
at scala.collection.AbstractIterable.head(Iterable.scala:54)
at kafka.consumer.RoundRobinAssignor.assign(PartitionAssignor.scala:75)
at kafka.consumer.RoundRobinAssignor.assign(PartitionAssignor.scala:69)
at
kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:660)
at
kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1$$anonfun$apply$mcV$sp$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:608)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
at
kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcV$sp(ZookeeperConsumerConnector.scala:602)
at
kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply(ZookeeperConsumerConnector.scala:599)
at
kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply(ZookeeperConsumerConnector.scala:599)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at
kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:598)
at
kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:905)
at
kafka.consumer.ZookeeperConsumerConnector$WildcardStreamsHandler.init(ZookeeperConsumerConnector.scala:939)
at
kafka.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:160)
at
kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:101)
at
kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:105)
at com.ebay.kafka.demo.Consumer.main(Consumer.java:61)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)

-- 
Regards,
Tao


Re: Kafka server relocation

2015-04-13 Thread tao xiao
num.consumer.fetchers means the max number of fetcher threads that can be
spawned. It doesn't necessarily mean you get as many fetcher threads as
you specify.

To me the metrics suggest a very slow consumption rate, only 18.21
bytes/minute. Here is the benchmark LinkedIn published:

http://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines

You should check whether 18.21 bytes/minute is the max throughput you can get
on your machine with bin/kafka-consumer-perf-test.sh; if it is, you
definitely need to tune the setup.

On Mon, Apr 13, 2015 at 12:43 PM, nitin sharma kumarsharma.ni...@gmail.com
wrote:

 hi Xiao,

 i have finally got JMX monitoring enabled for my kafka nodes in test
 envrionment and here is what i observed.
 i was monitoring mbeans under kafka.consumer domain of JVM running Kafka
 Mirror Maker process.

 =
 AllTopicsBytes === 18.21 bytes/minute
 FetchRequestRateAndTimeMs === 9.69 Request/min and  99th Percentile is
 104.13ms.
 ==

 Interesting thing is that I have specified num.consumer.fetchers=200 in
 my consumer property file but i can see only 8 threads of type :


 kafka.consumer:name=KafkaMaker1-ConsumerFetcherThread-KafkaMaker1_zkhost-1428952277321-5e044226-138-1-host_brokerhostname-port_9092-FetchRequestRateAndTimeMs,type=FetchRequestAndResponseMetrics


 Could this be the issue?

 note, my JVM is set to 1GB and only 30MB is utilized most of the time.


 Regards,
 Nitin Kumar Sharma.


 On Wed, Apr 8, 2015 at 10:48 PM, tao xiao xiaotao...@gmail.com wrote:

  Metrics like Bytepersec, FetchRequestRateAndTimeMs can help you to check
 if
  the consumer has problem processing messages
 
  On Thu, Apr 9, 2015 at 2:40 AM, nitin sharma 
 kumarsharma.ni...@gmail.com
  wrote:
 
   thanks, but can you please tell which metrics could highlight the
 factor
   causing slow data migration by MirrorMaker?
  
   Regards,
   Nitin Kumar Sharma.
  
  
   On Tue, Apr 7, 2015 at 10:10 PM, tao xiao xiaotao...@gmail.com
 wrote:
  
You may need to look into the consumer metrics and producer metrics
 to
identify the root cause. metrics in kafka.consumer and kafka.producer
categories will help you find out the problems.
   
This link gives instruction how to read the metrics
http://kafka.apache.org/documentation.html#monitoring
   
   
On Wed, Apr 8, 2015 at 3:39 AM, nitin sharma 
   kumarsharma.ni...@gmail.com
wrote:
   
 hi,

 sorry for late response. ... i have been able to fix the issue ..
   problem
 was in my approach. I got confused between my source and target
  system
 while defining consumer  producer property file .. it is fixed now

 Now new issue.. the rate at which data is migrated is very very
   slow... i
 mean it took 5 min to copy only 15Kb.. :( .. here are the property
  for
 producer and consumer.. there is no network latency between Source
  and
 Destination clusters as such.


  Producer ###
 metadata.broker.list=broker1IP:9092,broker2IP:9092
 serializer.class=kafka.serializer.DefaultEncoder
 auto.create.topics.enable=true
 request.required.acks=1
 request.required.acks=1
 producer.type=async
 batch.num.messages=3000
 queue.buffering.max.ms=5000
 queue.buffering.max.messages=10
 queue.enqueue.timeout.ms=-1
 socket.send.buffer.bytes=5282880

 # Consumer ###

   
  
 
 zookeeper.connect=zk1hostname:2181,zk2hostname:2181,zk3hostname:2181
 group.id=KafkaMaker
 auto.create.topics.enable=true
 socket.receive.buffer.bytes=5243880
 zookeeper.connection.timeout.ms=100
 num.consumer.fetchers=20
 fetch.message.max.bytes=5243880




 Regards,
 Nitin Kumar Sharma.


 On Tue, Mar 31, 2015 at 12:36 PM, tao xiao xiaotao...@gmail.com
   wrote:

  Can you attach your mirror maker log?
 
  On Wed, Apr 1, 2015 at 12:28 AM, nitin sharma 
 kumarsharma.ni...@gmail.com
  
  wrote:
 
   i tried with auto.offset.reset=smallest, but still not
 working..
  
   there is data in my source cluster
  
   Regards,
   Nitin Kumar Sharma.
  
  
   On Mon, Mar 30, 2015 at 10:30 PM, tao xiao 
 xiaotao...@gmail.com
  
 wrote:
  
Do you have data sending to *testtopic? *By default mirror
  maker
only
consumes data being sent after it taps into the topic. you
 need
   to
 keep
sending data to the topic after mirror maker connection is
 established.
   If
you want to change the behavior you can set
 auto.offset.reset=smallest
  so
that any new mirror maker coming to the topic will start from
  the
   smallest
offset
   
On Tue, Mar 31, 2015 at 3:53 AM, nitin sharma 
   kumarsharma.ni...@gmail.com

wrote:
   
 thanks

Re: Kafka server relocation

2015-04-13 Thread tao xiao
How about the consumer lag of the mirror maker?

On Mon, Apr 13, 2015 at 1:33 PM, nitin sharma kumarsharma.ni...@gmail.com
wrote:

 i just tested that too and below is the stats.. it is clear that with
 kafka-consumer-perf-test.sh, i am able to get a high throughput. around
 44.0213 MB/sec.

 Seriously some configuration needs to be tweaked in MirrorMaker
 configuration for speedy processing... Can you think of something ?


 start.time, end.time, fetch.size, data.consumed.in.MB, MB.sec,
 data.consumed.in.nMsg, nMsg.sec
 2015-04-13 20:29:29:070, 2015-04-13 20:29:34:596, 1048576, 23.1552,
 44.0213, 16683, 31716.7300

 Regards,
 Nitin Kumar Sharma.


 On Mon, Apr 13, 2015 at 3:51 PM, tao xiao xiaotao...@gmail.com wrote:

  num.consumer.fetchers means the max number of fetcher threads that can be
  spawned. it doesn't necessarily mean you can get as many fetcher threads
 as
  you specify.
 
  To me the metrics are suggesting a very slow consumption rate only 18.21
  bytes/minute. Here is the benchmark Linkedin does
 
 
 
 http://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines
 
  You should check if 18.21 bytes/minute is the max throughput you can get
 on
  your machine with bin/kafka-consumer-perf-test.sh if this is case you
  definitely need to tune it
 
  On Mon, Apr 13, 2015 at 12:43 PM, nitin sharma 
  kumarsharma.ni...@gmail.com
  wrote:
 
   hi Xiao,
  
   i have finally got JMX monitoring enabled for my kafka nodes in test
   envrionment and here is what i observed.
   i was monitoring mbeans under kafka.consumer domain of JVM running
  Kafka
   Mirror Maker process.
  
   =
   AllTopicsBytes === 18.21 bytes/minute
   FetchRequestRateAndTimeMs === 9.69 Request/min and  99th Percentile is
   104.13ms.
   ==
  
   Interesting thing is that I have specified num.consumer.fetchers=200
 in
   my consumer property file but i can see only 8 threads of type :
  
  
  
 
 kafka.consumer:name=KafkaMaker1-ConsumerFetcherThread-KafkaMaker1_zkhost-1428952277321-5e044226-138-1-host_brokerhostname-port_9092-FetchRequestRateAndTimeMs,type=FetchRequestAndResponseMetrics
  
  
   Could this be the issue?
  
   note, my JVM is set to 1GB and only 30MB is utilized most of the time.
  
  
   Regards,
   Nitin Kumar Sharma.
  
  
   On Wed, Apr 8, 2015 at 10:48 PM, tao xiao xiaotao...@gmail.com
 wrote:
  
Metrics like Bytepersec, FetchRequestRateAndTimeMs can help you to
  check
   if
the consumer has problem processing messages
   
On Thu, Apr 9, 2015 at 2:40 AM, nitin sharma 
   kumarsharma.ni...@gmail.com
wrote:
   
 thanks, but can you please tell which metrics could highlight the
   factor
 causing slow data migration by MirrorMaker?

 Regards,
 Nitin Kumar Sharma.


 On Tue, Apr 7, 2015 at 10:10 PM, tao xiao xiaotao...@gmail.com
   wrote:

  You may need to look into the consumer metrics and producer
 metrics
   to
  identify the root cause. metrics in kafka.consumer and
  kafka.producer
  categories will help you find out the problems.
 
  This link gives instruction how to read the metrics
  http://kafka.apache.org/documentation.html#monitoring
 
 
  On Wed, Apr 8, 2015 at 3:39 AM, nitin sharma 
 kumarsharma.ni...@gmail.com
  wrote:
 
   hi,
  
   sorry for late response. ... i have been able to fix the issue
 ..
 problem
   was in my approach. I got confused between my source and target
system
   while defining consumer  producer property file .. it is fixed
  now
  
   Now new issue.. the rate at which data is migrated is very very
 slow... i
   mean it took 5 min to copy only 15Kb.. :( .. here are the
  property
for
   producer and consumer.. there is no network latency between
  Source
and
   Destination clusters as such.
  
  
    Producer ###
   metadata.broker.list=broker1IP:9092,broker2IP:9092
   serializer.class=kafka.serializer.DefaultEncoder
   auto.create.topics.enable=true
   request.required.acks=1
   request.required.acks=1
   producer.type=async
   batch.num.messages=3000
   queue.buffering.max.ms=5000
   queue.buffering.max.messages=10
   queue.enqueue.timeout.ms=-1
   socket.send.buffer.bytes=5282880
  
   # Consumer ###
  
 

   
  
 
 zookeeper.connect=zk1hostname:2181,zk2hostname:2181,zk3hostname:2181
   group.id=KafkaMaker
   auto.create.topics.enable=true
   socket.receive.buffer.bytes=5243880
   zookeeper.connection.timeout.ms=100
   num.consumer.fetchers=20
   fetch.message.max.bytes=5243880
  
  
  
  
   Regards,
   Nitin Kumar Sharma.
  
  
   On Tue, Mar 31, 2015 at 12:36 PM, tao xiao 
 xiaotao...@gmail.com
  
 wrote:
  
Can you attach your mirror maker

Re: Kafka server relocation

2015-04-08 Thread tao xiao
Metrics like Bytepersec, FetchRequestRateAndTimeMs can help you to check if
the consumer has problem processing messages

On Thu, Apr 9, 2015 at 2:40 AM, nitin sharma kumarsharma.ni...@gmail.com
wrote:

 thanks, but can you please tell which metrics could highlight the factor
 causing slow data migration by MirrorMaker?

 Regards,
 Nitin Kumar Sharma.


 On Tue, Apr 7, 2015 at 10:10 PM, tao xiao xiaotao...@gmail.com wrote:

  You may need to look into the consumer metrics and producer metrics to
  identify the root cause. metrics in kafka.consumer and kafka.producer
  categories will help you find out the problems.
 
  This link gives instruction how to read the metrics
  http://kafka.apache.org/documentation.html#monitoring
 
 
  On Wed, Apr 8, 2015 at 3:39 AM, nitin sharma 
 kumarsharma.ni...@gmail.com
  wrote:
 
   hi,
  
   sorry for late response. ... i have been able to fix the issue ..
 problem
   was in my approach. I got confused between my source and target system
   while defining consumer  producer property file .. it is fixed now
  
   Now new issue.. the rate at which data is migrated is very very
 slow... i
   mean it took 5 min to copy only 15Kb.. :( .. here are the property for
   producer and consumer.. there is no network latency between Source and
   Destination clusters as such.
  
  
    Producer ###
   metadata.broker.list=broker1IP:9092,broker2IP:9092
   serializer.class=kafka.serializer.DefaultEncoder
   auto.create.topics.enable=true
   request.required.acks=1
   request.required.acks=1
   producer.type=async
   batch.num.messages=3000
   queue.buffering.max.ms=5000
   queue.buffering.max.messages=10
   queue.enqueue.timeout.ms=-1
   socket.send.buffer.bytes=5282880
  
   # Consumer ###
  
 
 zookeeper.connect=zk1hostname:2181,zk2hostname:2181,zk3hostname:2181
   group.id=KafkaMaker
   auto.create.topics.enable=true
   socket.receive.buffer.bytes=5243880
   zookeeper.connection.timeout.ms=100
   num.consumer.fetchers=20
   fetch.message.max.bytes=5243880
  
  
  
  
   Regards,
   Nitin Kumar Sharma.
  
  
   On Tue, Mar 31, 2015 at 12:36 PM, tao xiao xiaotao...@gmail.com
 wrote:
  
Can you attach your mirror maker log?
   
On Wed, Apr 1, 2015 at 12:28 AM, nitin sharma 
   kumarsharma.ni...@gmail.com

wrote:
   
 i tried with auto.offset.reset=smallest, but still not working..

 there is data in my source cluster

 Regards,
 Nitin Kumar Sharma.


 On Mon, Mar 30, 2015 at 10:30 PM, tao xiao xiaotao...@gmail.com
   wrote:

  Do you have data sending to *testtopic? *By default mirror maker
  only
  consumes data being sent after it taps into the topic. you need
 to
   keep
  sending data to the topic after mirror maker connection is
   established.
 If
  you want to change the behavior you can set
   auto.offset.reset=smallest
so
  that any new mirror maker coming to the topic will start from the
 smallest
  offset
 
  On Tue, Mar 31, 2015 at 3:53 AM, nitin sharma 
 kumarsharma.ni...@gmail.com
  
  wrote:
 
   thanks Xiao
  
   I tried MirrorMaker option in my test environment but failed. I
  am
not
   able to see the log getting copied to destination cluster. I
 see
  in
the
  log
   of MirrorMaker process that connection is successfully
  established
  between
   source and destination cluster but still not sure what i
 causing
   the
  problem
  
   Env. Setup ==
  
   I). Source Cluster (Qenv02) -- i have 2
   broker(Qenv02kf01,Qenv02kf02)
 and
   3 zk(Qenv02zk01,Qenv02zk02 and Qenv02zk03).
   Destination Clustern (Qenv05) -- i have 2 broker
 (Qenv05kf01,Qenv05kf02)
   and 3 zk(Qenv05zk01,Qenv05zk02 and Qenv05zk03).
  
   II). i have kept consumer and producer properties file in one
 of
   the
   source kafka broker config folder.
  
   III).i have executed following command from the same kafka
 broker
   to
   start the process.. log are attached :
  
   /app/kafka/bin/kafka-run-class.sh kafka.tools.MirrorMaker
  -consumer.config
   /app/kafka/config/consumer1.properties --num.streams=2
 --producer.config
   /app/kafka/config/producer1.properties --whitelist testtopic
  
  
   IV). I tried Consumer offset tracker tool also, while Mirror
  Maker
  running
   . I tried by launching second session of same broker where
 mirror
maker
  is
   running. I got error message that *NoNode for
   /consumers/KafkaMaker/offsets/testtopic/0* .Complete log
  attached.
  
  
   Regards,
   Nitin Kumar Sharma.
  
  
   On Thu, Mar 26, 2015 at 11:24 AM, tao xiao 
 xiaotao...@gmail.com
  
 wrote:
  
   Both consumer-1 and consumer-2 are properties of source
 clusters
 mirror
   maker transfers data from. Mirror maker is designed to be able

Re: Kafka server relocation

2015-04-07 Thread tao xiao
You may need to look into the consumer metrics and producer metrics to
identify the root cause. Metrics in the kafka.consumer and kafka.producer
categories will help you find the problems.

This link gives instructions on how to read the metrics:
http://kafka.apache.org/documentation.html#monitoring


On Wed, Apr 8, 2015 at 3:39 AM, nitin sharma kumarsharma.ni...@gmail.com
wrote:

 hi,

 sorry for late response. ... i have been able to fix the issue .. problem
 was in my approach. I got confused between my source and target system
 while defining consumer  producer property file .. it is fixed now

 Now new issue.. the rate at which data is migrated is very very slow... i
 mean it took 5 min to copy only 15Kb.. :( .. here are the property for
 producer and consumer.. there is no network latency between Source and
 Destination clusters as such.


  Producer ###
 metadata.broker.list=broker1IP:9092,broker2IP:9092
 serializer.class=kafka.serializer.DefaultEncoder
 auto.create.topics.enable=true
 request.required.acks=1
 producer.type=async
 batch.num.messages=3000
 queue.buffering.max.ms=5000
 queue.buffering.max.messages=10
 queue.enqueue.timeout.ms=-1
 socket.send.buffer.bytes=5282880

 # Consumer ###
 zookeeper.connect=zk1hostname:2181,zk2hostname:2181,zk3hostname:2181
 group.id=KafkaMaker
 auto.create.topics.enable=true
 socket.receive.buffer.bytes=5243880
 zookeeper.connection.timeout.ms=100
 num.consumer.fetchers=20
 fetch.message.max.bytes=5243880




 Regards,
 Nitin Kumar Sharma.


 On Tue, Mar 31, 2015 at 12:36 PM, tao xiao xiaotao...@gmail.com wrote:

  Can you attach your mirror maker log?
 
  On Wed, Apr 1, 2015 at 12:28 AM, nitin sharma 
 kumarsharma.ni...@gmail.com
  
  wrote:
 
   i tried with auto.offset.reset=smallest, but still not working..
  
   there is data in my source cluster
  
   Regards,
   Nitin Kumar Sharma.
  
  
   On Mon, Mar 30, 2015 at 10:30 PM, tao xiao xiaotao...@gmail.com
 wrote:
  
Do you have data sending to *testtopic? *By default mirror maker only
consumes data being sent after it taps into the topic. you need to
 keep
sending data to the topic after mirror maker connection is
 established.
   If
you want to change the behavior you can set
 auto.offset.reset=smallest
  so
that any new mirror maker coming to the topic will start from the
   smallest
offset
   
On Tue, Mar 31, 2015 at 3:53 AM, nitin sharma 
   kumarsharma.ni...@gmail.com

wrote:
   
 thanks Xiao

 I tried MirrorMaker option in my test environment but failed. I am
  not
 able to see the log getting copied to destination cluster. I see in
  the
log
 of MirrorMaker process that connection is successfully established
between
 source and destination cluster but still not sure what i causing
 the
problem

 Env. Setup ==

 I). Source Cluster (Qenv02) -- i have 2
 broker(Qenv02kf01,Qenv02kf02)
   and
 3 zk(Qenv02zk01,Qenv02zk02 and Qenv02zk03).
 Destination Clustern (Qenv05) -- i have 2 broker
   (Qenv05kf01,Qenv05kf02)
 and 3 zk(Qenv05zk01,Qenv05zk02 and Qenv05zk03).

 II). i have kept consumer and producer properties file in one of
 the
 source kafka broker config folder.

 III).i have executed following command from the same kafka broker
 to
 start the process.. log are attached :

 /app/kafka/bin/kafka-run-class.sh kafka.tools.MirrorMaker
-consumer.config
 /app/kafka/config/consumer1.properties --num.streams=2
   --producer.config
 /app/kafka/config/producer1.properties --whitelist testtopic


 IV). I tried Consumer offset tracker tool also, while Mirror Maker
running
 . I tried by launching second session of same broker where mirror
  maker
is
 running. I got error message that *NoNode for
 /consumers/KafkaMaker/offsets/testtopic/0* .Complete log attached.


 Regards,
 Nitin Kumar Sharma.


 On Thu, Mar 26, 2015 at 11:24 AM, tao xiao xiaotao...@gmail.com
   wrote:

 Both consumer-1 and consumer-2 are properties of source clusters
   mirror
 maker transfers data from. Mirror maker is designed to be able to
consume
 data from N sources (N = 1) and transfer data to one destination
cluster.
 You are free to supply as many consumer properties as you want to
instruct
 mirror maker where to consumer data from.

 On Thu, Mar 26, 2015 at 9:50 PM, nitin sharma 
 kumarsharma.ni...@gmail.com
 wrote:

  thanks Mayuresh and Jiangjie for your response.
 
  I have actually not understood Mirror maker clearly and hence
 bit
 skeptical
  if i will be able to execute it effectively.
 
  Online i hv seen the following command to be execute, but not
understood
   what is consumer-1  -2.properties here? do i need to copy from
  my
  consumer code? also, any reason why

Re: Kafka server relocation

2015-03-31 Thread tao xiao
Can you attach your mirror maker log?

On Wed, Apr 1, 2015 at 12:28 AM, nitin sharma kumarsharma.ni...@gmail.com
wrote:

 i tried with auto.offset.reset=smallest, but still not working..

 there is data in my source cluster

 Regards,
 Nitin Kumar Sharma.


 On Mon, Mar 30, 2015 at 10:30 PM, tao xiao xiaotao...@gmail.com wrote:

  Do you have data sending to *testtopic? *By default mirror maker only
  consumes data being sent after it taps into the topic. you need to keep
  sending data to the topic after mirror maker connection is established.
 If
  you want to change the behavior you can set auto.offset.reset=smallest so
  that any new mirror maker coming to the topic will start from the
 smallest
  offset
 
  On Tue, Mar 31, 2015 at 3:53 AM, nitin sharma 
 kumarsharma.ni...@gmail.com
  
  wrote:
 
   thanks Xiao
  
   I tried MirrorMaker option in my test environment but failed. I am not
   able to see the log getting copied to destination cluster. I see in the
  log
   of MirrorMaker process that connection is successfully established
  between
   source and destination cluster but still not sure what i causing the
  problem
  
   Env. Setup ==
  
   I). Source Cluster (Qenv02) -- i have 2 broker(Qenv02kf01,Qenv02kf02)
 and
   3 zk(Qenv02zk01,Qenv02zk02 and Qenv02zk03).
   Destination Clustern (Qenv05) -- i have 2 broker
 (Qenv05kf01,Qenv05kf02)
   and 3 zk(Qenv05zk01,Qenv05zk02 and Qenv05zk03).
  
   II). i have kept consumer and producer properties file in one of the
   source kafka broker config folder.
  
   III).i have executed following command from the same kafka broker  to
   start the process.. log are attached :
  
   /app/kafka/bin/kafka-run-class.sh kafka.tools.MirrorMaker
  -consumer.config
   /app/kafka/config/consumer1.properties --num.streams=2
 --producer.config
   /app/kafka/config/producer1.properties --whitelist testtopic
  
  
   IV). I tried Consumer offset tracker tool also, while Mirror Maker
  running
   . I tried by launching second session of same broker where mirror maker
  is
   running. I got error message that *NoNode for
   /consumers/KafkaMaker/offsets/testtopic/0* .Complete log attached.
  
  
   Regards,
   Nitin Kumar Sharma.
  
  
   On Thu, Mar 26, 2015 at 11:24 AM, tao xiao xiaotao...@gmail.com
 wrote:
  
   Both consumer-1 and consumer-2 are properties of source clusters
 mirror
   maker transfers data from. Mirror maker is designed to be able to
  consume
   data from N sources (N = 1) and transfer data to one destination
  cluster.
   You are free to supply as many consumer properties as you want to
  instruct
   mirror maker where to consumer data from.
  
   On Thu, Mar 26, 2015 at 9:50 PM, nitin sharma 
   kumarsharma.ni...@gmail.com
   wrote:
  
thanks Mayuresh and Jiangjie for your response.
   
I have actually not understood Mirror maker clearly and hence bit
   skeptical
if i will be able to execute it effectively.
   
Online i hv seen the following command to be execute, but not
  understood
 what is consumer-1  -2.properties here? do i need to copy from my
consumer code? also, any reason why to provide consumer property?
   
bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config
consumer-1.properties --consumer.config consumer-2.properties
--producer.config producer.properties --whitelist my-topic
   
   
Regards,
Nitin Kumar Sharma.
   
   
On Wed, Mar 25, 2015 at 8:57 PM, Mayuresh Gharat 
gharatmayures...@gmail.com
 wrote:
   
 You can use the Mirror maker to move data from one data center to
   other
and
 once all the data has been moved you can shut down the source data
   center
 by doing a controlled shutdown.

 Thanks,

 Mayuresh

 On Wed, Mar 25, 2015 at 2:35 PM, Jiangjie Qin
   j...@linkedin.com.invalid

 wrote:

  If you want to do a seamless migration. I think a better way is
 to
build
 a
  cross datacenter Kafka cluster temporarily. So the process is:
  1. Add several new Kafka brokers in your new datacenter and add
   them to
  the old cluster.
  2. Use replica assignment tool to reassign all the partitions to
brokers
  in new datacenter.
  3. Perform controlled shutdown on the brokers in old datacenter.
 
  Jiangjie (Becket) Qin
 
  On 3/25/15, 2:01 PM, nitin sharma 
 kumarsharma.ni...@gmail.com
wrote:
 
  Hi Team,
  
  in my project, we have built a new datacenter for Kafka brokers
  and
 wants
  to migrate from current datacenter to new one.
  
  Switching producers and consumers wont be a problem provided
 New
  Datacenter
  has all the messages of existing Datacenter.
  
  
  i have only 1 topic with 2 partition that need to be
 migrated...
   that
 too
  it is only 1 time activity.
  
  Kindly suggest the best way to deal with this situation.
  
  
  Regards,
  Nitin Kumar

Re: Kafka server relocation

2015-03-26 Thread tao xiao
Both consumer-1 and consumer-2 are properties files for the source clusters mirror
maker transfers data from. Mirror maker is designed to be able to consume
data from N sources (N >= 1) and transfer data to one destination cluster.
You are free to supply as many consumer properties files as you want to instruct
mirror maker where to consume data from.

On Thu, Mar 26, 2015 at 9:50 PM, nitin sharma kumarsharma.ni...@gmail.com
wrote:

 thanks Mayuresh and Jiangjie for your response.

 I have actually not understood Mirror maker clearly and hence bit skeptical
 if i will be able to execute it effectively.

 Online i hv seen the following command to be execute, but not understood
  what is consumer-1 & -2.properties here? do i need to copy from my
 consumer code? also, any reason why to provide consumer property?

 bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config
 consumer-1.properties --consumer.config consumer-2.properties
 --producer.config producer.properties --whitelist my-topic


 Regards,
 Nitin Kumar Sharma.


 On Wed, Mar 25, 2015 at 8:57 PM, Mayuresh Gharat 
 gharatmayures...@gmail.com
  wrote:

  You can use the Mirror maker to move data from one data center to other
 and
  once all the data has been moved you can shut down the source data center
  by doing a controlled shutdown.
 
  Thanks,
 
  Mayuresh
 
  On Wed, Mar 25, 2015 at 2:35 PM, Jiangjie Qin j...@linkedin.com.invalid
 
  wrote:
 
   If you want to do a seamless migration. I think a better way is to
 build
  a
   cross datacenter Kafka cluster temporarily. So the process is:
   1. Add several new Kafka brokers in your new datacenter and add them to
   the old cluster.
   2. Use replica assignment tool to reassign all the partitions to
 brokers
   in new datacenter.
   3. Perform controlled shutdown on the brokers in old datacenter.
  
   Jiangjie (Becket) Qin
  
   On 3/25/15, 2:01 PM, nitin sharma kumarsharma.ni...@gmail.com
 wrote:
  
   Hi Team,
   
   in my project, we have built a new datacenter for Kafka brokers and
  wants
   to migrate from current datacenter to new one.
   
   Switching producers and consumers wont be a problem provided New
   Datacenter
   has all the messages of existing Datacenter.
   
   
   i have only 1 topic with 2 partition that need to be migrated... that
  too
   it is only 1 time activity.
   
   Kindly suggest the best way to deal with this situation.
   
   
   Regards,
   Nitin Kumar Sharma.
  
  
 
 
  --
  -Regards,
  Mayuresh R. Gharat
  (862) 250-7125
 




-- 
Regards,
Tao


Re: lost messages -?

2015-03-25 Thread tao xiao
You can use kafka-console-consumer consuming the topic from the beginning

*kafka-console-consumer.sh --zookeeper localhost:2181 --topic test
--from-beginning*


On Thu, Mar 26, 2015 at 12:17 AM, Victor L vlyamt...@gmail.com wrote:

 Can someone let me know how to dump contents of topics?
 I have producers sending messages to 3 brokers but about half of them don't
 seem to be consumed. I suppose they are getting stuck in queues but how can
 i figure out where?
 Thks,




-- 
Regards,
Tao


Re: Mirror maker fetcher thread unexpectedly stopped

2015-03-24 Thread tao xiao
Hi community,

I wanted to know if the solution I supplied can fix the
IllegalMonitorStateException
issue. Our work is pending on this and we'd like to proceed ASAP. Sorry for
bothering.

On Mon, Mar 23, 2015 at 4:32 PM, tao xiao xiaotao...@gmail.com wrote:

 I think I worked out the answer to question 1. 
 java.lang.IllegalMonitorStateException
 was thrown due to no ownership of ReentrantLock when trying to call await()
 on the lock condition.

 Here is the code snippet from the AbstractFetcherThread.scala in trunk

 partitionMapLock synchronized {
 partitionsWithError ++= partitionMap.keys
 // there is an error occurred while fetching partitions, sleep
 a while
 partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
 }

 as shown above partitionMapLock is not acquired before calling
 partitionMapCond.await

 we can fix this by explicitly calling partitionMapLock.lock(). below code
 block should work

 inLock(partitionMapLock) {
 partitionsWithError ++= partitionMap.keys
 // there is an error occurred while fetching partitions, sleep
 a while
 partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
 }

 On Mon, Mar 23, 2015 at 1:50 PM, tao xiao xiaotao...@gmail.com wrote:

 Hi,

 I was running a mirror maker and got
  java.lang.IllegalMonitorStateException that caused the underlying fetcher
 thread completely stopped. Here is the log from mirror maker.

 [2015-03-21 02:11:53,069] INFO Reconnect due to socket error:
 java.io.EOFException: Received -1 when reading from channel, socket has
 likely been closed. (kafka.consumer.SimpleConsumer)
 [2015-03-21 02:11:53,081] WARN
 [ConsumerFetcherThread-localhost-1426853968478-4ebaa354-0-6], Error in
 fetch Name: FetchRequest; Version: 0; CorrelationId: 2398588; ClientId:
 phx-slc-mm-user-3; ReplicaId: -1; MaxWait: 100 ms; MinBytes: 1 bytes;
 RequestInfo: [test.topic,0] - PartitionFetchInfo(3766065,1048576).
 Possible cause: java.nio.channels.ClosedChannelException
 (kafka.consumer.ConsumerFetcherThread)
 [2015-03-21 02:11:53,083] ERROR
 [ConsumerFetcherThread-localhost-1426853968478-4ebaa354-0-6], Error due to
  (kafka.consumer.ConsumerFetcherThread)
 java.lang.IllegalMonitorStateException
 at
 java.util.concurrent.locks.ReentrantLock$Sync.tryRelease(ReentrantLock.java:155)
 at
 java.util.concurrent.locks.AbstractQueuedSynchronizer.release(AbstractQueuedSynchronizer.java:1260)
 at
 java.util.concurrent.locks.AbstractQueuedSynchronizer.fullyRelease(AbstractQueuedSynchronizer.java:1723)
 at
 java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2166)
 at
 kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:106)
 at
 kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:90)
 at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
 [2015-03-21 02:11:53,083] INFO
 [ConsumerFetcherThread-localhost-1426853968478-4ebaa354-0-6], Stopped
  (kafka.consumer.ConsumerFetcherThread)

 I am still investigating what caused the connection error on server side
 but I have a couple of questions related to mirror maker itself

 1. What is root cause of java.lang.IllegalMonitorStateException? As shown
 in the AbstractFetcherThread source the fetcher thread should catch the
 java.io.EOFException thrown from underlying simplyConsumer and sleep a
 while before next run.
 2. Mirror maker is unaware of the termination of fetcher thread. That
 makes it unable to detect the failure and trigger rebalancing. I have 3
 mirror maker instances running in 3 different machines listening to the
 same topic. I would expect the mirror maker will release the partition
 ownership when underlying fetcher thread terminates so that rebalancing can
 be triggered.but in fact this is not the case. is this expected behavior or
 do I miss configure anything?

 I am running the trunk version as of commit
 82789e75199fdc1cae115c5c2eadfd0f1ece4d0d

 --
 Regards,
 Tao




 --
 Regards,
 Tao




-- 
Regards,
Tao


Re: Mirror maker fetcher thread unexpectedly stopped

2015-03-24 Thread tao xiao
Thank you for the explanation.

Patch submitted https://issues.apache.org/jira/browse/KAFKA-2048

On Wed, Mar 25, 2015 at 8:29 AM, Jiangjie Qin j...@linkedin.com.invalid
wrote:

 It should be another ticket. This is a AbstractFetcherThread issue rather
 than a mirror maker issue.

 I kind of think this case you saw was a special case as it's not actually
 a runtime error but a coding bug. Fetcher thread should not die by design.
 So I don't think we have a way to restart fetchers without code change if
 they die accidentally. One way to do this is to add liveness check in
 LeaderFinderThread. But I don't know if this is a necessary change just
 because of the case you saw.

 Jiangjie (Becket) Qin

 On 3/24/15, 5:05 PM, tao xiao xiaotao...@gmail.com wrote:

 The other question I have is the fact that consumer client is unaware of
 the health status of underlying fetcher thread. If the fetcher thread dies
 like the case I encountered is there a way that consumer can restart the
 fetcher thread or release ownership of partitions so that other consumers
 can pick them up while fetcher thread is down.
 
 On Wed, Mar 25, 2015 at 8:00 AM, tao xiao xiaotao...@gmail.com wrote:
 
  Thanks JIanjie. Can I reuse KAFKA-1997 or should I create a new ticket?
 
  On Wed, Mar 25, 2015 at 7:58 AM, Jiangjie Qin
 j...@linkedin.com.invalid
  wrote:
 
  Hi Xiao,
 
  I think the fix for IllegalStateExcepetion is correct.
  Can you also create a ticket and submit a patch?
 
  Thanks.
 
  Jiangjie (Becket) Qin
 
  On 3/24/15, 4:31 PM, tao xiao xiaotao...@gmail.com wrote:
 
  Hi community,
  
  I wanted to know if the solution I supplied can fix the
  IllegalMonitorStateException
  issue. Our work is pending on this and we'd like to proceed ASAP.
 Sorry
  for
  bothering.
  
  On Mon, Mar 23, 2015 at 4:32 PM, tao xiao xiaotao...@gmail.com
 wrote:
  
   I think I worked out the answer to question 1.
  java.lang.IllegalMonitorStateException
   was thrown due to no ownership of ReentrantLock when trying to call
  await()
   on the lock condition.
  
   Here is the code snippet from the AbstractFetcherThread.scala in
 trunk
  
   partitionMapLock synchronized {
   partitionsWithError ++= partitionMap.keys
   // there is an error occurred while fetching partitions,
  sleep
   a while
   partitionMapCond.await(fetchBackOffMs,
  TimeUnit.MILLISECONDS)
   }
  
   as shown above partitionMapLock is not acquired before calling
   partitionMapCond.await
  
   we can fix this by explicitly calling partitionMapLock.lock(). below
  code
   block should work
  
   inLock(partitionMapLock) {
   partitionsWithError ++= partitionMap.keys
   // there is an error occurred while fetching partitions,
  sleep
   a while
   partitionMapCond.await(fetchBackOffMs,
  TimeUnit.MILLISECONDS)
   }
  
   On Mon, Mar 23, 2015 at 1:50 PM, tao xiao xiaotao...@gmail.com
  wrote:
  
   Hi,
  
   I was running a mirror maker and got
java.lang.IllegalMonitorStateException that caused the underlying
  fetcher
   thread completely stopped. Here is the log from mirror maker.
  
   [2015-03-21 02:11:53,069] INFO Reconnect due to socket error:
   java.io.EOFException: Received -1 when reading from channel, socket
  has
   likely been closed. (kafka.consumer.SimpleConsumer)
   [2015-03-21 02:11:53,081] WARN
   [ConsumerFetcherThread-localhost-1426853968478-4ebaa354-0-6],
 Error in
   fetch Name: FetchRequest; Version: 0; CorrelationId: 2398588;
  ClientId:
   phx-slc-mm-user-3; ReplicaId: -1; MaxWait: 100 ms; MinBytes: 1
 bytes;
   RequestInfo: [test.topic,0] - PartitionFetchInfo(3766065,1048576).
   Possible cause: java.nio.channels.ClosedChannelException
   (kafka.consumer.ConsumerFetcherThread)
   [2015-03-21 02:11:53,083] ERROR
   [ConsumerFetcherThread-localhost-1426853968478-4ebaa354-0-6], Error
  due to
(kafka.consumer.ConsumerFetcherThread)
   java.lang.IllegalMonitorStateException
    at java.util.concurrent.locks.ReentrantLock$Sync.tryRelease(ReentrantLock.java:155)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.release(AbstractQueuedSynchronizer.java:1260)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.fullyRelease(AbstractQueuedSynchronizer.java:1723)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2166)
    at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:106)
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:90)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
    [2015-03-21 02:11:53,083] INFO
    [ConsumerFetcherThread-localhost-1426853968478-4ebaa354-0-6], Stopped
    (kafka.consumer.ConsumerFetcherThread)
  
   I am still investigating what caused the connection error

Re: Mirror maker fetcher thread unexpectedly stopped

2015-03-24 Thread tao xiao
Thanks Jiangjie. Can I reuse KAFKA-1997 or should I create a new ticket?

On Wed, Mar 25, 2015 at 7:58 AM, Jiangjie Qin j...@linkedin.com.invalid
wrote:

 Hi Xiao,

 I think the fix for IllegalStateExcepetion is correct.
 Can you also create a ticket and submit a patch?

 Thanks.

 Jiangjie (Becket) Qin

 On 3/24/15, 4:31 PM, tao xiao xiaotao...@gmail.com wrote:

 Hi community,
 
 I wanted to know if the solution I supplied can fix the
 IllegalMonitorStateException
 issue. Our work is pending on this and we'd like to proceed ASAP. Sorry
 for
 bothering.
 
 On Mon, Mar 23, 2015 at 4:32 PM, tao xiao xiaotao...@gmail.com wrote:
 
  I think I worked out the answer to question 1.
 java.lang.IllegalMonitorStateException
  was thrown due to no ownership of ReentrantLock when trying to call
 await()
  on the lock condition.
 
  Here is the code snippet from the AbstractFetcherThread.scala in trunk
 
  partitionMapLock synchronized {
  partitionsWithError ++= partitionMap.keys
  // there is an error occurred while fetching partitions,
 sleep
  a while
  partitionMapCond.await(fetchBackOffMs,
 TimeUnit.MILLISECONDS)
  }
 
  as shown above partitionMapLock is not acquired before calling
  partitionMapCond.await
 
  we can fix this by explicitly calling partitionMapLock.lock(). below
 code
  block should work
 
  inLock(partitionMapLock) {
  partitionsWithError ++= partitionMap.keys
  // there is an error occurred while fetching partitions,
 sleep
  a while
  partitionMapCond.await(fetchBackOffMs,
 TimeUnit.MILLISECONDS)
  }
 
  On Mon, Mar 23, 2015 at 1:50 PM, tao xiao xiaotao...@gmail.com wrote:
 
  Hi,
 
  I was running a mirror maker and got
   java.lang.IllegalMonitorStateException that caused the underlying
 fetcher
  thread completely stopped. Here is the log from mirror maker.
 
  [2015-03-21 02:11:53,069] INFO Reconnect due to socket error:
  java.io.EOFException: Received -1 when reading from channel, socket has
  likely been closed. (kafka.consumer.SimpleConsumer)
  [2015-03-21 02:11:53,081] WARN
  [ConsumerFetcherThread-localhost-1426853968478-4ebaa354-0-6], Error in
  fetch Name: FetchRequest; Version: 0; CorrelationId: 2398588; ClientId:
  phx-slc-mm-user-3; ReplicaId: -1; MaxWait: 100 ms; MinBytes: 1 bytes;
  RequestInfo: [test.topic,0] - PartitionFetchInfo(3766065,1048576).
  Possible cause: java.nio.channels.ClosedChannelException
  (kafka.consumer.ConsumerFetcherThread)
  [2015-03-21 02:11:53,083] ERROR
  [ConsumerFetcherThread-localhost-1426853968478-4ebaa354-0-6], Error
 due to
   (kafka.consumer.ConsumerFetcherThread)
  java.lang.IllegalMonitorStateException
   at java.util.concurrent.locks.ReentrantLock$Sync.tryRelease(ReentrantLock.java:155)
   at java.util.concurrent.locks.AbstractQueuedSynchronizer.release(AbstractQueuedSynchronizer.java:1260)
   at java.util.concurrent.locks.AbstractQueuedSynchronizer.fullyRelease(AbstractQueuedSynchronizer.java:1723)
   at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2166)
   at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:106)
   at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:90)
   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
  [2015-03-21 02:11:53,083] INFO
  [ConsumerFetcherThread-localhost-1426853968478-4ebaa354-0-6], Stopped
   (kafka.consumer.ConsumerFetcherThread)
 
  I am still investigating what caused the connection error on server
 side
  but I have a couple of questions related to mirror maker itself
 
  1. What is root cause of java.lang.IllegalMonitorStateException? As
 shown
  in the AbstractFetcherThread source the fetcher thread should catch the
  java.io.EOFException thrown from underlying simplyConsumer and sleep a
  while before next run.
  2. Mirror maker is unaware of the termination of fetcher thread. That
  makes it unable to detect the failure and trigger rebalancing. I have 3
  mirror maker instances running in 3 different machines listening to the
  same topic. I would expect the mirror maker will release the partition
  ownership when underlying fetcher thread terminates so that
 rebalancing can
  be triggered.but in fact this is not the case. is this expected
 behavior or
  do I miss configure anything?
 
  I am running the trunk version as of commit
  82789e75199fdc1cae115c5c2eadfd0f1ece4d0d
 
  --
  Regards,
  Tao
 
 
 
 
  --
  Regards,
  Tao
 
 
 
 
 --
 Regards,
 Tao




-- 
Regards,
Tao


Re: Mirror maker fetcher thread unexpectedly stopped

2015-03-24 Thread tao xiao
The other question I have is the fact that consumer client is unaware of
the health status of underlying fetcher thread. If the fetcher thread dies
like the case I encountered is there a way that consumer can restart the
fetcher thread or release ownership of partitions so that other consumers
can pick them up while fetcher thread is down.

On Wed, Mar 25, 2015 at 8:00 AM, tao xiao xiaotao...@gmail.com wrote:

 Thanks JIanjie. Can I reuse KAFKA-1997 or should I create a new ticket?

 On Wed, Mar 25, 2015 at 7:58 AM, Jiangjie Qin j...@linkedin.com.invalid
 wrote:

 Hi Xiao,

 I think the fix for IllegalStateExcepetion is correct.
 Can you also create a ticket and submit a patch?

 Thanks.

 Jiangjie (Becket) Qin

 On 3/24/15, 4:31 PM, tao xiao xiaotao...@gmail.com wrote:

 Hi community,
 
 I wanted to know if the solution I supplied can fix the
 IllegalMonitorStateException
 issue. Our work is pending on this and we'd like to proceed ASAP. Sorry
 for
 bothering.
 
 On Mon, Mar 23, 2015 at 4:32 PM, tao xiao xiaotao...@gmail.com wrote:
 
  I think I worked out the answer to question 1.
 java.lang.IllegalMonitorStateException
  was thrown due to no ownership of ReentrantLock when trying to call
 await()
  on the lock condition.
 
  Here is the code snippet from the AbstractFetcherThread.scala in trunk
 
  partitionMapLock synchronized {
  partitionsWithError ++= partitionMap.keys
  // there is an error occurred while fetching partitions,
 sleep
  a while
  partitionMapCond.await(fetchBackOffMs,
 TimeUnit.MILLISECONDS)
  }
 
  as shown above partitionMapLock is not acquired before calling
  partitionMapCond.await
 
  we can fix this by explicitly calling partitionMapLock.lock(). below
 code
  block should work
 
  inLock(partitionMapLock) {
  partitionsWithError ++= partitionMap.keys
  // there is an error occurred while fetching partitions,
 sleep
  a while
  partitionMapCond.await(fetchBackOffMs,
 TimeUnit.MILLISECONDS)
  }
 
  On Mon, Mar 23, 2015 at 1:50 PM, tao xiao xiaotao...@gmail.com
 wrote:
 
  Hi,
 
  I was running a mirror maker and got
   java.lang.IllegalMonitorStateException that caused the underlying
 fetcher
  thread completely stopped. Here is the log from mirror maker.
 
  [2015-03-21 02:11:53,069] INFO Reconnect due to socket error:
  java.io.EOFException: Received -1 when reading from channel, socket
 has
  likely been closed. (kafka.consumer.SimpleConsumer)
  [2015-03-21 02:11:53,081] WARN
  [ConsumerFetcherThread-localhost-1426853968478-4ebaa354-0-6], Error in
  fetch Name: FetchRequest; Version: 0; CorrelationId: 2398588;
 ClientId:
  phx-slc-mm-user-3; ReplicaId: -1; MaxWait: 100 ms; MinBytes: 1 bytes;
  RequestInfo: [test.topic,0] - PartitionFetchInfo(3766065,1048576).
  Possible cause: java.nio.channels.ClosedChannelException
  (kafka.consumer.ConsumerFetcherThread)
  [2015-03-21 02:11:53,083] ERROR
  [ConsumerFetcherThread-localhost-1426853968478-4ebaa354-0-6], Error
 due to
   (kafka.consumer.ConsumerFetcherThread)
  java.lang.IllegalMonitorStateException
   at java.util.concurrent.locks.ReentrantLock$Sync.tryRelease(ReentrantLock.java:155)
   at java.util.concurrent.locks.AbstractQueuedSynchronizer.release(AbstractQueuedSynchronizer.java:1260)
   at java.util.concurrent.locks.AbstractQueuedSynchronizer.fullyRelease(AbstractQueuedSynchronizer.java:1723)
   at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2166)
   at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:106)
   at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:90)
   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
  [2015-03-21 02:11:53,083] INFO
  [ConsumerFetcherThread-localhost-1426853968478-4ebaa354-0-6], Stopped
   (kafka.consumer.ConsumerFetcherThread)
 
  I am still investigating what caused the connection error on server
 side
  but I have a couple of questions related to mirror maker itself
 
  1. What is root cause of java.lang.IllegalMonitorStateException? As
 shown
  in the AbstractFetcherThread source the fetcher thread should catch
 the
  java.io.EOFException thrown from underlying simplyConsumer and sleep a
  while before next run.
  2. Mirror maker is unaware of the termination of fetcher thread. That
  makes it unable to detect the failure and trigger rebalancing. I have
 3
  mirror maker instances running in 3 different machines listening to
 the
  same topic. I would expect the mirror maker will release the partition
  ownership when underlying fetcher thread terminates so that
 rebalancing can
  be triggered.but in fact this is not the case. is this expected
 behavior or
  do I miss configure anything?
 
  I am running the trunk version as of commit
  82789e75199fdc1cae115c5c2eadfd0f1ece4d0d
 
  --
  Regards

Re: Mirror maker fetcher thread unexpectedly stopped

2015-03-23 Thread tao xiao
I think I worked out the answer to question 1.
java.lang.IllegalMonitorStateException
was thrown due to no ownership of ReentrantLock when trying to call await()
on the lock condition.

Here is the code snippet from the AbstractFetcherThread.scala in trunk

partitionMapLock synchronized {
partitionsWithError ++= partitionMap.keys
// there is an error occurred while fetching partitions, sleep
a while
partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
}

as shown above partitionMapLock is not acquired before calling
partitionMapCond.await

we can fix this by explicitly calling partitionMapLock.lock(). below code
block should work

inLock(partitionMapLock) {
partitionsWithError ++= partitionMap.keys
// there is an error occurred while fetching partitions, sleep
a while
partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
}
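
(To spell out the failure mode: in Scala, partitionMapLock synchronized {...}
only takes the object's intrinsic monitor, while Condition.await() requires the
calling thread to hold the ReentrantLock itself, hence the
IllegalMonitorStateException.) A minimal, standalone Java illustration of the
difference -- hypothetical class name, not Kafka code:

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class AwaitWithoutLock {
    public static void main(String[] args) throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();

        // Correct usage: the calling thread owns the ReentrantLock before await().
        lock.lock();
        try {
            cond.await(100, TimeUnit.MILLISECONDS);
        } finally {
            lock.unlock();
        }

        // Broken usage: synchronized(lock) takes the intrinsic monitor, not the
        // ReentrantLock, so await() fails exactly like the fetcher thread did.
        synchronized (lock) {
            try {
                cond.await(100, TimeUnit.MILLISECONDS);
            } catch (IllegalMonitorStateException e) {
                System.out.println("Got " + e + " as expected");
            }
        }
    }
}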

On Mon, Mar 23, 2015 at 1:50 PM, tao xiao xiaotao...@gmail.com wrote:

 Hi,

 I was running a mirror maker and got
  java.lang.IllegalMonitorStateException that caused the underlying fetcher
 thread completely stopped. Here is the log from mirror maker.

 [2015-03-21 02:11:53,069] INFO Reconnect due to socket error:
 java.io.EOFException: Received -1 when reading from channel, socket has
 likely been closed. (kafka.consumer.SimpleConsumer)
 [2015-03-21 02:11:53,081] WARN
 [ConsumerFetcherThread-localhost-1426853968478-4ebaa354-0-6], Error in
 fetch Name: FetchRequest; Version: 0; CorrelationId: 2398588; ClientId:
 phx-slc-mm-user-3; ReplicaId: -1; MaxWait: 100 ms; MinBytes: 1 bytes;
 RequestInfo: [test.topic,0] - PartitionFetchInfo(3766065,1048576).
 Possible cause: java.nio.channels.ClosedChannelException
 (kafka.consumer.ConsumerFetcherThread)
 [2015-03-21 02:11:53,083] ERROR
 [ConsumerFetcherThread-localhost-1426853968478-4ebaa354-0-6], Error due to
  (kafka.consumer.ConsumerFetcherThread)
 java.lang.IllegalMonitorStateException
 at
 java.util.concurrent.locks.ReentrantLock$Sync.tryRelease(ReentrantLock.java:155)
 at
 java.util.concurrent.locks.AbstractQueuedSynchronizer.release(AbstractQueuedSynchronizer.java:1260)
 at
 java.util.concurrent.locks.AbstractQueuedSynchronizer.fullyRelease(AbstractQueuedSynchronizer.java:1723)
 at
 java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2166)
 at
 kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:106)
 at
 kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:90)
 at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
 [2015-03-21 02:11:53,083] INFO
 [ConsumerFetcherThread-localhost-1426853968478-4ebaa354-0-6], Stopped
  (kafka.consumer.ConsumerFetcherThread)

 I am still investigating what caused the connection error on server side
 but I have a couple of questions related to mirror maker itself

 1. What is root cause of java.lang.IllegalMonitorStateException? As shown
 in the AbstractFetcherThread source the fetcher thread should catch the
 java.io.EOFException thrown from underlying simplyConsumer and sleep a
 while before next run.
 2. Mirror maker is unaware of the termination of fetcher thread. That
 makes it unable to detect the failure and trigger rebalancing. I have 3
 mirror maker instances running in 3 different machines listening to the
 same topic. I would expect the mirror maker will release the partition
 ownership when underlying fetcher thread terminates so that rebalancing can
 be triggered.but in fact this is not the case. is this expected behavior or
 do I miss configure anything?

 I am running the trunk version as of commit
 82789e75199fdc1cae115c5c2eadfd0f1ece4d0d

 --
 Regards,
 Tao




-- 
Regards,
Tao


Re: kafka audit

2015-03-23 Thread tao xiao
LinkedIn has an excellent tool that monitors lag, data loss, data duplication,
etc. Here is the reference:

http://www.slideshare.net/JonBringhurst/kafka-audit-kafka-meetup-january-27th-2015

It is not open sourced though.

On Mon, Mar 23, 2015 at 3:26 PM, sunil kalva kalva.ka...@gmail.com wrote:

 Hi
 What is best practice for adding audit feature in kafka, Is there any
 framework available for enabling audit feature at producer and consumer
 level and any UI frameworks for monitoring.

 tx
 SunilKalva




-- 
Regards,
Tao


Mirror maker fetcher thread unexpectedly stopped

2015-03-22 Thread tao xiao
Hi,

I was running a mirror maker and got
 java.lang.IllegalMonitorStateException that caused the underlying fetcher
thread completely stopped. Here is the log from mirror maker.

[2015-03-21 02:11:53,069] INFO Reconnect due to socket error:
java.io.EOFException: Received -1 when reading from channel, socket has
likely been closed. (kafka.consumer.SimpleConsumer)
[2015-03-21 02:11:53,081] WARN
[ConsumerFetcherThread-localhost-1426853968478-4ebaa354-0-6], Error in
fetch Name: FetchRequest; Version: 0; CorrelationId: 2398588; ClientId:
phx-slc-mm-user-3; ReplicaId: -1; MaxWait: 100 ms; MinBytes: 1 bytes;
RequestInfo: [test.topic,0] - PartitionFetchInfo(3766065,1048576).
Possible cause: java.nio.channels.ClosedChannelException
(kafka.consumer.ConsumerFetcherThread)
[2015-03-21 02:11:53,083] ERROR
[ConsumerFetcherThread-localhost-1426853968478-4ebaa354-0-6], Error due to
 (kafka.consumer.ConsumerFetcherThread)
java.lang.IllegalMonitorStateException
at
java.util.concurrent.locks.ReentrantLock$Sync.tryRelease(ReentrantLock.java:155)
at
java.util.concurrent.locks.AbstractQueuedSynchronizer.release(AbstractQueuedSynchronizer.java:1260)
at
java.util.concurrent.locks.AbstractQueuedSynchronizer.fullyRelease(AbstractQueuedSynchronizer.java:1723)
at
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2166)
at
kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:106)
at
kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:90)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
[2015-03-21 02:11:53,083] INFO
[ConsumerFetcherThread-localhost-1426853968478-4ebaa354-0-6], Stopped
 (kafka.consumer.ConsumerFetcherThread)

I am still investigating what caused the connection error on server side
but I have a couple of questions related to mirror maker itself

1. What is the root cause of java.lang.IllegalMonitorStateException? As shown
in the AbstractFetcherThread source, the fetcher thread should catch the
java.io.EOFException thrown from the underlying SimpleConsumer and sleep a
while before the next run.
2. Mirror maker is unaware of the termination of the fetcher thread. That makes
it unable to detect the failure and trigger rebalancing. I have 3 mirror
maker instances running on 3 different machines listening to the same
topic. I would expect the mirror maker to release the partition ownership
when the underlying fetcher thread terminates so that rebalancing can be
triggered, but in fact this is not the case. Is this expected behavior or did
I misconfigure anything?

I am running the trunk version as of commit
82789e75199fdc1cae115c5c2eadfd0f1ece4d0d

-- 
Regards,
Tao


Re: New Java Producer Client handling case where Kafka is unreachable

2015-03-20 Thread tao xiao
You can set the producer property retries to a value greater than 0. Details can
be found here:
http://kafka.apache.org/documentation.html#newproducerconfigs
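
A minimal sketch of what that can look like with the new producer -- the broker
address, topic and retry count below are placeholders:

import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class RetryingProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // placeholder broker
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("retries", "3");  // retry transient send failures before failing

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        try {
            producer.send(new ProducerRecord<String, String>("test", "key", "value"),
                    new Callback() {
                        public void onCompletion(RecordMetadata metadata, Exception e) {
                            // runs once the send finally succeeds or gives up after retries
                            if (e != null) {
                                System.err.println("send failed: " + e);
                            }
                        }
                    });
        } finally {
            producer.close();
        }
    }
}

Note that send() itself can still block while the first metadata fetch is
outstanding when the cluster is unreachable; if I remember correctly
metadata.fetch.timeout.ms bounds that wait.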

On Fri, Mar 20, 2015 at 3:01 PM, Samuel Chase samebch...@gmail.com wrote:

 Hello Everyone,

 In the the new Java Producer API, the Callback code in
 KafkaProducer.send is run after there is a response from the Kafka
 server. This can be used if some error handling needs to be done based
 on the response.

 When using the new Java Kafka Producer, I've noticed that when the
 Kafka server is down/unreachable, KafkaProducer.send blocks until the
 Kafka server is back up again.

 We've been using the older Scala Producer and when Kafka is
 unreachable it throws an exception after a few retries. This exception
 is caught and then some error handling code is run.

 - What is the recommended way of using the new Java Producer API to
 handle the case where Kafka is unreachable temporarily?

 I don't want to wait until it is reachable again before I know that
 the send failed.

 Any help, advice shall be much appreciated.

 Thanks,

 Samuel




-- 
Regards,
Tao


Re: Post on running Kafka at LinkedIn

2015-03-20 Thread tao xiao
here is the slide

http://www.slideshare.net/JonBringhurst/kafka-audit-kafka-meetup-january-27th-2015

On Sat, Mar 21, 2015 at 2:36 AM, Xiao lixiao1...@gmail.com wrote:

 Hi, James,

 Thank you for sharing it!

 The links of videos and slides are the same. Could you check the link of
 slides?

 Xiao Li

 On Mar 20, 2015, at 11:30 AM, James Cheng jch...@tivo.com wrote:

  For those who missed it:
 
  The Kafka Audit tool was also presented at the 1/27 Kafka meetup:
  http://www.meetup.com/http-kafka-apache-org/events/219626780/
 
  Recorded video is here, starting around the 40 minute mark:
  http://www.ustream.tv/recorded/58109076
 
  Slides are here:
  http://www.ustream.tv/recorded/58109076
 
  -James
 
  On Mar 20, 2015, at 9:47 AM, Todd Palino tpal...@gmail.com wrote:
 
  For those who are interested in detail on how we've got Kafka set up at
  LinkedIn, I have just published a new posted to our Engineering blog
 titled
  Running Kafka at Scale
 
 https://engineering.linkedin.com/kafka/running-kafka-scale
 
  It's a general overview of our current Kafka install, tiered
 architecture,
  audit, and the libraries we use for producers and consumers. You'll
 also be
  seeing more posts from the SRE team here in the coming weeks on deeper
  looks into both Kafka and Samza.
 
  Additionally, I'll be giving a talk at ApacheCon next month on running
  tiered Kafka architectures. If you're in Austin for that, please come by
  and check it out.
 
  -Todd
 




-- 
Regards,
Tao


No topic owner when using different assignment strategies

2015-03-17 Thread tao xiao
Hi team,

I have two consumer instances with the same group id connecting to two
different topics with 1 partition created for each. One consumer uses
partition.assignment.strategy=roundrobin and the other one uses default
assignment strategy. Both consumers have 1 thread spawned internally and
connect to Kafka using createMessageStreamsByFilter.
The consumer with the roundrobin assignment strategy connected to Kafka first and
had 2 topics assigned to itself, and then I brought up another consumer that
has the default assignment strategy configured. I saw rebalancing happen in
both consumers but at the end only one of the topics was assigned to the
consumer that is configured with the roundrobin assignment strategy and no topics
were assigned to the other consumer. This led to one topic missing its
owner.

Here is the result from zk
[zk: localhost:2181(CONNECTED) 0] get
/consumers/test/owners/mm-benchmark-test/0

Node does not exist: /consumers/test12345667f/owners/mm-benchmark-test/0

[zk: localhost:2181(CONNECTED) 1] get
/consumers/test/owners/mm-benchmark-test1/0

test-localhost-1426605370072-904d6fba-0

The kafka version I use is 0.8.2.1

-- 
Regards,
Tao
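
Not an answer from the thread, but a note for anyone reproducing this: as far
as I know the ZooKeeper-based consumer computes the partition assignment
independently in every instance, so all members of a group should be configured
with the same partition.assignment.strategy. A minimal sketch of setting it
consistently with the old high-level consumer -- ZooKeeper address, group id
and topic pattern are placeholders:

import java.util.List;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.consumer.Whitelist;
import kafka.javaapi.consumer.ConsumerConnector;

public class RoundRobinConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");   // placeholder
        props.put("group.id", "test");                       // placeholder
        // Use the same strategy in every consumer of the group; mixing
        // "roundrobin" with the default "range" leads to inconsistent ownership.
        props.put("partition.assignment.strategy", "roundrobin");

        ConsumerConnector connector =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        List<KafkaStream<byte[], byte[]>> streams =
                connector.createMessageStreamsByFilter(new Whitelist("mm-benchmark-test.*"));
        System.out.println("streams created: " + streams.size());
    }
}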


Re: No topic owner when using different assignment strategies

2015-03-17 Thread tao xiao
This is the corrected zk result

Here is the result from zk
[zk: localhost:2181(CONNECTED) 0] get
/consumers/test/owners/mm-benchmark-test/0

Node does not exist: /consumers/test/owners/mm-benchmark-test/0

[zk: localhost:2181(CONNECTED) 1] get
/consumers/test/owners/mm-benchmark-test1/0

test-localhost-1426605370072-904d6fba-0

On Tue, Mar 17, 2015 at 11:30 PM, tao xiao xiaotao...@gmail.com wrote:

 Hi team,

 I have two consumer instances with the same group id connecting to two
 different topics with 1 partition created for each. One consumer uses
 partition.assignment.strategy=roundrobin and the other one uses default
 assignment strategy. Both consumers have 1 thread spawned internally and
 connect kafka using createMessageStreamsByFilter.
 The consumer with roundrobin assignment strategy connected kafka first and
 had 2 topics assigned to itself and then I brought up another consumer that
 has default assignment strategy configured. I saw rebalancing happened in
 both consumers but at the end only one of the topics was assigned to the
 consumer that is configured roundrobin assignment strategy and no topics
 were assigned to the other consumer. This led to one topic missing its
 owner.

 Here is the result from zk
 [zk: localhost:2181(CONNECTED) 0] get
 /consumers/test/owners/mm-benchmark-test/0

 Node does not exist:
 /consumers/test12345667f/owners/mm-benchmark-test/0

 [zk: localhost:2181(CONNECTED) 1] get
 /consumers/test/owners/mm-benchmark-test1/0

 test-localhost-1426605370072-904d6fba-0

 The kafka version I use is 0.8.2.1

 --
 Regards,
 Tao




-- 
Regards,
Tao


Re: createMessageStreams vs createMessageStreamsByFilter

2015-03-13 Thread tao xiao
The number of fetchers is configurable via num.replica.fetchers. The
description of num.replica.fetchers in Kafka documentation is not quite
accurate. num.replica.fetchers actually controls the max number of fetchers
per broker. In your case, with num.replica.fetchers=8 and 5 brokers, that means
no more than 8 fetchers are created for each broker.

On Fri, Mar 13, 2015 at 1:21 PM, Zakee kzak...@netzero.net wrote:

 Is this always the case that there is only one fetcher per broker, won’t
 setting num.replica.fetchers greater than number-of-brokers cause more
 fetchers per broker?
 Let’s I have 5 brokers, and num of replica fetchers is 8, will there be 2
 fetcher threads pulling from  each broker?

 Thanks
 Zakee



  On Mar 12, 2015, at 11:15 AM, James Cheng jch...@tivo.com wrote:
 
  Ah, I understand now. I didn't realize that there was one fetcher thread
 per broker.
 
  Thanks Tao & Guozhang!
  -James
 
 
  On Mar 11, 2015, at 5:00 PM, tao xiao xiaotao...@gmail.com mailto:
 xiaotao...@gmail.com wrote:
 
  Fetcher thread is per broker basis, it ensures that at lease one fetcher
  thread per broker. Fetcher thread is sent to broker with a fetch
 request to
  ask for all partitions. So if A, B, C are in the same broker fetcher
 thread
  is still able to fetch data from A, B, C even though A returns no data.
  same logic is applied to different broker.
 
  On Thu, Mar 12, 2015 at 6:25 AM, James Cheng jch...@tivo.com wrote:
 
 
  On Mar 11, 2015, at 9:12 AM, Guozhang Wang wangg...@gmail.com wrote:
 
  Hi James,
 
  What I meant before is that a single fetcher may be responsible for
  putting
  fetched data to multiple queues according to the construction of the
  streams setup, where each queue may be consumed by a different thread.
  And
  the queues are actually bounded. Now say if there are two queues that
 are
  getting data from the same fetcher F, and are consumed by two
 different
  user threads A and B. If thread A for some reason got slowed / hung
  consuming data from queue 1, then queue 1 will eventually get full,
 and F
  trying to put more data to it will be blocked. Since F is parked on
  trying
  to put data to queue 1, queue 2 will not get more data from it, and
  thread
  B may hence gets starved. Does that make sense now?
 
 
  Yes, that makes sense. That is the scenario where one thread of a
 consumer
  can cause a backup in the queue, which would cause other threads to not
  receive data.
 
  What about the situation I described, where a thread consumes a queue
 that
  is supposed to be filled with messages from multiple partitions? If
  partition A has no messages and partitions B and C do, how will the
 fetcher
  behave? Will the processing thread receive messages from partitions B
 and C?
 
  Thanks,
  -James
 
 
  Guozhang
 
  On Tue, Mar 10, 2015 at 5:15 PM, James Cheng jch...@tivo.com wrote:
 
  Hi,
 
  Sorry to bring up this old thread, but my question is about this
 exact
  thing:
 
  Guozhang, you said:
  A more concrete example: say you have topic AC: 3 partitions, topic
  BC: 6
  partitions.
 
  With createMessageStreams(AC = 3, BC = 2) a total of 5 threads
  will
  be created, and consuming AC-1,AC-2,AC-3,BC-1/2/3,BC-4/5/6
  respectively;
 
  With createMessageStreamsByFilter(*C = 3) a total of 3 threads
 will
  be
  created, and consuming AC-1/BC-1/BC-2, AC-2/BC-3/BC-4,
 AC-3/BC-5/BC-6
  respectively.
 
 
  You said that in the createMessageStreamsByFilter case, if topic AC
 had
  no
  messages in it and consumer.timeout.ms = -1, then the 3 threads
 might
  all
  be blocked waiting for data to arrive from topic AC, and so messages
  from
  BC would not be processed.
 
  createMessageStreamsByFilter(*C = 1) (single stream) would have
 the
  same problem but just worse. Behind the scenes, is there a single
 thread
  that is consuming (round-robin?) messages from the different
 partitions
  and
  inserting them all into a single queue for the application code to
  process?
  And that is why a single partition with no messages with block the
 other
  messages from getting through?
 
  What about createMessageStreams(AC = 1)? That creates a single
 stream
  that contains messages from multiple partitions, which might be on
  different brokers. Does that also suffer the same problem, where if
 one
  partition has no messages, that the application would not receive
  messages
  from the other paritions?
 
  Thanks,
  -James
 
 
  On Feb 11, 2015, at 8:13 AM, Guozhang Wang wangg...@gmail.com
 wrote:
 
  The new consumer will be released in 0.9, which is targeted for end
 of
  this
  quarter.
 
  On Tue, Feb 10, 2015 at 7:11 PM, tao xiao xiaotao...@gmail.com
  wrote:
 
  Do you know when the new consumer API will be publicly available?
 
  On Wed, Feb 11, 2015 at 10:43 AM, Guozhang Wang 
 wangg...@gmail.com
  wrote:
 
  Yes, it can get stuck. For example, AC and BC are processed by two
  different processes and AC processors gets stuck, hence AC
 messages
  will
  fill up in the consumer's buffer

Re: Does consumer support combination of whitelist and blacklist topic filtering

2015-03-12 Thread tao xiao
Something like dynamic filtering that can be updated at runtime, or a
deny-all-but-allow-a-certain-set-of-topics rule that cannot be specified easily
by regex.

On Thu, Mar 12, 2015 at 9:06 PM, Guozhang Wang wangg...@gmail.com wrote:

 Hmm, what kind of customized filtering do you have in mind? I thought with
 --whitelist you could already specify regex to do filtering.

 On Thu, Mar 12, 2015 at 5:56 AM, tao xiao xiaotao...@gmail.com wrote:

  Hi Guozhang,
 
  I was meant to be topicfilter not topic-count. sorry for the confusion.
  What I want to achieve is to pass my own customized topicfilter to MM so
  that I can filter out topics what ever I like. I know MM doesn't support
  this now. I am just thinking if this is a good feature to add in
 
  On Thu, Mar 12, 2015 at 8:24 PM, Guozhang Wang wangg...@gmail.com
 wrote:
 
   Hi Tao,
  
   Sorry I was mistaken before, yes in MM you can only directly specify
   --whitelist, --blacklist, and  the number of streams you want to
  create
   via --num.streams, but cannot set specific topic-count. This is
 because
   MM is mainly used for cross DC replication, and hence usually will pipe
  all
   topics or a majority of the topics from the source cluster to the
   destination, hence usually you do not care about some topics should
 get
  X
   streams, while some other topics should get Y streams.
  
   Guozhang
  
   On Wed, Mar 11, 2015 at 11:59 PM, tao xiao xiaotao...@gmail.com
 wrote:
  
The topic list is not specified in consumer.properties and I don't
  think
there is any property in consumer config that allows us to specify
 what
topics we want to consume. Can you point me to the property if there
 is
any?
   
On Thu, Mar 12, 2015 at 12:14 AM, Guozhang Wang wangg...@gmail.com
wrote:
   
 Tao,

 In MM people can pass in consumer configs, in which people can
  specify
 consumption topics, either in regular topic list format or
 whitelist
  /
 blacklist. So I think it already does what you need?

 Guozhang

 On Tue, Mar 10, 2015 at 10:09 PM, tao xiao xiaotao...@gmail.com
   wrote:

  Thank you guys for answering. I think it will be good that we can
   pass
 in a
  customised topicCount ( I think this is the interface whitelist
 and
  backlist implement if I am not mistaken) to MM to achieve similar
   thing
 
  On Wednesday, March 11, 2015, Guozhang Wang wangg...@gmail.com
wrote:
 
   Hi Tao,
  
   Unfortunately MM does not support whitelist / blacklist at the
  same
 time,
   and you have to choose either one upon initialization. As for
  your
 case,
  I
   think it can be captured by some reg-ex to exclude nothing else
  but
 10,
   but I do not know the exact expression.
  
   Guozhang
  
   On Tue, Mar 10, 2015 at 7:58 AM, tao xiao 
 xiaotao...@gmail.com
   javascript:; wrote:
  
I actually mean if we can achieve this in mirror maker.
   
On Tue, Mar 10, 2015 at 10:52 PM, tao xiao 
  xiaotao...@gmail.com
   javascript:; wrote:
   
 Hi,

 I have an user case where I need to consume a list topics
  with
name
   that
 matches pattern topic.* except for one that is topic.10. Is
there a
  way
 that I can combine the use of whitelist and blacklist so
  that I
can
achieve
 something like accept all topics with regex topic.* but
  exclude
   topic.10?

 --
 Regards,
 Tao

   
   
   
--
Regards,
Tao
   
  
  
  
   --
   -- Guozhang
  
 
 
  --
  Regards,
  Tao
 



 --
 -- Guozhang

   
   
   
--
Regards,
Tao
   
  
  
  
   --
   -- Guozhang
  
 
 
 
  --
  Regards,
  Tao
 



 --
 -- Guozhang




-- 
Regards,
Tao


Re: Does consumer support combination of whitelist and blacklist topic filtering

2015-03-12 Thread tao xiao
A little more context about my needs: I have a requirement to start/stop
consumption of a topic at runtime based on an event sent to MM. At the moment I
need to bounce the MM and find a way to exclude the topic from the whitelist,
which is not an easy job with regex. If I could pass in a combination of
blacklist and whitelist I could easily achieve this with something like
--whitelist topic.* --blacklist topic.1
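
For what it's worth, plain Java regex can already express "everything matching
topic.* except topic.1" with a negative lookahead; I have not verified that the
mirror maker --whitelist option accepts lookahead syntax, so treat this only as
a sketch of the pattern itself:

import java.util.regex.Pattern;

public class WhitelistRegexCheck {
    public static void main(String[] args) {
        // Matches everything starting with "topic." except "topic.1" itself;
        // topic.10, topic.11 etc. are still included.
        Pattern p = Pattern.compile("topic\\.(?!1$).*");
        for (String t : new String[]{"topic.1", "topic.10", "topic.2", "other"}) {
            System.out.println(t + " -> " + p.matcher(t).matches());
        }
    }
}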

On Thu, Mar 12, 2015 at 9:10 PM, tao xiao xiaotao...@gmail.com wrote:

 something like dynamic filtering that can be updated at runtime or deny
 all but allow a certain set of topics that cannot be specified easily by
 regex

 On Thu, Mar 12, 2015 at 9:06 PM, Guozhang Wang wangg...@gmail.com wrote:

 Hmm, what kind of customized filtering do you have in mind? I thought with
 --whitelist you could already specify regex to do filtering.

 On Thu, Mar 12, 2015 at 5:56 AM, tao xiao xiaotao...@gmail.com wrote:

  Hi Guozhang,
 
  I was meant to be topicfilter not topic-count. sorry for the confusion.
  What I want to achieve is to pass my own customized topicfilter to MM so
  that I can filter out topics what ever I like. I know MM doesn't support
  this now. I am just thinking if this is a good feature to add in
 
  On Thu, Mar 12, 2015 at 8:24 PM, Guozhang Wang wangg...@gmail.com
 wrote:
 
   Hi Tao,
  
   Sorry I was mistaken before, yes in MM you can only directly specify
   --whitelist, --blacklist, and  the number of streams you want to
  create
   via --num.streams, but cannot set specific topic-count. This is
 because
   MM is mainly used for cross DC replication, and hence usually will
 pipe
  all
   topics or a majority of the topics from the source cluster to the
   destination, hence usually you do not care about some topics should
 get
  X
   streams, while some other topics should get Y streams.
  
   Guozhang
  
   On Wed, Mar 11, 2015 at 11:59 PM, tao xiao xiaotao...@gmail.com
 wrote:
  
The topic list is not specified in consumer.properties and I don't
  think
there is any property in consumer config that allows us to specify
 what
topics we want to consume. Can you point me to the property if
 there is
any?
   
On Thu, Mar 12, 2015 at 12:14 AM, Guozhang Wang wangg...@gmail.com
 
wrote:
   
 Tao,

 In MM people can pass in consumer configs, in which people can
  specify
 consumption topics, either in regular topic list format or
 whitelist
  /
 blacklist. So I think it already does what you need?

 Guozhang

 On Tue, Mar 10, 2015 at 10:09 PM, tao xiao xiaotao...@gmail.com
   wrote:

  Thank you guys for answering. I think it will be good that we
 can
   pass
 in a
  customised topicCount ( I think this is the interface whitelist
 and
  backlist implement if I am not mistaken) to MM to achieve
 similar
   thing
 
  On Wednesday, March 11, 2015, Guozhang Wang wangg...@gmail.com
 
wrote:
 
   Hi Tao,
  
   Unfortunately MM does not support whitelist / blacklist at the
  same
 time,
   and you have to choose either one upon initialization. As for
  your
 case,
  I
   think it can be captured by some reg-ex to exclude nothing
 else
  but
 10,
   but I do not know the exact expression.
  
   Guozhang
  
   On Tue, Mar 10, 2015 at 7:58 AM, tao xiao 
 xiaotao...@gmail.com
   javascript:; wrote:
  
I actually mean if we can achieve this in mirror maker.
   
On Tue, Mar 10, 2015 at 10:52 PM, tao xiao 
  xiaotao...@gmail.com
   javascript:; wrote:
   
 Hi,

 I have an user case where I need to consume a list topics
  with
name
   that
 matches pattern topic.* except for one that is topic.10.
 Is
there a
  way
 that I can combine the use of whitelist and blacklist so
  that I
can
achieve
 something like accept all topics with regex topic.* but
  exclude
   topic.10?

 --
 Regards,
 Tao

   
   
   
--
Regards,
Tao
   
  
  
  
   --
   -- Guozhang
  
 
 
  --
  Regards,
  Tao
 



 --
 -- Guozhang

   
   
   
--
Regards,
Tao
   
  
  
  
   --
   -- Guozhang
  
 
 
 
  --
  Regards,
  Tao
 



 --
 -- Guozhang




 --
 Regards,
 Tao




-- 
Regards,
Tao


Re: Does consumer support combination of whitelist and blacklist topic filtering

2015-03-12 Thread tao xiao
Thank you Guozhang for your advice. A dynamic topic filter is what I need
so that I can stop a topic consumption when I need to at runtime.

On Thu, Mar 12, 2015 at 9:21 PM, Guozhang Wang wangg...@gmail.com wrote:

 1. Dynamic: yeah that is sth. we could think of, this could be useful
 operationally.
 2. Regex: I think in terms of expressiveness it should be sufficient for
 almost all subset of topics. In practice usually the rule of thumb is that
 you will create your topics that belongs to the same group with some
 prefix / suffix so that regex expression would not be crazily long.

 Guozhang

 On Thu, Mar 12, 2015 at 6:10 AM, tao xiao xiaotao...@gmail.com wrote:

  something like dynamic filtering that can be updated at runtime or deny
 all
  but allow a certain set of topics that cannot be specified easily by
 regex
 
  On Thu, Mar 12, 2015 at 9:06 PM, Guozhang Wang wangg...@gmail.com
 wrote:
 
   Hmm, what kind of customized filtering do you have in mind? I thought
  with
   --whitelist you could already specify regex to do filtering.
  
   On Thu, Mar 12, 2015 at 5:56 AM, tao xiao xiaotao...@gmail.com
 wrote:
  
Hi Guozhang,
   
I was meant to be topicfilter not topic-count. sorry for the
 confusion.
What I want to achieve is to pass my own customized topicfilter to MM
  so
that I can filter out topics what ever I like. I know MM doesn't
  support
this now. I am just thinking if this is a good feature to add in
   
On Thu, Mar 12, 2015 at 8:24 PM, Guozhang Wang wangg...@gmail.com
   wrote:
   
 Hi Tao,

 Sorry I was mistaken before, yes in MM you can only directly
 specify
 --whitelist, --blacklist, and  the number of streams you want
 to
create
 via --num.streams, but cannot set specific topic-count. This is
   because
 MM is mainly used for cross DC replication, and hence usually will
  pipe
all
 topics or a majority of the topics from the source cluster to the
 destination, hence usually you do not care about some topics
 should
   get
X
 streams, while some other topics should get Y streams.

 Guozhang

 On Wed, Mar 11, 2015 at 11:59 PM, tao xiao xiaotao...@gmail.com
   wrote:

  The topic list is not specified in consumer.properties and I
 don't
think
  there is any property in consumer config that allows us to
 specify
   what
  topics we want to consume. Can you point me to the property if
  there
   is
  any?
 
  On Thu, Mar 12, 2015 at 12:14 AM, Guozhang Wang 
  wangg...@gmail.com
  wrote:
 
   Tao,
  
   In MM people can pass in consumer configs, in which people can
specify
   consumption topics, either in regular topic list format or
   whitelist
/
   blacklist. So I think it already does what you need?
  
   Guozhang
  
   On Tue, Mar 10, 2015 at 10:09 PM, tao xiao 
 xiaotao...@gmail.com
  
 wrote:
  
Thank you guys for answering. I think it will be good that we
  can
 pass
   in a
customised topicCount ( I think this is the interface
 whitelist
   and
backlist implement if I am not mistaken) to MM to achieve
  similar
 thing
   
On Wednesday, March 11, 2015, Guozhang Wang 
  wangg...@gmail.com
  wrote:
   
 Hi Tao,

 Unfortunately MM does not support whitelist / blacklist at
  the
same
   time,
 and you have to choose either one upon initialization. As
 for
your
   case,
I
 think it can be captured by some reg-ex to exclude nothing
  else
but
   10,
 but I do not know the exact expression.

 Guozhang

 On Tue, Mar 10, 2015 at 7:58 AM, tao xiao 
   xiaotao...@gmail.com
 javascript:; wrote:

  I actually mean if we can achieve this in mirror maker.
 
  On Tue, Mar 10, 2015 at 10:52 PM, tao xiao 
xiaotao...@gmail.com
 javascript:; wrote:
 
   Hi,
  
   I have an user case where I need to consume a list
 topics
with
  name
 that
   matches pattern topic.* except for one that is
 topic.10.
  Is
  there a
way
   that I can combine the use of whitelist and blacklist
 so
that I
  can
  achieve
   something like accept all topics with regex topic.* but
exclude
 topic.10?
  
   --
   Regards,
   Tao
  
 
 
 
  --
  Regards,
  Tao
 



 --
 -- Guozhang

   
   
--
Regards,
Tao
   
  
  
  
   --
   -- Guozhang
  
 
 
 
  --
  Regards,
  Tao
 



 --
 -- Guozhang

   
   
   
--
Regards,
Tao
   
  
  
  
   --
   -- Guozhang

Re: Does consumer support combination of whitelist and blacklist topic filtering

2015-03-12 Thread tao xiao
Yes, that will work. The message handler can filter out messages sent from
certain topics.
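Roughly the filtering logic I have in mind, sketched against a hypothetical
handler shape (the actual message handler hook in MM may use a different
interface name and signature, so treat this only as an illustration):
returning an empty list drops the record, so nothing gets mirrored for that
topic.

import java.util.Collections;
import java.util.List;
import org.apache.kafka.clients.producer.ProducerRecord;

// Hypothetical handler shape: one consumed record in, zero or more producer records out.
public class TopicDroppingHandler {
    private final String topicToDrop;

    public TopicDroppingHandler(String topicToDrop) {
        this.topicToDrop = topicToDrop;
    }

    public List<ProducerRecord<byte[], byte[]>> handle(String topic, byte[] key, byte[] value) {
        if (topic.equals(topicToDrop)) {
            return Collections.emptyList();  // filtered out: nothing is produced for it
        }
        return Collections.singletonList(new ProducerRecord<byte[], byte[]>(topic, key, value));
    }
}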

On Fri, Mar 13, 2015 at 6:30 AM, Jiangjie Qin j...@linkedin.com.invalid
wrote:

 Not sure if it is an option. But does filtering out topics with a message
 handler work for you? Are you going to resume consuming from a topic
 after you stop consuming from it?

 Jiangjie (Becket) Qin

 On 3/12/15, 8:05 AM, tao xiao xiaotao...@gmail.com wrote:

 Yes, you are right. a dynamic topicfilter is more appropriate where I can
 filter topics at runtime via some kind of interface e.g. JMX
 
 On Thu, Mar 12, 2015 at 11:03 PM, Guozhang Wang wangg...@gmail.com
 wrote:
 
  Tao,
 
  Based on your description I think the combination of whitelist /
 blacklist
  will not achieve your goal, since it is still static.
 
  Guozhang
 
  On Thu, Mar 12, 2015 at 6:30 AM, tao xiao xiaotao...@gmail.com wrote:
 
   Thank you Guozhang for your advice. A dynamic topic filter is what I
 need
   so that I can stop a topic consumption when I need to at runtime.
  
   On Thu, Mar 12, 2015 at 9:21 PM, Guozhang Wang wangg...@gmail.com
  wrote:
  
1. Dynamic: yeah that is sth. we could think of, this could be
 useful
operationally.
2. Regex: I think in terms of expressiveness it should be sufficient
  for
almost all subset of topics. In practice usually the rule of thumb
 is
   that
you will create your topics that belongs to the same group with
 some
prefix / suffix so that regex expression would not be crazily long.
   
Guozhang
   
On Thu, Mar 12, 2015 at 6:10 AM, tao xiao xiaotao...@gmail.com
  wrote:
   
 something like dynamic filtering that can be updated at runtime or
  deny
all
 but allow a certain set of topics that cannot be specified easily
 by
regex

 On Thu, Mar 12, 2015 at 9:06 PM, Guozhang Wang
 wangg...@gmail.com
wrote:

  Hmm, what kind of customized filtering do you have in mind? I
  thought
 with
  --whitelist you could already specify regex to do filtering.
 
  On Thu, Mar 12, 2015 at 5:56 AM, tao xiao xiaotao...@gmail.com
 
wrote:
 
   Hi Guozhang,
  
   I was meant to be topicfilter not topic-count. sorry for the
confusion.
   What I want to achieve is to pass my own customized
 topicfilter
  to
   MM
 so
   that I can filter out topics what ever I like. I know MM
 doesn't
 support
   this now. I am just thinking if this is a good feature to add
 in
  
   On Thu, Mar 12, 2015 at 8:24 PM, Guozhang Wang 
  wangg...@gmail.com
   
  wrote:
  
Hi Tao,
   
Sorry I was mistaken before, yes in MM you can only directly
specify
--whitelist, --blacklist, and  the number of streams you
  want
to
   create
via --num.streams, but cannot set specific topic-count.
 This
  is
  because
MM is mainly used for cross DC replication, and hence
 usually
   will
 pipe
   all
topics or a majority of the topics from the source cluster
 to
  the
destination, hence usually you do not care about some
 topics
should
  get
   X
streams, while some other topics should get Y streams.
   
Guozhang
   
On Wed, Mar 11, 2015 at 11:59 PM, tao xiao 
  xiaotao...@gmail.com
   
  wrote:
   
 The topic list is not specified in consumer.properties
 and I
don't
   think
 there is any property in consumer config that allows us to
specify
  what
 topics we want to consume. Can you point me to the
 property
  if
 there
  is
 any?

 On Thu, Mar 12, 2015 at 12:14 AM, Guozhang Wang 
 wangg...@gmail.com
 wrote:

  Tao,
 
  In MM people can pass in consumer configs, in which
 people
   can
   specify
  consumption topics, either in regular topic list format
 or
  whitelist
   /
  blacklist. So I think it already does what you need?
 
  Guozhang
 
  On Tue, Mar 10, 2015 at 10:09 PM, tao xiao 
xiaotao...@gmail.com
 
wrote:
 
   Thank you guys for answering. I think it will be good
  that
   we
 can
pass
  in a
   customised topicCount ( I think this is the interface
whitelist
  and
   backlist implement if I am not mistaken) to MM to
 achieve
 similar
thing
  
   On Wednesday, March 11, 2015, Guozhang Wang 
 wangg...@gmail.com
 wrote:
  
Hi Tao,
   
Unfortunately MM does not support whitelist /
 blacklist
   at
 the
   same
  time,
and you have to choose either one upon
 initialization.
  As
for
   your
  case,
   I
think it can be captured by some reg-ex to exclude
   nothing
 else
   but
  10,
but I

Re: Does consumer support combination of whitelist and blacklist topic filtering

2015-03-12 Thread tao xiao
I am not sure how MM is going to be rewritten. Based on the current
implementation in trunk, an offset is not committed unless the message has been
produced to the destination. Assuming that logic remains, MM will not
acknowledge the offset back to the source for filtered messages. So I think it
is safe to filter messages out while keeping the committed offset unchanged for
that particular topic. Please correct me if I am wrong.

On Fri, Mar 13, 2015 at 1:12 PM, Guozhang Wang wangg...@gmail.com wrote:

 Note that with filtering in message handler, records from the source
 cluster are still considered as consumed since the offsets will be
 committed. If you change the filtering dynamically back to whilelist these
 topics, you will lose the data that gets consumed during the period of the
 blacklist.

 Guozhang

 On Thu, Mar 12, 2015 at 10:01 PM, tao xiao xiaotao...@gmail.com wrote:

  Yes, that will work. message handle can filter out message sent from
  certain topics
 
  On Fri, Mar 13, 2015 at 6:30 AM, Jiangjie Qin j...@linkedin.com.invalid
 
  wrote:
 
   No sure if it is an option. But does filtering out topics with message
   handler works for you? Are you going to resume consuming from a topic
   after you stop consuming from it?
  
   Jiangjie (Becket) Qin
  
   On 3/12/15, 8:05 AM, tao xiao xiaotao...@gmail.com wrote:
  
   Yes, you are right. a dynamic topicfilter is more appropriate where I
  can
   filter topics at runtime via some kind of interface e.g. JMX
   
   On Thu, Mar 12, 2015 at 11:03 PM, Guozhang Wang wangg...@gmail.com
   wrote:
   
Tao,
   
Based on your description I think the combination of whitelist /
   blacklist
will not achieve your goal, since it is still static.
   
Guozhang
   
On Thu, Mar 12, 2015 at 6:30 AM, tao xiao xiaotao...@gmail.com
  wrote:
   
 Thank you Guozhang for your advice. A dynamic topic filter is
 what I
   need
 so that I can stop a topic consumption when I need to at runtime.

 On Thu, Mar 12, 2015 at 9:21 PM, Guozhang Wang 
 wangg...@gmail.com
wrote:

  1. Dynamic: yeah that is sth. we could think of, this could be
   useful
  operationally.
  2. Regex: I think in terms of expressiveness it should be
  sufficient
for
  almost all subset of topics. In practice usually the rule of
 thumb
   is
 that
  you will create your topics that belongs to the same group
 with
   some
  prefix / suffix so that regex expression would not be crazily
  long.
 
  Guozhang
 
  On Thu, Mar 12, 2015 at 6:10 AM, tao xiao xiaotao...@gmail.com
 
wrote:
 
   something like dynamic filtering that can be updated at
 runtime
  or
deny
  all
   but allow a certain set of topics that cannot be specified
  easily
   by
  regex
  
   On Thu, Mar 12, 2015 at 9:06 PM, Guozhang Wang
   wangg...@gmail.com
  wrote:
  
Hmm, what kind of customized filtering do you have in mind?
 I
thought
   with
--whitelist you could already specify regex to do
 filtering.
   
On Thu, Mar 12, 2015 at 5:56 AM, tao xiao 
  xiaotao...@gmail.com
   
  wrote:
   
 Hi Guozhang,

 I was meant to be topicfilter not topic-count. sorry for
 the
  confusion.
 What I want to achieve is to pass my own customized
   topicfilter
to
 MM
   so
 that I can filter out topics what ever I like. I know MM
   doesn't
   support
 this now. I am just thinking if this is a good feature to
  add
   in

 On Thu, Mar 12, 2015 at 8:24 PM, Guozhang Wang 
wangg...@gmail.com
 
wrote:

  Hi Tao,
 
  Sorry I was mistaken before, yes in MM you can only
  directly
  specify
  --whitelist, --blacklist, and  the number of streams
  you
want
  to
 create
  via --num.streams, but cannot set specific
 topic-count.
   This
is
because
  MM is mainly used for cross DC replication, and hence
   usually
 will
   pipe
 all
  topics or a majority of the topics from the source
 cluster
   to
the
  destination, hence usually you do not care about some
   topics
  should
get
 X
  streams, while some other topics should get Y streams.
 
  Guozhang
 
  On Wed, Mar 11, 2015 at 11:59 PM, tao xiao 
xiaotao...@gmail.com
 
wrote:
 
   The topic list is not specified in consumer.properties
   and I
  don't
 think
   there is any property in consumer config that allows
 us
  to
  specify
what
   topics we want to consume. Can you point me to the
   property
if
   there
is
   any?
  
   On Thu, Mar 12, 2015 at 12:14 AM, Guozhang Wang 
   wangg...@gmail.com
   wrote:
  
Tao

Re: Does consumer support combination of whitelist and blacklist topic filtering

2015-03-12 Thread tao xiao
Yes, you are right. A dynamic topic filter is more appropriate, one where I can
filter topics at runtime via some kind of interface, e.g. JMX.

On Thu, Mar 12, 2015 at 11:03 PM, Guozhang Wang wangg...@gmail.com wrote:

 Tao,

 Based on your description I think the combination of whitelist / blacklist
 will not achieve your goal, since it is still static.

 Guozhang

 On Thu, Mar 12, 2015 at 6:30 AM, tao xiao xiaotao...@gmail.com wrote:

  Thank you Guozhang for your advice. A dynamic topic filter is what I need
  so that I can stop a topic consumption when I need to at runtime.
 
  On Thu, Mar 12, 2015 at 9:21 PM, Guozhang Wang wangg...@gmail.com
 wrote:
 
   1. Dynamic: yeah that is sth. we could think of, this could be useful
   operationally.
   2. Regex: I think in terms of expressiveness it should be sufficient
 for
   almost all subset of topics. In practice usually the rule of thumb is
  that
   you will create your topics that belongs to the same group with some
   prefix / suffix so that regex expression would not be crazily long.
  
   Guozhang
  
   On Thu, Mar 12, 2015 at 6:10 AM, tao xiao xiaotao...@gmail.com
 wrote:
  
something like dynamic filtering that can be updated at runtime or
 deny
   all
but allow a certain set of topics that cannot be specified easily by
   regex
   
On Thu, Mar 12, 2015 at 9:06 PM, Guozhang Wang wangg...@gmail.com
   wrote:
   
 Hmm, what kind of customized filtering do you have in mind? I
 thought
with
 --whitelist you could already specify regex to do filtering.

 On Thu, Mar 12, 2015 at 5:56 AM, tao xiao xiaotao...@gmail.com
   wrote:

  Hi Guozhang,
 
  I was meant to be topicfilter not topic-count. sorry for the
   confusion.
  What I want to achieve is to pass my own customized topicfilter
 to
  MM
so
  that I can filter out topics what ever I like. I know MM doesn't
support
  this now. I am just thinking if this is a good feature to add in
 
  On Thu, Mar 12, 2015 at 8:24 PM, Guozhang Wang 
 wangg...@gmail.com
  
 wrote:
 
   Hi Tao,
  
   Sorry I was mistaken before, yes in MM you can only directly
   specify
   --whitelist, --blacklist, and  the number of streams you
 want
   to
  create
   via --num.streams, but cannot set specific topic-count. This
 is
 because
   MM is mainly used for cross DC replication, and hence usually
  will
pipe
  all
   topics or a majority of the topics from the source cluster to
 the
   destination, hence usually you do not care about some topics
   should
 get
  X
   streams, while some other topics should get Y streams.
  
   Guozhang
  
   On Wed, Mar 11, 2015 at 11:59 PM, tao xiao 
 xiaotao...@gmail.com
  
 wrote:
  
The topic list is not specified in consumer.properties and I
   don't
  think
there is any property in consumer config that allows us to
   specify
 what
topics we want to consume. Can you point me to the property
 if
there
 is
any?
   
On Thu, Mar 12, 2015 at 12:14 AM, Guozhang Wang 
wangg...@gmail.com
wrote:
   
 Tao,

 In MM people can pass in consumer configs, in which people
  can
  specify
 consumption topics, either in regular topic list format or
 whitelist
  /
 blacklist. So I think it already does what you need?

 Guozhang

 On Tue, Mar 10, 2015 at 10:09 PM, tao xiao 
   xiaotao...@gmail.com

   wrote:

  Thank you guys for answering. I think it will be good
 that
  we
can
   pass
 in a
  customised topicCount ( I think this is the interface
   whitelist
 and
  backlist implement if I am not mistaken) to MM to achieve
similar
   thing
 
  On Wednesday, March 11, 2015, Guozhang Wang 
wangg...@gmail.com
wrote:
 
   Hi Tao,
  
   Unfortunately MM does not support whitelist / blacklist
  at
the
  same
 time,
   and you have to choose either one upon initialization.
 As
   for
  your
 case,
  I
   think it can be captured by some reg-ex to exclude
  nothing
else
  but
 10,
   but I do not know the exact expression.
  
   Guozhang
  
   On Tue, Mar 10, 2015 at 7:58 AM, tao xiao 
 xiaotao...@gmail.com
   javascript:; wrote:
  
I actually mean if we can achieve this in mirror
 maker.
   
On Tue, Mar 10, 2015 at 10:52 PM, tao xiao 
  xiaotao...@gmail.com
   javascript:; wrote:
   
 Hi,

 I have an user case where I need to consume a list
   topics
  with
name
   that
 matches pattern topic.* except for one that is
   topic

Re: Does consumer support combination of whitelist and blacklist topic filtering

2015-03-12 Thread tao xiao
Hi Guozhang,

I meant topicfilter, not topic-count; sorry for the confusion.
What I want to achieve is to pass my own customized topicfilter to MM so
that I can filter out whatever topics I like. I know MM doesn't support
this now. I am just wondering whether this would be a good feature to add.

On Thu, Mar 12, 2015 at 8:24 PM, Guozhang Wang wangg...@gmail.com wrote:

 Hi Tao,

 Sorry I was mistaken before, yes in MM you can only directly specify
 --whitelist, --blacklist, and  the number of streams you want to create
 via --num.streams, but cannot set specific topic-count. This is because
 MM is mainly used for cross DC replication, and hence usually will pipe all
 topics or a majority of the topics from the source cluster to the
 destination, hence usually you do not care about some topics should get X
 streams, while some other topics should get Y streams.

 Guozhang

 On Wed, Mar 11, 2015 at 11:59 PM, tao xiao xiaotao...@gmail.com wrote:

  The topic list is not specified in consumer.properties and I don't think
  there is any property in consumer config that allows us to specify what
  topics we want to consume. Can you point me to the property if there is
  any?
 
  On Thu, Mar 12, 2015 at 12:14 AM, Guozhang Wang wangg...@gmail.com
  wrote:
 
   Tao,
  
   In MM people can pass in consumer configs, in which people can specify
   consumption topics, either in regular topic list format or whitelist /
   blacklist. So I think it already does what you need?
  
   Guozhang
  
   On Tue, Mar 10, 2015 at 10:09 PM, tao xiao xiaotao...@gmail.com
 wrote:
  
Thank you guys for answering. I think it will be good that we can
 pass
   in a
customised topicCount ( I think this is the interface whitelist and
backlist implement if I am not mistaken) to MM to achieve similar
 thing
   
On Wednesday, March 11, 2015, Guozhang Wang wangg...@gmail.com
  wrote:
   
 Hi Tao,

 Unfortunately MM does not support whitelist / blacklist at the same
   time,
 and you have to choose either one upon initialization. As for your
   case,
I
 think it can be captured by some reg-ex to exclude nothing else but
   10,
 but I do not know the exact expression.

 Guozhang

 On Tue, Mar 10, 2015 at 7:58 AM, tao xiao xiaotao...@gmail.com
 javascript:; wrote:

  I actually mean if we can achieve this in mirror maker.
 
  On Tue, Mar 10, 2015 at 10:52 PM, tao xiao xiaotao...@gmail.com
 javascript:; wrote:
 
   Hi,
  
   I have an user case where I need to consume a list topics with
  name
 that
   matches pattern topic.* except for one that is topic.10. Is
  there a
way
   that I can combine the use of whitelist and blacklist so that I
  can
  achieve
   something like accept all topics with regex topic.* but exclude
 topic.10?
  
   --
   Regards,
   Tao
  
 
 
 
  --
  Regards,
  Tao
 



 --
 -- Guozhang

   
   
--
Regards,
Tao
   
  
  
  
   --
   -- Guozhang
  
 
 
 
  --
  Regards,
  Tao
 



 --
 -- Guozhang




-- 
Regards,
Tao


Re: Does consumer support combination of whitelist and blacklist topic filtering

2015-03-12 Thread tao xiao
The topic list is not specified in consumer.properties and I don't think
there is any property in consumer config that allows us to specify what
topics we want to consume. Can you point me to the property if there is any?

On Thu, Mar 12, 2015 at 12:14 AM, Guozhang Wang wangg...@gmail.com wrote:

 Tao,

 In MM people can pass in consumer configs, in which people can specify
 consumption topics, either in regular topic list format or whitelist /
 blacklist. So I think it already does what you need?

 Guozhang

 On Tue, Mar 10, 2015 at 10:09 PM, tao xiao xiaotao...@gmail.com wrote:

  Thank you guys for answering. I think it will be good that we can pass
 in a
  customised topicCount ( I think this is the interface whitelist and
  backlist implement if I am not mistaken) to MM to achieve similar thing
 
  On Wednesday, March 11, 2015, Guozhang Wang wangg...@gmail.com wrote:
 
   Hi Tao,
  
   Unfortunately MM does not support whitelist / blacklist at the same
 time,
   and you have to choose either one upon initialization. As for your
 case,
  I
   think it can be captured by some reg-ex to exclude nothing else but
 10,
   but I do not know the exact expression.
  
   Guozhang
  
   On Tue, Mar 10, 2015 at 7:58 AM, tao xiao xiaotao...@gmail.com
   javascript:; wrote:
  
I actually mean if we can achieve this in mirror maker.
   
On Tue, Mar 10, 2015 at 10:52 PM, tao xiao xiaotao...@gmail.com
   javascript:; wrote:
   
 Hi,

 I have an user case where I need to consume a list topics with name
   that
 matches pattern topic.* except for one that is topic.10. Is there a
  way
 that I can combine the use of whitelist and blacklist so that I can
achieve
 something like accept all topics with regex topic.* but exclude
   topic.10?

 --
 Regards,
 Tao

   
   
   
--
Regards,
Tao
   
  
  
  
   --
   -- Guozhang
  
 
 
  --
  Regards,
  Tao
 



 --
 -- Guozhang




-- 
Regards,
Tao


Re: Out of Disk Space - Infinite loop

2015-03-12 Thread tao xiao
Did you stop mirror maker?

On Thu, Mar 12, 2015 at 8:27 AM, Saladi Naidu naidusp2...@yahoo.com.invalid
 wrote:

 We have 3 DC's and created 5 node Kafka cluster in each DC, connected
 these 3 DC's using Mirror Maker for replication. We were conducting
 performance testing using Kafka Producer Performance tool to load 100
 million rows into 7 topics. We expected that data will be loaded evenly
 across 7 topics but 4 topics got loaded with ~2 million messages and
 remaining 3 topics loaded with 90 million messages. The nodes that were
 leaders of those 3 topics ran out of disk space and nodes went down.
 We tried to bring back these 2 nodes by doing following
 1. Stopped Kafka Service 2. Deleted couple of topics that were taking up
 too much space i.e. /var/kafka/logs/{topic$}/ and file system showed 47%
 available 3. Brought back the Kafka nodes
 As soon as nodes are back, we started observing the file system growing
 and in 15 minutes the mount point became full again. Deleted topics got
 recreated and taking up space again. Looking at kafka.log, it shows many of
 the following messages. Ultimately the node goes down. We don't need to
 recover data now, we would like to bring nodes back. What are the steps to
 bring back these nodes?
 [2015-03-11 20:52:36,323] INFO Rolled new log segment for
 'dc2-perf-topic5-0' in 3 ms. (kafka.log.Log)
 [2015-03-11 15:58:07,321] INFO [Kafka Server 1021124614], started
 (kafka.server.KafkaServer)
 [2015-03-11 15:58:07,882] INFO Completed load of log dc2-perf-topic5-0
 with log end offset 0 (kafka.log.Log)
 [2015-03-11 15:58:07,900] INFO Created log for partition
 [dc2-perf-topic5,0] in /var/kafka/log with properties {segment.index.bytes
 - 10485760, file.delete.delay.ms - 6, segment.bytes - 1073741824,
 flush.ms - 9223372036854775807, delete.retention.ms - 360,
 index.interval.bytes - 4096, retention.bytes - -1, cleanup.policy -
 delete, segment.ms - 60480, max.message.bytes - 112,
 flush.messages - 9223372036854775807, min.cleanable.dirty.ratio - 0.5,
 retention.ms - 60480}. (kafka.log.LogManager)
 [2015-03-11 15:58:07,914] INFO Completed load of log dc2-perf-topic2-0
 with log end offset 0 (kafka.log.Log)
 [2015-03-11 15:58:07,916] INFO Created log for partition
 [dc2-perf-topic2,0] in /var/kafka/log with properties {segment.index.bytes
 - 10485760, file.delete.delay.ms - 6, segment.bytes - 1073741824,
 flush.ms - 9223372036854775807, delete.retention.ms - 360,
 index.interval.bytes - 4096, retention.bytes - -1, cleanup.policy -
 delete, segment.ms - 60480, max.message.bytes - 112,
 flush.messages - 9223372036854775807, min.cleanable.dirty.ratio - 0.5,
 retention.ms - 60480}. (kafka.log.LogManager)
 [2015-03-11 15:58:07,935] INFO Completed load of log dc2-perf-topic9-0
 with log end offset 0 (kafka.log.Log)

  SP Naidu




-- 
Regards,
Tao


Re: createMessageStreams vs createMessageStreamsByFilter

2015-03-11 Thread tao xiao
Fetcher threads are created on a per-broker basis, which ensures at least one
fetcher thread per broker. A fetcher thread sends the broker a single fetch
request that asks for all of its assigned partitions. So if A, B and C are on
the same broker, the fetcher thread is still able to fetch data from B and C
even though A returns no data. The same logic applies across different brokers.

On Thu, Mar 12, 2015 at 6:25 AM, James Cheng jch...@tivo.com wrote:


 On Mar 11, 2015, at 9:12 AM, Guozhang Wang wangg...@gmail.com wrote:

  Hi James,
 
  What I meant before is that a single fetcher may be responsible for
 putting
  fetched data to multiple queues according to the construction of the
  streams setup, where each queue may be consumed by a different thread.
 And
  the queues are actually bounded. Now say if there are two queues that are
  getting data from the same fetcher F, and are consumed by two different
  user threads A and B. If thread A for some reason got slowed / hung
  consuming data from queue 1, then queue 1 will eventually get full, and F
  trying to put more data to it will be blocked. Since F is parked on
 trying
  to put data to queue 1, queue 2 will not get more data from it, and
 thread
  B may hence gets starved. Does that make sense now?
 

 Yes, that makes sense. That is the scenario where one thread of a consumer
 can cause a backup in the queue, which would cause other threads to not
 receive data.

 What about the situation I described, where a thread consumes a queue that
 is supposed to be filled with messages from multiple partitions? If
 partition A has no messages and partitions B and C do, how will the fetcher
 behave? Will the processing thread receive messages from partitions B and C?

 Thanks,
 -James


  Guozhang
 
  On Tue, Mar 10, 2015 at 5:15 PM, James Cheng jch...@tivo.com wrote:
 
  Hi,
 
  Sorry to bring up this old thread, but my question is about this exact
  thing:
 
  Guozhang, you said:
  A more concrete example: say you have topic AC: 3 partitions, topic
 BC: 6
  partitions.
 
  With createMessageStreams(AC = 3, BC = 2) a total of 5 threads
 will
  be created, and consuming AC-1,AC-2,AC-3,BC-1/2/3,BC-4/5/6
 respectively;
 
  With createMessageStreamsByFilter(*C = 3) a total of 3 threads will
 be
  created, and consuming AC-1/BC-1/BC-2, AC-2/BC-3/BC-4, AC-3/BC-5/BC-6
  respectively.
 
 
  You said that in the createMessageStreamsByFilter case, if topic AC had
 no
  messages in it and consumer.timeout.ms = -1, then the 3 threads might
 all
  be blocked waiting for data to arrive from topic AC, and so messages
 from
  BC would not be processed.
 
  createMessageStreamsByFilter(*C = 1) (single stream) would have the
  same problem but just worse. Behind the scenes, is there a single thread
  that is consuming (round-robin?) messages from the different partitions
 and
  inserting them all into a single queue for the application code to
 process?
  And that is why a single partition with no messages with block the other
  messages from getting through?
 
  What about createMessageStreams(AC = 1)? That creates a single stream
  that contains messages from multiple partitions, which might be on
  different brokers. Does that also suffer the same problem, where if one
  partition has no messages, that the application would not receive
 messages
  from the other paritions?
 
  Thanks,
  -James
 
 
  On Feb 11, 2015, at 8:13 AM, Guozhang Wang wangg...@gmail.com wrote:
 
  The new consumer will be released in 0.9, which is targeted for end of
  this
  quarter.
 
  On Tue, Feb 10, 2015 at 7:11 PM, tao xiao xiaotao...@gmail.com
 wrote:
 
  Do you know when the new consumer API will be publicly available?
 
  On Wed, Feb 11, 2015 at 10:43 AM, Guozhang Wang wangg...@gmail.com
  wrote:
 
  Yes, it can get stuck. For example, AC and BC are processed by two
  different processes and AC processors gets stuck, hence AC messages
  will
  fill up in the consumer's buffer and eventually prevents the fetcher
  thread
  to put more data into it; the fetcher thread will be blocked on that
  and
  not be able to fetch BC.
 
  This issue has been addressed in the new consumer client, which is
  single-threaded with non-blocking APIs.
 
  Guozhang
 
  On Tue, Feb 10, 2015 at 6:24 PM, tao xiao xiaotao...@gmail.com
  wrote:
 
  Thank you Guozhang for your detailed explanation. In your example
  createMessageStreamsByFilter(*C = 3)  since threads are shared
  among
  topics there may be situation where all 3 threads threads get stuck
  with
  topic AC e.g. topic is empty which will be holding the connecting
  threads
  (setting consumer.timeout.ms=-1) hence there is no thread to serve
  topic
  BC. do you think this situation will happen?
 
  On Wed, Feb 11, 2015 at 2:15 AM, Guozhang Wang wangg...@gmail.com
  wrote:
 
  I was not clear before .. for createMessageStreamsByFilter each
  matched
  topic will have num-threads, but shared: i.e. there will be totally
  num-threads created, but each thread will be responsible for
 fetching
  all

Re: createMessageStreams vs createMessageStreamsByFilter

2015-03-11 Thread tao xiao
consumer.timeout.ms only affects how the stream reads data from the
internal chunk queue that is used to buffer received data. The actual data
fetching is done by a separate fetcher thread,
kafka.consumer.ConsumerFetcherThread. The fetcher thread keeps
reading data from the broker and putting it into the queue, and the stream keeps
polling the queue and passing data back to the consumer if there is any.

So for a case like createMessageStreams(AC = 1), the same stream (
which means the same chunk queue) is shared by multiple partitions of topic
AC. If one of the partitions has no data, the consumer is still able to read
data from the other partitions, as the fetcher thread keeps feeding data from
those partitions into the queue.

The only situation where the consumer will get stuck is when the fetcher thread
is blocked by the network, e.g. high network latency between consumer and broker
or no data from the broker. This is because the fetcher thread is implemented
using blocking I/O.
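For reference, a minimal sketch of the setup being discussed, using the 0.8.x
high-level consumer (ZooKeeper address and group id are placeholders; error
handling omitted):

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class SingleStreamExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");  // placeholder
        props.put("group.id", "stream-demo");               // placeholder
        props.put("consumer.timeout.ms", "-1");             // block forever on the chunk queue

        ConsumerConnector connector =
            Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        // One stream (one chunk queue) shared by all partitions of topic "AC"
        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
            connector.createMessageStreams(Collections.singletonMap("AC", 1));

        ConsumerIterator<byte[], byte[]> it = streams.get("AC").get(0).iterator();
        while (it.hasNext()) {
            System.out.println("got message from partition " + it.next().partition());
        }
    }
}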


On Wed, Mar 11, 2015 at 8:15 AM, James Cheng jch...@tivo.com wrote:

 Hi,

 Sorry to bring up this old thread, but my question is about this exact
 thing:

 Guozhang, you said:
  A more concrete example: say you have topic AC: 3 partitions, topic BC: 6
  partitions.
 
  With createMessageStreams(AC = 3, BC = 2) a total of 5 threads will
  be created, and consuming AC-1,AC-2,AC-3,BC-1/2/3,BC-4/5/6 respectively;
 
  With createMessageStreamsByFilter(*C = 3) a total of 3 threads will be
  created, and consuming AC-1/BC-1/BC-2, AC-2/BC-3/BC-4, AC-3/BC-5/BC-6
  respectively.


 You said that in the createMessageStreamsByFilter case, if topic AC had no
 messages in it and consumer.timeout.ms = -1, then the 3 threads might all
 be blocked waiting for data to arrive from topic AC, and so messages from
 BC would not be processed.

 createMessageStreamsByFilter(*C = 1) (single stream) would have the
 same problem but just worse. Behind the scenes, is there a single thread
 that is consuming (round-robin?) messages from the different partitions and
 inserting them all into a single queue for the application code to process?
 And that is why a single partition with no messages with block the other
 messages from getting through?

 What about createMessageStreams(AC = 1)? That creates a single stream
 that contains messages from multiple partitions, which might be on
 different brokers. Does that also suffer the same problem, where if one
 partition has no messages, that the application would not receive messages
 from the other paritions?

 Thanks,
 -James


 On Feb 11, 2015, at 8:13 AM, Guozhang Wang wangg...@gmail.com wrote:

  The new consumer will be released in 0.9, which is targeted for end of
 this
  quarter.
 
  On Tue, Feb 10, 2015 at 7:11 PM, tao xiao xiaotao...@gmail.com wrote:
 
  Do you know when the new consumer API will be publicly available?
 
  On Wed, Feb 11, 2015 at 10:43 AM, Guozhang Wang wangg...@gmail.com
  wrote:
 
  Yes, it can get stuck. For example, AC and BC are processed by two
  different processes and AC processors gets stuck, hence AC messages
 will
  fill up in the consumer's buffer and eventually prevents the fetcher
  thread
  to put more data into it; the fetcher thread will be blocked on that
 and
  not be able to fetch BC.
 
  This issue has been addressed in the new consumer client, which is
  single-threaded with non-blocking APIs.
 
  Guozhang
 
  On Tue, Feb 10, 2015 at 6:24 PM, tao xiao xiaotao...@gmail.com
 wrote:
 
  Thank you Guozhang for your detailed explanation. In your example
  createMessageStreamsByFilter(*C = 3)  since threads are shared
 among
  topics there may be situation where all 3 threads threads get stuck
  with
  topic AC e.g. topic is empty which will be holding the connecting
  threads
  (setting consumer.timeout.ms=-1) hence there is no thread to serve
  topic
  BC. do you think this situation will happen?
 
  On Wed, Feb 11, 2015 at 2:15 AM, Guozhang Wang wangg...@gmail.com
  wrote:
 
  I was not clear before .. for createMessageStreamsByFilter each
  matched
  topic will have num-threads, but shared: i.e. there will be totally
  num-threads created, but each thread will be responsible for fetching
  all
  matched topics.
 
  A more concrete example: say you have topic AC: 3 partitions, topic
  BC: 6
  partitions.
 
  With createMessageStreams(AC = 3, BC = 2) a total of 5 threads
  will
  be created, and consuming AC-1,AC-2,AC-3,BC-1/2/3,BC-4/5/6
  respectively;
 
  With createMessageStreamsByFilter(*C = 3) a total of 3 threads
  will
  be
  created, and consuming AC-1/BC-1/BC-2, AC-2/BC-3/BC-4, AC-3/BC-5/BC-6
  respectively.
 
  Guozhang
 
  On Tue, Feb 10, 2015 at 8:37 AM, tao xiao xiaotao...@gmail.com
  wrote:
 
  Guozhang,
 
  Do you mean that each regex matched topic owns number of threads
  that
  get
  passed in to createMessageStreamsByFilter ? For example in below
  code
  If
  I
  have 3 matched topics each of which has 2 partitions then I should
  have
  3 *
  2 = 6 threads in total with each topic owning 2 threads

Re: How replicas catch up the leader

2015-03-10 Thread tao xiao
I ended up running kafka-reassign-partitions.sh to reassign partitions to
different nodes
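For anyone hitting the same thing, the rough shape of what that looks like (the
topic, partitions and broker ids below are placeholders, not my actual
assignment):

reassign.json (example content):
{"version":1,"partitions":[{"topic":"ad_click_sts","partition":1,"replicas":[0,1]},
                           {"topic":"ad_click_sts","partition":3,"replicas":[0,1]}]}

bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file reassign.json --execute
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file reassign.json --verify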

On Tue, Mar 10, 2015 at 11:31 AM, sy.pan shengyi@gmail.com wrote:

 Hi, tao xiao and Jiangjie Qin

 I encounter with the same issue, my node had recovered from high load
 problem (caused by other application)

 this is the kafka-topic show:

 Topic:ad_click_sts  PartitionCount:6ReplicationFactor:2
  Configs:
 Topic: ad_click_sts Partition: 0Leader: 1   Replicas:
 1,0   Isr: 1
 Topic: ad_click_sts Partition: 1Leader: 0   Replicas:
 0,1   Isr: 0
 Topic: ad_click_sts Partition: 2Leader: 1   Replicas:
 1,0   Isr: 1
 Topic: ad_click_sts Partition: 3Leader: 0   Replicas:
 0,1   Isr: 0
 Topic: ad_click_sts Partition: 4Leader: 1   Replicas:
 1,0   Isr: 1
 Topic: ad_click_sts Partition: 5Leader: 0   Replicas:
 0,1   Isr: 0

 ReplicaFetcherThread info extracted from kafka server.log :

 [2015-03-09 21:06:05,450] ERROR [ReplicaFetcherThread-0-0], Error in fetch
 Name: FetchRequest; Version: 0; CorrelationId: 7331; ClientId:
 ReplicaFetcherThread-0-0; ReplicaId: 1; MaxWait: 500 ms; MinBytes: 1 bytes;
 RequestInfo: [ad_click_sts,5] -
 PartitionFetchInfo(6149699,1048576),[ad_click_sts,3] -
 PartitionFetchInfo(6147835,1048576),[ad_click_sts,1] -
 PartitionFetchInfo(6235071,1048576) (kafka.server.ReplicaFetcherThread)
 java.net.SocketTimeoutException
 at
 sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:201)
 at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:86)
 

..
 at
 kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
 at
 kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
 at
 kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
 at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
 at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
 at
 kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
 at
 kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
 at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)

 [2015-03-09 21:06:05,450] WARN Reconnect due to socket error: null
 (kafka.consumer.SimpleConsumer)

 [2015-03-09 21:05:57,116] INFO Partition [ad_click_sts,4] on broker 1:
 Cached zkVersion [556] not equal to that in zookeeper, skip updating ISR
 (kafka.cluster.Partition)

 [2015-03-09 21:06:05,772] INFO Partition [ad_click_sts,2] on broker 1:
 Shrinking ISR for partition [ad_click_sts,2] from 1,0 to 1
 (kafka.cluster.Partition)


 How can I fix this ISR problem? Is there some command that can be run?

 Regards
 sy.pan




-- 
Regards,
Tao


Does consumer support combination of whitelist and blacklist topic filtering

2015-03-10 Thread tao xiao
Hi,

I have a use case where I need to consume a list of topics whose names
match the pattern topic.*, except for one that is topic.10. Is there a way
that I can combine the use of whitelist and blacklist so that I can achieve
something like accept all topics matching regex topic.* but exclude topic.10?
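For reference, here is a quick way to sanity-check a candidate regex locally (a
minimal sketch using a Java negative lookahead; whether the consumer's
whitelist accepts exactly the same syntax is part of what I am unsure about,
since the whitelist does some light rewriting of the raw expression, e.g.
commas become '|'):

import java.util.regex.Pattern;

public class WhitelistRegexCheck {
    public static void main(String[] args) {
        // Accept anything matching topic.* except the literal topic.10
        Pattern whitelist = Pattern.compile("topic\\.(?!10$).*");
        String[] candidates = {"topic.1", "topic.10", "topic.100", "topic.foo"};
        for (String t : candidates) {
            System.out.println(t + " accepted=" + whitelist.matcher(t).matches());
        }
    }
}

If that kind of lookahead is acceptable to the whitelist, then a single
--whitelist 'topic\.(?!10$).*' might do the job without needing a blacklist at
the same time.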

-- 
Regards,
Tao


Re: Batching at the socket layer

2015-03-10 Thread tao xiao
org.apache.kafka.clients.producer.Producer is the new-API producer.
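A minimal sketch of constructing it with the batching-related settings
mentioned in this thread (broker address and sizes are just placeholders, and
this assumes the 0.8.2 new-producer config names):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class NewProducerBatchingExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // placeholder
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("batch.size", "16384");  // max bytes per per-partition batch
        props.put("linger.ms", "5");       // wait up to 5 ms to fill a batch

        Producer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(props);
        producer.send(new ProducerRecord<byte[], byte[]>("mm-benchmark-test", "hello".getBytes()));
        producer.close();
    }
}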

On Tue, Mar 10, 2015 at 11:22 PM, Corey Nolet cjno...@gmail.com wrote:

 Thanks Jiangie! So what version is considered the new api? Is that the
 javaapi in version 0.8.2?.

 On Mon, Mar 9, 2015 at 2:29 PM, Jiangjie Qin j...@linkedin.com.invalid
 wrote:

  The stickiness of partition only applies to old producer. In new producer
  we have the round robin for each message. The batching in new producer is
  per topic partition; the batch size is controlled by both the max batch
  size and linger time configs.
 
  Jiangjie (Becket) Qin
 
  On 3/9/15, 10:10 AM, Corey Nolet cjno...@gmail.com wrote:
 
  I'm curious what type of batching Kafka producers do at the socket
 layer.
  For instance, if I have a partitioner that round robin's n messages to a
  different partition, am I guaranteed to get n different messages sent
 over
  the socket or is there some micro-batching going on underneath?
  
  I am trying to understand the semantics of the default partitioner and
 why
  it sticks to partitions for 10 minutes. If I were to lower that interval
  to
  1sec, would I acheive better batching that I would if I was to
 completely
  round-robin each message to a different partition?
 
 




-- 
Regards,
Tao


Re: Does consumer support combination of whitelist and blacklist topic filtering

2015-03-10 Thread tao xiao
I actually meant whether we can achieve this in mirror maker.

On Tue, Mar 10, 2015 at 10:52 PM, tao xiao xiaotao...@gmail.com wrote:

 Hi,

 I have an user case where I need to consume a list topics with name that
 matches pattern topic.* except for one that is topic.10. Is there a way
 that I can combine the use of whitelist and blacklist so that I can achieve
 something like accept all topics with regex topic.* but exclude topic.10?

 --
 Regards,
 Tao




-- 
Regards,
Tao


Topics are not evenly distributed to streams using Range partition assignment

2015-03-09 Thread tao xiao
Hi,

I created message streams in my consumer using
connector.createMessageStreamsByFilter(new Whitelist("mm-benchmark-test\\w*"), 5). I
have 5 topics in my cluster and each of the topics has only one partition.
My understanding of wildcard streams is that multiple streams are shared
among the selected topics. In my case the 5 streams should be shared among the 5
different topics. But when I looked at the log it showed a different story:

2015-03-09 19:02:36 INFO  kafka.utils.Logging$class:68 -
[test12345667_LM-SHC-00950667-1425898953590-d99e2d75],
test12345667_LM-SHC-00950667-1425898953590-d99e2d75-0 successfully
owned partition 0 for topic mm-benchmark-test2

2015-03-09 19:02:36 INFO  kafka.utils.Logging$class:68 -
[test12345667_LM-SHC-00950667-1425898953590-d99e2d75],
test12345667_LM-SHC-00950667-1425898953590-d99e2d75-0 successfully
owned partition 0 for topic mm-benchmark-test

2015-03-09 19:02:36 INFO  kafka.utils.Logging$class:68 -
[test12345667_LM-SHC-00950667-1425898953590-d99e2d75],
test12345667_LM-SHC-00950667-1425898953590-d99e2d75-0 successfully
owned partition 0 for topic mm-benchmark-test1

2015-03-09 19:02:36 INFO  kafka.utils.Logging$class:68 -
[test12345667_LM-SHC-00950667-1425898953590-d99e2d75],
test12345667_LM-SHC-00950667-1425898953590-d99e2d75-0 successfully
owned partition 0 for topic mm-benchmark-test4

2015-03-09 19:02:36 INFO  kafka.utils.Logging$class:68 -
[test12345667_LM-SHC-00950667-1425898953590-d99e2d75],
test12345667_LM-SHC-00950667-1425898953590-d99e2d75-0 successfully
owned partition 0 for topic mm-benchmark-test3


As indicated by the log, only one stream was assigned all the topics. I
just wanted to know whether this is expected behavior. If yes, how do we evenly
distribute topics across different streams? By using the round-robin assigner?
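In case round-robin is the answer, this is the consumer setting I would try
(assuming the 0.8.2 high-level consumer property name; as far as I understand,
the round-robin strategy also requires every consumer instance in the group to
subscribe to the same topics with the same number of streams):

partition.assignment.strategy=roundrobin    (the default is "range")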


-- 
Regards,
Tao


Re: kafka mirroring ...!

2015-03-09 Thread tao xiao
I don't think you can mirror messages to a different topic name in the
current mirror maker implementation. Mirror maker sends each message to a
destination topic with the same name as the topic it was read from in the source.

On Mon, Mar 9, 2015 at 5:00 PM, sunil kalva sambarc...@gmail.com wrote:

 Can i configure different topic name in destination cluster, i mean can i
 have different topic names for source and destination cluster for
 mirroring. If yes how can i map source topic with destination topic name ?

 SunilKalva

 On Mon, Mar 9, 2015 at 6:41 AM, tao xiao xiaotao...@gmail.com wrote:

  Ctrl+c is clean shutdown. kill -9 is not
 
  On Mon, Mar 9, 2015 at 2:32 AM, Alex Melville amelvi...@g.hmc.edu
 wrote:
 
   What does a clean shutdown of the MM entail? So far I've just been
  using
   Ctrl + C to send an interrupt to kill it.
  
  
   Alex
  
   On Sat, Mar 7, 2015 at 10:59 PM, Jiangjie Qin
 j...@linkedin.com.invalid
  
   wrote:
  
If auto.offset.reset is set to smallest, it does not mean the
 consumer
will always consume from the smallest. It means that if no previous
   offset
commit is found for this consumer group, then it will consume from
 the
smallest. So for mirror maker, you probably want to always use the
 same
consumer group id. This could be configured in the consumer config
 file
you pass into mirror maker.
Another thing about duplicate messages is that if mirror maker is
   shutdown
cleanly, next time when you start it again with same consumer group
 id,
there should be no duplicates. But if mirror maker shutdown
   uncleanly(e.g.
By a kill -9), then next time it starts up you might still have
  duplicate
messages after the last committed offsets.
   
Jiangjie (Becket) Qin
   
On 3/7/15, 11:45 PM, sunil kalva sambarc...@gmail.com wrote:
   
Qin
Partition problem is solved by passing --new.producer true option
 in
command line,  but adding auto.offset.rese=smallest config, every
  time i
restart the Mirror tool it copies from starting ends up having lot
 of
duplicate messages in destination cluster.
Could you please tell me how do i configure to make sure that
   destination
cluster is always insync with source cluster.

SunilKalva

On Sun, Mar 8, 2015 at 12:54 AM, Jiangjie Qin
  j...@linkedin.com.invalid
   
wrote:

 For data not showing up, you need to make sure mirror maker
 consumer
 auto.offset.reset is set to smallest, otherwise when you run
 mirror
maker
 for the first time, all the pre-existing messages won't be consumed.
 consumed.
 For partition sticking, can you verify if your messages are keyed
messages
 or not? If they are not keyed messages, can you check if you are
  using
old
 producer or new producer? For old producer, the default behavior
 is
 sticking to one partition for 10 min and then move to the next
partition.
 So if you wait for more than 10 min, you should see messages in
 two
 different partitions.

 Jiangjie (Becket) Qin

 On 3/7/15, 8:28 AM, sunil kalva sambarc...@gmail.com wrote:

 And i also observed ,all the data is moving to one partition in
 destination
 cluster though i have multiple partitions for that topic in
 source
   and
 destination clusters.
 
 SunilKalva
 
 On Sat, Mar 7, 2015 at 9:54 PM, sunil kalva 
 sambarc...@gmail.com
wrote:
 
  I ran kafka mirroring tool after producing data in source
  cluster,
and
  this is not copied to destination cluster. If i produce data
  after
 running
  tool those data are copied to destination cluster. Am i missing
 something ?
 
  --
  SunilKalva
 
 
 
 
 --
 SunilKalva




--
SunilKalva
   
   
  
 
 
 
  --
  Regards,
  Tao
 



 --
 SunilKalva




-- 
Regards,
Tao


Re: kafka mirroring ...!

2015-03-08 Thread tao xiao
Ctrl+C is a clean shutdown; kill -9 is not.

On Mon, Mar 9, 2015 at 2:32 AM, Alex Melville amelvi...@g.hmc.edu wrote:

 What does a clean shutdown of the MM entail? So far I've just been using
 Ctrl + C to send an interrupt to kill it.


 Alex

 On Sat, Mar 7, 2015 at 10:59 PM, Jiangjie Qin j...@linkedin.com.invalid
 wrote:

  If auto.offset.reset is set to smallest, it does not mean the consumer
  will always consume from the smallest. It means that if no previous
 offset
  commit is found for this consumer group, then it will consume from the
  smallest. So for mirror maker, you probably want to always use the same
  consumer group id. This could be configured in the consumer config file
  you pass into mirror maker.
  Another thing about duplicate messages is that if mirror maker is
 shutdown
  cleanly, next time when you start it again with same consumer group id,
  there should be no duplicates. But if mirror maker shutdown
 uncleanly(e.g.
  By a kill -9), then next time it starts up you might still have duplicate
  messages after the last committed offsets.
 
  Jiangjie (Becket) Qin
 
  On 3/7/15, 11:45 PM, sunil kalva sambarc...@gmail.com wrote:
 
  Qin
  Partition problem is solved by passing --new.producer true option in
  command line,  but adding auto.offset.rese=smallest config, every time i
  restart the Mirror tool it copies from starting ends up having lot of
  duplicate messages in destination cluster.
  Could you please tell me how do i configure to make sure that
 destination
  cluster is always insync with source cluster.
  
  SunilKalva
  
  On Sun, Mar 8, 2015 at 12:54 AM, Jiangjie Qin j...@linkedin.com.invalid
 
  wrote:
  
   For data not showing up, you need to make sure mirror maker consumer
   auto.offset.reset is set to smallest, otherwise when you run mirror
  maker
   for the first time, all the pre-existing messages won't be consumed.
   For partition sticking, can you verify if your messages are keyed
  messages
   or not? If they are not keyed messages, can you check if you are using
  old
   producer or new producer? For old producer, the default behavior is
   sticking to one partition for 10 min and then move to the next
  partition.
   So if you wait for more than 10 min, you should see messages in two
   different partitions.
  
   Jiangjie (Becket) Qin
  
   On 3/7/15, 8:28 AM, sunil kalva sambarc...@gmail.com wrote:
  
   And i also observed ,all the data is moving to one partition in
   destination
   cluster though i have multiple partitions for that topic in source
 and
   destination clusters.
   
   SunilKalva
   
   On Sat, Mar 7, 2015 at 9:54 PM, sunil kalva sambarc...@gmail.com
  wrote:
   
I ran kafka mirroring tool after producing data in source cluster,
  and
this is not copied to destination cluster. If i produce data after
   running
tool those data are copied to destination cluster. Am i missing
   something ?
   
--
SunilKalva
   
   
   
   
   --
   SunilKalva
  
  
  
  
  --
  SunilKalva
 
 




-- 
Regards,
Tao


Re: Got java.util.IllegalFormatConversionException when running MirrorMaker off trunk code

2015-03-07 Thread tao xiao
Actually I was going to report another bug that was caused by exactly that
UnackedOffsets.removeOffset
issue (offsets being removed before they are added).

As the current project I am working on relies heavily on the
functionality MM offers, it would be good if you could put the fix into trunk
or give me some advice on how to fix the synchronization issue.

BTW, can the synchronization issue be fixed by adding the unacked offset to
the offset list before calling producer.send?

On Sat, Mar 7, 2015 at 4:48 PM, Jiangjie Qin j...@linkedin.com.invalid
wrote:

 Hi Tao,

 Thanks a lot for finding the bug. We are actually rewriting the mirror
 maker in KAFKA-1997 with a much simplified solution using the newly added
 flush() call in new java producer.
 Mirror maker in current trunk is also missing one necessary
 synchronization - the UncheckedOffsets.removeOffset is not synchronized. I
 am hesitating whether to fix those problems in current trunk or just
 waiting for Kafka-1997 to be checked in. If you have a strong opinion
 about this, we can probably fix those 2 issues in the trunk. It should be
  a small patch but I just don't want people to get distracted.

 Jiangjie (Becket) Qin

 On 3/6/15, 10:15 PM, tao xiao xiaotao...@gmail.com wrote:

 I think I worked out the root cause
 
 Line 593 in MirrorMaker.scala
 
  trace("Updating offset for %s to %d".format(topicPartition, offset))
  should be
  trace("Updating offset for %s to %d".format(topicPartition, offset.element))
 
 
 On Sat, Mar 7, 2015 at 2:12 AM, tao xiao xiaotao...@gmail.com wrote:
 
  A bit more context: I turned on async in producer.properties
 
  On Sat, Mar 7, 2015 at 2:09 AM, tao xiao xiaotao...@gmail.com wrote:
 
  Hi team,
 
  I am having java.util.IllegalFormatConversionException when running
  MirrorMaker with log level set to trace. The code is off latest trunk
 with
  commit 8f0003f9b694b4da5fbd2f86db872d77a43eb63f
 
  The way I bring up is
 
  bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config
  ~/Downloads/kafka/kafka_2.10-0.8.2.0/config/consumer.properties
  --producer.config
  ~/Downloads/kafka/kafka_2.10-0.8.2.0/config/producer.properties
  --num.streams 1 --num.producers 1 --no.data.loss --whitelist
  mm-benchmark-test\\w* --offset.commit.interval.ms 1
  --queue.byte.size 1024
  and set the log level to trace in tools-log4j.properties
 
  here is the log snippet
 
  [2015-03-07 02:04:27,211] TRACE [mirrormaker-producer-0] Sending
 message
  with value size 13 (kafka.tools.MirrorMaker$ProducerThread)
 
  [2015-03-07 02:04:27,211] TRACE Sending record
  ProducerRecord(topic=mm-benchmark-test, partition=null,
 key=[B@130362d0,
  value=[B@434c4f70 with callback
  kafka.tools.MirrorMaker$MirrorMakerProducerCallback@46f36494 to topic
  mm-benchmark-test partition 0
  (org.apache.kafka.clients.producer.KafkaProducer)
 
  [2015-03-07 02:04:27,212] TRACE [mirrormaker-producer-0] Sending
 message
  with value size 13 (kafka.tools.MirrorMaker$ProducerThread)
 
  [2015-03-07 02:04:27,212] TRACE Sending record
  ProducerRecord(topic=mm-benchmark-test, partition=null,
 key=[B@54957b67,
  value=[B@21d8d293 with callback
  kafka.tools.MirrorMaker$MirrorMakerProducerCallback@21e8c241 to topic
  mm-benchmark-test partition 0
  (org.apache.kafka.clients.producer.KafkaProducer)
 
  [2015-03-07 02:04:27,212] TRACE [mirrormaker-producer-0] Sending
 message
  with value size 13 (kafka.tools.MirrorMaker$ProducerThread)
 
  [2015-03-07 02:04:27,212] TRACE Sending record
  ProducerRecord(topic=mm-benchmark-test, partition=null,
 key=[B@1eed723b,
  value=[B@1acd590b with callback
  kafka.tools.MirrorMaker$MirrorMakerProducerCallback@1f90eeec to topic
  mm-benchmark-test partition 0
  (org.apache.kafka.clients.producer.KafkaProducer)
 
  [2015-03-07 02:04:27,212] TRACE [mirrormaker-producer-0] Sending
 message
  with value size 13 (kafka.tools.MirrorMaker$ProducerThread)
 
  [2015-03-07 02:04:27,212] TRACE Sending record
  ProducerRecord(topic=mm-benchmark-test, partition=null,
 key=[B@3ae8a936,
  value=[B@bd3671 with callback
  kafka.tools.MirrorMaker$MirrorMakerProducerCallback@6413518 to topic
  mm-benchmark-test partition 0
  (org.apache.kafka.clients.producer.KafkaProducer)
 
  [2015-03-07 02:04:27,212] ERROR Error executing user-provided callback
 on
  message for topic-partition mm-benchmark-test-0:
  (org.apache.kafka.clients.producer.internals.RecordBatch)
 
  java.util.IllegalFormatConversionException: d !=
  kafka.tools.MirrorMaker$UnackedOffset
 
  at
 java.util.Formatter$FormatSpecifier.failConversion(Formatter.java:4045)
 
  at
 java.util.Formatter$FormatSpecifier.printInteger(Formatter.java:2748)
 
  at java.util.Formatter$FormatSpecifier.print(Formatter.java:2702)
 
  at java.util.Formatter.format(Formatter.java:2488)
 
  at java.util.Formatter.format(Formatter.java:2423)
 
  at java.lang.String.format(String.java:2790)
 
  at
 
 scala.collection.immutable.StringLike$class.format(StringLike.scala:266

How does num.consumer.fetchers get used

2015-03-06 Thread tao xiao
Hi team,

After reading the source code of AbstractFetcherManager I found that
the usage of num.consumer.fetchers may not match what is described in the
Kafka doc. My interpretation of the Kafka doc is that the number of
fetcher threads is controlled by the value of
the property num.consumer.fetchers, i.e. if I set num.consumer.fetchers=4 then
4 fetcher threads in total are created after the consumer is initialized.

But what I found in the source code tells me a different thing. The code below
is copied from AbstractFetcherManager:

private def getFetcherId(topic: String, partitionId: Int) : Int = {
    Utils.abs(31 * topic.hashCode() + partitionId) % numFetchers
  }

def addFetcherForPartitions(partitionAndOffsets: Map[TopicAndPartition, BrokerAndInitialOffset]) {
    mapLock synchronized {
      val partitionsPerFetcher = partitionAndOffsets.groupBy { case (topicAndPartition, brokerAndInitialOffset) =>
        BrokerAndFetcherId(brokerAndInitialOffset.broker,
          getFetcherId(topicAndPartition.topic, topicAndPartition.partition)) }

      for ((brokerAndFetcherId, partitionAndOffsets) <- partitionsPerFetcher) {
        var fetcherThread: AbstractFetcherThread = null
        fetcherThreadMap.get(brokerAndFetcherId) match {
          case Some(f) => fetcherThread = f
          case None =>
            fetcherThread = createFetcherThread(brokerAndFetcherId.fetcherId, brokerAndFetcherId.broker)
            fetcherThreadMap.put(brokerAndFetcherId, fetcherThread)
            fetcherThread.start
        }

        fetcherThreadMap(brokerAndFetcherId).addPartitions(partitionAndOffsets.map { case (topicAndPartition, brokerAndInitOffset) =>
          topicAndPartition -> brokerAndInitOffset.initOffset
        })
      }
    }
}

If I have one topic with one partition and num.consumer.fetchers set to 4,
only one fetcher thread is actually created, not 4.
num.consumer.fetchers essentially sets the maximum number of fetcher
threads, not the actual number of fetcher threads. The actual number of
fetcher threads is determined by this line of code:
Utils.abs(31 * topic.hashCode() + partitionId) % numFetchers

Is my assumption correct?
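To make the point concrete, a small stand-alone reproduction of that
computation (as far as I can tell Utils.abs just masks the sign bit rather than
calling Math.abs; the rest mirrors the snippet above):

public class FetcherIdDemo {
    // Mirrors kafka.utils.Utils.abs: clears the sign bit instead of Math.abs
    static int abs(int n) {
        return n & 0x7fffffff;
    }

    static int getFetcherId(String topic, int partitionId, int numFetchers) {
        return abs(31 * topic.hashCode() + partitionId) % numFetchers;
    }

    public static void main(String[] args) {
        // One topic, one partition, num.consumer.fetchers = 4:
        // the single partition always maps to the same (broker, fetcherId) pair,
        // so only one fetcher thread ever gets created.
        System.out.println(getFetcherId("mm-benchmark-test", 0, 4));
    }
}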

-- 
Regards,
Tao


Got java.util.IllegalFormatConversionException when running MirrorMaker off trunk code

2015-03-06 Thread tao xiao
Hi team,

I am getting a java.util.IllegalFormatConversionException when running
MirrorMaker with the log level set to trace. The code is off the latest trunk, at
commit 8f0003f9b694b4da5fbd2f86db872d77a43eb63f.

The way I bring it up is:

bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config
~/Downloads/kafka/kafka_2.10-0.8.2.0/config/consumer.properties
--producer.config
~/Downloads/kafka/kafka_2.10-0.8.2.0/config/producer.properties
--num.streams 1 --num.producers 1 --no.data.loss --whitelist
mm-benchmark-test\\w* --offset.commit.interval.ms 1 --queue.byte.size
1024
and set the log level to trace in tools-log4j.properties

here is the log snippet

[2015-03-07 02:04:27,211] TRACE [mirrormaker-producer-0] Sending message
with value size 13 (kafka.tools.MirrorMaker$ProducerThread)

[2015-03-07 02:04:27,211] TRACE Sending record
ProducerRecord(topic=mm-benchmark-test, partition=null, key=[B@130362d0,
value=[B@434c4f70 with callback
kafka.tools.MirrorMaker$MirrorMakerProducerCallback@46f36494 to topic
mm-benchmark-test partition 0
(org.apache.kafka.clients.producer.KafkaProducer)

[2015-03-07 02:04:27,212] TRACE [mirrormaker-producer-0] Sending message
with value size 13 (kafka.tools.MirrorMaker$ProducerThread)

[2015-03-07 02:04:27,212] TRACE Sending record
ProducerRecord(topic=mm-benchmark-test, partition=null, key=[B@54957b67,
value=[B@21d8d293 with callback
kafka.tools.MirrorMaker$MirrorMakerProducerCallback@21e8c241 to topic
mm-benchmark-test partition 0
(org.apache.kafka.clients.producer.KafkaProducer)

[2015-03-07 02:04:27,212] TRACE [mirrormaker-producer-0] Sending message
with value size 13 (kafka.tools.MirrorMaker$ProducerThread)

[2015-03-07 02:04:27,212] TRACE Sending record
ProducerRecord(topic=mm-benchmark-test, partition=null, key=[B@1eed723b,
value=[B@1acd590b with callback
kafka.tools.MirrorMaker$MirrorMakerProducerCallback@1f90eeec to topic
mm-benchmark-test partition 0
(org.apache.kafka.clients.producer.KafkaProducer)

[2015-03-07 02:04:27,212] TRACE [mirrormaker-producer-0] Sending message
with value size 13 (kafka.tools.MirrorMaker$ProducerThread)

[2015-03-07 02:04:27,212] TRACE Sending record
ProducerRecord(topic=mm-benchmark-test, partition=null, key=[B@3ae8a936,
value=[B@bd3671 with callback
kafka.tools.MirrorMaker$MirrorMakerProducerCallback@6413518 to topic
mm-benchmark-test partition 0
(org.apache.kafka.clients.producer.KafkaProducer)

[2015-03-07 02:04:27,212] ERROR Error executing user-provided callback on
message for topic-partition mm-benchmark-test-0:
(org.apache.kafka.clients.producer.internals.RecordBatch)

java.util.IllegalFormatConversionException: d !=
kafka.tools.MirrorMaker$UnackedOffset

at java.util.Formatter$FormatSpecifier.failConversion(Formatter.java:4045)

at java.util.Formatter$FormatSpecifier.printInteger(Formatter.java:2748)

at java.util.Formatter$FormatSpecifier.print(Formatter.java:2702)

at java.util.Formatter.format(Formatter.java:2488)

at java.util.Formatter.format(Formatter.java:2423)

at java.lang.String.format(String.java:2790)

at scala.collection.immutable.StringLike$class.format(StringLike.scala:266)

at scala.collection.immutable.StringOps.format(StringOps.scala:31)

at
kafka.tools.MirrorMaker$MirrorMakerProducerCallback$$anonfun$onCompletion$2.apply(MirrorMaker.scala:592)

at
kafka.tools.MirrorMaker$MirrorMakerProducerCallback$$anonfun$onCompletion$2.apply(MirrorMaker.scala:592)

at kafka.utils.Logging$class.trace(Logging.scala:36)

at kafka.tools.MirrorMaker$.trace(MirrorMaker.scala:57)

at
kafka.tools.MirrorMaker$MirrorMakerProducerCallback.onCompletion(MirrorMaker.scala:592)

at
org.apache.kafka.clients.producer.internals.RecordBatch.done(RecordBatch.java:91)

at
org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:267)

at
org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:235)

at
org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:55)

at
org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:312)

at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:225)

at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:199)

at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:124)

at java.lang.Thread.run(Thread.java:745)

[2015-03-07 02:04:27,212] TRACE [mirrormaker-producer-0] Sending message
with value size 13 (kafka.tools.MirrorMaker$ProducerThread)



-- 
Regards,
Tao


Re: Got java.util.IllegalFormatConversionException when running MirrorMaker off trunk code

2015-03-06 Thread tao xiao
A bit more context: I turned on async in producer.properties

On Sat, Mar 7, 2015 at 2:09 AM, tao xiao xiaotao...@gmail.com wrote:

 Hi team,

 I am having java.util.IllegalFormatConversionException when running
 MirrorMaker with log level set to trace. The code is off latest trunk with
 commit 8f0003f9b694b4da5fbd2f86db872d77a43eb63f

 The way I bring up is

 bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config
 ~/Downloads/kafka/kafka_2.10-0.8.2.0/config/consumer.properties
 --producer.config
 ~/Downloads/kafka/kafka_2.10-0.8.2.0/config/producer.properties
 --num.streams 1 --num.producers 1 --no.data.loss --whitelist
 mm-benchmark-test\\w* --offset.commit.interval.ms 1
 --queue.byte.size 1024
 and set the log level to trace in tools-log4j.properties

 here is the log snippet

 [2015-03-07 02:04:27,211] TRACE [mirrormaker-producer-0] Sending message
 with value size 13 (kafka.tools.MirrorMaker$ProducerThread)

 [2015-03-07 02:04:27,211] TRACE Sending record
 ProducerRecord(topic=mm-benchmark-test, partition=null, key=[B@130362d0,
 value=[B@434c4f70 with callback
 kafka.tools.MirrorMaker$MirrorMakerProducerCallback@46f36494 to topic
 mm-benchmark-test partition 0
 (org.apache.kafka.clients.producer.KafkaProducer)

 [2015-03-07 02:04:27,212] TRACE [mirrormaker-producer-0] Sending message
 with value size 13 (kafka.tools.MirrorMaker$ProducerThread)

 [2015-03-07 02:04:27,212] TRACE Sending record
 ProducerRecord(topic=mm-benchmark-test, partition=null, key=[B@54957b67,
 value=[B@21d8d293 with callback
 kafka.tools.MirrorMaker$MirrorMakerProducerCallback@21e8c241 to topic
 mm-benchmark-test partition 0
 (org.apache.kafka.clients.producer.KafkaProducer)

 [2015-03-07 02:04:27,212] TRACE [mirrormaker-producer-0] Sending message
 with value size 13 (kafka.tools.MirrorMaker$ProducerThread)

 [2015-03-07 02:04:27,212] TRACE Sending record
 ProducerRecord(topic=mm-benchmark-test, partition=null, key=[B@1eed723b,
 value=[B@1acd590b with callback
 kafka.tools.MirrorMaker$MirrorMakerProducerCallback@1f90eeec to topic
 mm-benchmark-test partition 0
 (org.apache.kafka.clients.producer.KafkaProducer)

 [2015-03-07 02:04:27,212] TRACE [mirrormaker-producer-0] Sending message
 with value size 13 (kafka.tools.MirrorMaker$ProducerThread)

 [2015-03-07 02:04:27,212] TRACE Sending record
 ProducerRecord(topic=mm-benchmark-test, partition=null, key=[B@3ae8a936,
 value=[B@bd3671 with callback
 kafka.tools.MirrorMaker$MirrorMakerProducerCallback@6413518 to topic
 mm-benchmark-test partition 0
 (org.apache.kafka.clients.producer.KafkaProducer)

 [2015-03-07 02:04:27,212] ERROR Error executing user-provided callback on
 message for topic-partition mm-benchmark-test-0:
 (org.apache.kafka.clients.producer.internals.RecordBatch)

 java.util.IllegalFormatConversionException: d !=
 kafka.tools.MirrorMaker$UnackedOffset

 at java.util.Formatter$FormatSpecifier.failConversion(Formatter.java:4045)

 at java.util.Formatter$FormatSpecifier.printInteger(Formatter.java:2748)

 at java.util.Formatter$FormatSpecifier.print(Formatter.java:2702)

 at java.util.Formatter.format(Formatter.java:2488)

 at java.util.Formatter.format(Formatter.java:2423)

 at java.lang.String.format(String.java:2790)

 at scala.collection.immutable.StringLike$class.format(StringLike.scala:266)

 at scala.collection.immutable.StringOps.format(StringOps.scala:31)

 at
 kafka.tools.MirrorMaker$MirrorMakerProducerCallback$$anonfun$onCompletion$2.apply(MirrorMaker.scala:592)

 at
 kafka.tools.MirrorMaker$MirrorMakerProducerCallback$$anonfun$onCompletion$2.apply(MirrorMaker.scala:592)

 at kafka.utils.Logging$class.trace(Logging.scala:36)

 at kafka.tools.MirrorMaker$.trace(MirrorMaker.scala:57)

 at
 kafka.tools.MirrorMaker$MirrorMakerProducerCallback.onCompletion(MirrorMaker.scala:592)

 at
 org.apache.kafka.clients.producer.internals.RecordBatch.done(RecordBatch.java:91)

 at
 org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:267)

 at
 org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:235)

 at
 org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:55)

 at
 org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:312)

 at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:225)

 at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:199)

 at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:124)

 at java.lang.Thread.run(Thread.java:745)

 [2015-03-07 02:04:27,212] TRACE [mirrormaker-producer-0] Sending message
 with value size 13 (kafka.tools.MirrorMaker$ProducerThread)



 --
 Regards,
 Tao




-- 
Regards,
Tao


Re: Got java.util.IllegalFormatConversionException when running MirrorMaker off trunk code

2015-03-06 Thread tao xiao
I think I worked out the root cause.

Line 593 in MirrorMaker.scala

trace("Updating offset for %s to %d".format(topicPartition, offset))

should be

trace("Updating offset for %s to %d".format(topicPartition, offset.element))
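For illustration, a minimal sketch of why the original line throws (UnackedOffset here is a stand-in class, not the real MirrorMaker inner class; the values are made up):

class UnackedOffset(val element: Long)
val topicPartition = "mm-benchmark-test-0"
val offset = new UnackedOffset(42L)

// "%d" requires a numeric argument, so formatting the wrapper object fails with
// java.util.IllegalFormatConversionException; formatting the underlying value works.
// println("Updating offset for %s to %d".format(topicPartition, offset))      // throws
println("Updating offset for %s to %d".format(topicPartition, offset.element)) // ok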


On Sat, Mar 7, 2015 at 2:12 AM, tao xiao xiaotao...@gmail.com wrote:

 A bit more context: I turned on async in producer.properties

 On Sat, Mar 7, 2015 at 2:09 AM, tao xiao xiaotao...@gmail.com wrote:

 Hi team,

 I am having java.util.IllegalFormatConversionException when running
 MirrorMaker with log level set to trace. The code is off latest trunk with
 commit 8f0003f9b694b4da5fbd2f86db872d77a43eb63f

 The way I bring up is

 bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config
 ~/Downloads/kafka/kafka_2.10-0.8.2.0/config/consumer.properties
 --producer.config
 ~/Downloads/kafka/kafka_2.10-0.8.2.0/config/producer.properties
 --num.streams 1 --num.producers 1 --no.data.loss --whitelist
 mm-benchmark-test\\w* --offset.commit.interval.ms 1
 --queue.byte.size 1024
 and set the log level to trace in tools-log4j.properties

 here is the log snippet

 [2015-03-07 02:04:27,211] TRACE [mirrormaker-producer-0] Sending message
 with value size 13 (kafka.tools.MirrorMaker$ProducerThread)

 [2015-03-07 02:04:27,211] TRACE Sending record
 ProducerRecord(topic=mm-benchmark-test, partition=null, key=[B@130362d0,
 value=[B@434c4f70 with callback
 kafka.tools.MirrorMaker$MirrorMakerProducerCallback@46f36494 to topic
 mm-benchmark-test partition 0
 (org.apache.kafka.clients.producer.KafkaProducer)

 [2015-03-07 02:04:27,212] TRACE [mirrormaker-producer-0] Sending message
 with value size 13 (kafka.tools.MirrorMaker$ProducerThread)

 [2015-03-07 02:04:27,212] TRACE Sending record
 ProducerRecord(topic=mm-benchmark-test, partition=null, key=[B@54957b67,
 value=[B@21d8d293 with callback
 kafka.tools.MirrorMaker$MirrorMakerProducerCallback@21e8c241 to topic
 mm-benchmark-test partition 0
 (org.apache.kafka.clients.producer.KafkaProducer)

 [2015-03-07 02:04:27,212] TRACE [mirrormaker-producer-0] Sending message
 with value size 13 (kafka.tools.MirrorMaker$ProducerThread)

 [2015-03-07 02:04:27,212] TRACE Sending record
 ProducerRecord(topic=mm-benchmark-test, partition=null, key=[B@1eed723b,
 value=[B@1acd590b with callback
 kafka.tools.MirrorMaker$MirrorMakerProducerCallback@1f90eeec to topic
 mm-benchmark-test partition 0
 (org.apache.kafka.clients.producer.KafkaProducer)

 [2015-03-07 02:04:27,212] TRACE [mirrormaker-producer-0] Sending message
 with value size 13 (kafka.tools.MirrorMaker$ProducerThread)

 [2015-03-07 02:04:27,212] TRACE Sending record
 ProducerRecord(topic=mm-benchmark-test, partition=null, key=[B@3ae8a936,
 value=[B@bd3671 with callback
 kafka.tools.MirrorMaker$MirrorMakerProducerCallback@6413518 to topic
 mm-benchmark-test partition 0
 (org.apache.kafka.clients.producer.KafkaProducer)

 [2015-03-07 02:04:27,212] ERROR Error executing user-provided callback on
 message for topic-partition mm-benchmark-test-0:
 (org.apache.kafka.clients.producer.internals.RecordBatch)

 java.util.IllegalFormatConversionException: d !=
 kafka.tools.MirrorMaker$UnackedOffset

 at java.util.Formatter$FormatSpecifier.failConversion(Formatter.java:4045)

 at java.util.Formatter$FormatSpecifier.printInteger(Formatter.java:2748)

 at java.util.Formatter$FormatSpecifier.print(Formatter.java:2702)

 at java.util.Formatter.format(Formatter.java:2488)

 at java.util.Formatter.format(Formatter.java:2423)

 at java.lang.String.format(String.java:2790)

 at
 scala.collection.immutable.StringLike$class.format(StringLike.scala:266)

 at scala.collection.immutable.StringOps.format(StringOps.scala:31)

 at
 kafka.tools.MirrorMaker$MirrorMakerProducerCallback$$anonfun$onCompletion$2.apply(MirrorMaker.scala:592)

 at
 kafka.tools.MirrorMaker$MirrorMakerProducerCallback$$anonfun$onCompletion$2.apply(MirrorMaker.scala:592)

 at kafka.utils.Logging$class.trace(Logging.scala:36)

 at kafka.tools.MirrorMaker$.trace(MirrorMaker.scala:57)

 at
 kafka.tools.MirrorMaker$MirrorMakerProducerCallback.onCompletion(MirrorMaker.scala:592)

 at
 org.apache.kafka.clients.producer.internals.RecordBatch.done(RecordBatch.java:91)

 at
 org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:267)

 at
 org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:235)

 at
 org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:55)

 at
 org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:312)

 at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:225)

 at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:199)

 at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:124)

 at java.lang.Thread.run(Thread.java:745)

 [2015-03-07 02:04:27,212] TRACE [mirrormaker-producer-0] Sending message
 with value size 13 (kafka.tools.MirrorMaker$ProducerThread)



 --
 Regards,
 Tao




 --
 Regards,
 Tao

Re: Mirror maker end to end latency metric

2015-03-05 Thread tao xiao
Thanks Jon and Guozhang for the info

On Fri, Mar 6, 2015 at 1:10 AM, Jon Bringhurst 
jbringhu...@linkedin.com.invalid wrote:

 Hey Tao,

 Slides 27-30 on
 http://www.slideshare.net/JonBringhurst/kafka-audit-kafka-meetup-january-27th-2015
  has
 a diagram to visually show that Guozhang is talking about.

 -Jon

 On Mar 5, 2015, at 9:03 AM, Guozhang Wang wangg...@gmail.com wrote:

 There is no end2end latency metric in MM, since such a metric requires
 timestamp info on the source / dest Kafka clusters. For example, at
 LinkedIn we add a timestamp in the message header, and let a separate
 consumer to fetch the message on both ends to measure the latency.

 Guozhang
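For illustration, a minimal sketch of that approach; since 0.8.x messages have no headers, the send timestamp is simply prepended to the payload (the wrap/latencyMs names and the delimiter are made up for this example):

def wrap(payload: Array[Byte]): Array[Byte] =
  (System.currentTimeMillis().toString + "|").getBytes ++ payload   // applied on the source side

def latencyMs(received: Array[Byte]): Long = {
  val sentAt = new String(received).takeWhile(_ != '|').toLong      // read back on the destination side
  System.currentTimeMillis() - sentAt
}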

 On Wed, Mar 4, 2015 at 11:07 PM, tao xiao xiaotao...@gmail.com wrote:

 Hi team,

 Is there a built-in metric that can measure the end to end latency in MM?

 --
 Regards,
 Tao




 --
 -- Guozhang





-- 
Regards,
Tao


Re: Kafka DefaultPartitioner is not behaved as expected.

2015-03-05 Thread tao xiao
The reason you need to use "a".getBytes is that the default serializer.class
is kafka.serializer.DefaultEncoder, which takes byte[] as input. A byte
array's hash code is based on object identity rather than on the equality of
its elements, so every new byte array (which is what your sample code creates
for each message) produces a different hash code.

If you really want to stick with the same partition for the key, you'd
better use kafka.serializer.StringEncoder as the serializer.class.
StringEncoder takes a String as input, and a String always returns the same
hash code for the same value.
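A minimal sketch of the difference (2 is used as the partition count, matching the topic in question):

val k1 = "a".getBytes
val k2 = "a".getBytes
println(k1.hashCode == k2.hashCode)    // false in practice: byte[] hashCode is identity-based
println("a".hashCode == "a".hashCode)  // true: String hashCode is value-based (97 for "a")
println(math.abs("a".hashCode) % 2)    // 1: with a String key the partitioner always picks partition 1 of 2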

On Fri, Mar 6, 2015 at 2:23 AM, Zijing Guo alter...@yahoo.com.invalid
wrote:

 And also there is something I think is worth mentioning: when I
 call prod.send(KeyedMessage("foo", "a", "test message")), the data can't be
 delivered to the brokers; the only way to make it work is
 through prod.send(KeyedMessage("foo", "a".getBytes, "test
 message".getBytes)). When I convert the data and key to bytes, the data is
 not going to the proper partitions.
 Thanks

  On Thursday, March 5, 2015 12:59 PM, Zijing Guo
 alter...@yahoo.com.INVALID wrote:


  Hi Guozhang,I'm using kafka 0.8.2.0
 Thanks

 On Thursday, March 5, 2015 12:57 PM, Guozhang Wang wangg...@gmail.com
 wrote:


  Zijing,

 Which version of Kafka client are you using?

 On Thu, Mar 5, 2015 at 8:50 AM, Zijing Guo alter...@yahoo.com.invalid
 wrote:

  Hi community,I have a 2 nodes test cluster with 2 zk instance and 2
 broker
  instance running and I'm experimenting kafka producer in a cluster
  environment. So I create a topic foo with 2 partitions and replication
  1.I create a async Producer without defining partition.class (so the
  partitioner will be the default one, which is
  kafka.producer.DefaultPartitioner and I verified.)
  Now since I know that there is 2 partitions for topic foo and I create
  1000 KeyedMessage with key = a
 val msgs = (1 to 1000).map(e => KeyedMessage("foo", "test message" + e, "a"))
  prod.send(msgs)
 
  In theory, a.hashCode=97, 97 % 2 = 1. so I should expect all the
 message
  go to broker1. However,after I send the message,  from the kafka Web
  console, I can see that the data is evenly distributed around the 2
 brokers.
  Any help will be appreciated.Thanks
 
 




 --
 -- Guozhang









-- 
Regards,
Tao


Mirror maker end to end latency metric

2015-03-04 Thread tao xiao
Hi team,

Is there a built-in metric that can measure the end to end latency in MM?

-- 
Regards,
Tao


Re: Got negative offset lag after restarting brokers

2015-03-04 Thread tao xiao
Thanks, guys. With unclean.leader.election.enable set to false the issue is
fixed.
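For reference, a minimal sketch of the settings described in the quoted reply below (all values are illustrative; the producer line is for the old Scala producer, while the new producer uses acks=all):

# broker / topic
default.replication.factor=3
min.insync.replicas=2
unclean.leader.election.enable=false

# producer
request.required.acks=-1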

On Tue, Mar 3, 2015 at 2:50 PM, Gwen Shapira gshap...@cloudera.com wrote:

 of course :)
 unclean.leader.election.enable

 On Mon, Mar 2, 2015 at 9:10 PM, tao xiao xiaotao...@gmail.com wrote:
  How do I achieve point 3? is there a config that I can set?
 
  On Tue, Mar 3, 2015 at 1:02 PM, Jiangjie Qin j...@linkedin.com.invalid
  wrote:
 
  The scenario you mentioned is equivalent to an unclean leader election.
  The following settings will make sure there is no data loss:
  1. Set replica factor to 3 and minimum ISR size to 2.
  2. When produce, use acks=-1 or acks=all
  3. Disable unclean leader election.
 
  1) and 2) Guarantees committed messages will be at least in to brokers.
  3) Means if a broker is not in ISR, it cannot be elected as a leader, so
  the log truncate as mentioned earlier will not happen.
 
  Jiangjie (Becket) Qin
 
  On 3/2/15, 7:16 PM, tao xiao xiaotao...@gmail.com wrote:
 
  Since I reused the same consumer group to consume the messages after
 step
  6
  data there was no data loss occurred. But if I create a new consumer
 group
  for sure the new consumer will suffer data loss.
  
  I am more concerning about if this is an acceptable behavior by Kafka
 that
  an out of sync broker can be elected as the leader for a partition. Is
  there any mechanism built around Kafka to ensure that only the in-sync
  broker can be chosen to be a leader? If no, what is the best practice
 to
  restart brokers if some of the replicas are out of sync?
  
  On Tue, Mar 3, 2015 at 2:35 AM, Jiangjie Qin j...@linkedin.com.invalid
 
  wrote:
  
   In this case you have data loss. In step 6, when broker 1 comes up,
 it
   becomes the leader and has log end offset 1000. When broker 0 comes
 up,
  it
   becomes follower and will truncate its log to 1000, i.e. 1000
 messages
   were lost. Next time when the consumer starts, its offset will be
 reset
  to
   either the smallest or the largest depending on the setting.
  
   Jiangjie (Becket) Qin
  
   On 3/2/15, 9:32 AM, Stuart Reynolds s...@stureynolds.com wrote:
  
   Each topic has:  earliest and latest offsets (per partition)
   Each consumer group has a current offset (per topic, partition pair)
   
   I see -1 for the current offsets new consumer groups that haven't
 yet
   committed an offset. I think it means that the offsets for that
   consumer group are undefined.
   
   Is it possible you generated new consumer groups when you restarted
  your
   broker?
   
   
   
   
   On Mon, Mar 2, 2015 at 3:15 AM, tao xiao xiaotao...@gmail.com
 wrote:
Hi team,
   
I have 2 brokers (0 and 1) serving a topic mm-benchmark-test. I
 did
  some
tests on the two brokers to verify how leader got elected. Here
 are
  the
steps:
   
1. started 2 brokers
2. created a topic with partition=1 and replication-factor=2. Now
   brokers 1
was elected as leader
3. sent 1000 messages to the topic and consumed from a high level
   consumer
using zk as the offset storage.
4. shutdown broker 1 and now broker 0 was elected as leader
5. sent another 1000 messages to topic and consumed again
6. completely shutdown broker 0 and then started broker 1. now
  broker 1
became the leader
7. started broker 0 and ran ConsumerOffsetChecker which showed
  negative
   lag
(-1000 in my case)
   
I think this is because the consumed offset in zk was 2000 and
  logsize
retrieved from the leader (broker 1) which missed 1000 messages in
  step
   5
in this case was 1000 there -1000 = 1000 - 2000 was given.
   
Is this a bug or expected behavior?
   
--
Regards,
Tao
  
  
  
  
  --
  Regards,
  Tao
 
 
 
 
  --
  Regards,
  Tao




-- 
Regards,
Tao


Re: New subscriber offset

2015-03-03 Thread tao xiao
You can set the consumer config auto.offset.reset=largest
Ref: http://kafka.apache.org/documentation.html#consumerconfigs
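A minimal sketch of the relevant consumer properties (the group id is a placeholder):

group.id=my-existing-group
auto.offset.reset=largest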

On Tue, Mar 3, 2015 at 8:30 PM, Achanta Vamsi Subhash 
achanta.va...@flipkart.com wrote:

 Hi,

 We are using HighLevelConsumer and when a new subscription is added to the
 topic, the HighLevelConsumer for the same group starts from the start of
 the Kafka topic log.

 Is there anyway we could set the offset of the HighLevelConsumer to the end
 of the log instead? We don't want to move to LowLevelConsumer for this only
 case.

 Can we manually update the offset in the __consumer_offsets topic in 0.8.2?
 Please help.

 --
 Regards
 Vamsi Subhash




-- 
Regards,
Tao


Re: Got negative offset lag after restarting brokers

2015-03-02 Thread tao xiao
Since I reused the same consumer group to consume the messages after step 6,
no data loss occurred. But if I create a new consumer group,
the new consumer will certainly suffer data loss.

I am more concerned about whether it is acceptable behavior for Kafka that
an out-of-sync broker can be elected as the leader for a partition. Is
there any mechanism built around Kafka to ensure that only an in-sync
broker can be chosen as leader? If not, what is the best practice for
restarting brokers when some of the replicas are out of sync?

On Tue, Mar 3, 2015 at 2:35 AM, Jiangjie Qin j...@linkedin.com.invalid
wrote:

 In this case you have data loss. In step 6, when broker 1 comes up, it
 becomes the leader and has log end offset 1000. When broker 0 comes up, it
 becomes follower and will truncate its log to 1000, i.e. 1000 messages
 were lost. Next time when the consumer starts, its offset will be reset to
 either the smallest or the largest depending on the setting.

 Jiangjie (Becket) Qin

 On 3/2/15, 9:32 AM, Stuart Reynolds s...@stureynolds.com wrote:

 Each topic has:  earliest and latest offsets (per partition)
 Each consumer group has a current offset (per topic, partition pair)
 
 I see -1 for the current offsets new consumer groups that haven't yet
 committed an offset. I think it means that the offsets for that
 consumer group are undefined.
 
 Is it possible you generated new consumer groups when you restarted your
 broker?
 
 
 
 
 On Mon, Mar 2, 2015 at 3:15 AM, tao xiao xiaotao...@gmail.com wrote:
  Hi team,
 
  I have 2 brokers (0 and 1) serving a topic mm-benchmark-test. I did some
  tests on the two brokers to verify how leader got elected. Here are the
  steps:
 
  1. started 2 brokers
  2. created a topic with partition=1 and replication-factor=2. Now
 brokers 1
  was elected as leader
  3. sent 1000 messages to the topic and consumed from a high level
 consumer
  using zk as the offset storage.
  4. shutdown broker 1 and now broker 0 was elected as leader
  5. sent another 1000 messages to topic and consumed again
  6. completely shutdown broker 0 and then started broker 1. now broker 1
  became the leader
  7. started broker 0 and ran ConsumerOffsetChecker which showed negative
 lag
  (-1000 in my case)
 
  I think this is because the consumed offset in zk was 2000 and logsize
  retrieved from the leader (broker 1) which missed 1000 messages in step
 5
  in this case was 1000 there -1000 = 1000 - 2000 was given.
 
  Is this a bug or expected behavior?
 
  --
  Regards,
  Tao




-- 
Regards,
Tao


Re: Got negative offset lag after restarting brokers

2015-03-02 Thread tao xiao
How do I achieve point 3? Is there a config that I can set?

On Tue, Mar 3, 2015 at 1:02 PM, Jiangjie Qin j...@linkedin.com.invalid
wrote:

 The scenario you mentioned is equivalent to an unclean leader election.
 The following settings will make sure there is no data loss:
 1. Set replica factor to 3 and minimum ISR size to 2.
 2. When produce, use acks=-1 or acks=all
 3. Disable unclean leader election.

  1) and 2) guarantee that committed messages will be in at least two brokers.
 3) Means if a broker is not in ISR, it cannot be elected as a leader, so
 the log truncate as mentioned earlier will not happen.

 Jiangjie (Becket) Qin

 On 3/2/15, 7:16 PM, tao xiao xiaotao...@gmail.com wrote:

 Since I reused the same consumer group to consume the messages after step
 6
 data there was no data loss occurred. But if I create a new consumer group
 for sure the new consumer will suffer data loss.
 
 I am more concerning about if this is an acceptable behavior by Kafka that
 an out of sync broker can be elected as the leader for a partition. Is
 there any mechanism built around Kafka to ensure that only the in-sync
 broker can be chosen to be a leader? If no, what is the best practice to
 restart brokers if some of the replicas are out of sync?
 
 On Tue, Mar 3, 2015 at 2:35 AM, Jiangjie Qin j...@linkedin.com.invalid
 wrote:
 
  In this case you have data loss. In step 6, when broker 1 comes up, it
  becomes the leader and has log end offset 1000. When broker 0 comes up,
 it
  becomes follower and will truncate its log to 1000, i.e. 1000 messages
  were lost. Next time when the consumer starts, its offset will be reset
 to
  either the smallest or the largest depending on the setting.
 
  Jiangjie (Becket) Qin
 
  On 3/2/15, 9:32 AM, Stuart Reynolds s...@stureynolds.com wrote:
 
  Each topic has:  earliest and latest offsets (per partition)
  Each consumer group has a current offset (per topic, partition pair)
  
  I see -1 for the current offsets new consumer groups that haven't yet
  committed an offset. I think it means that the offsets for that
  consumer group are undefined.
  
  Is it possible you generated new consumer groups when you restarted
 your
  broker?
  
  
  
  
  On Mon, Mar 2, 2015 at 3:15 AM, tao xiao xiaotao...@gmail.com wrote:
   Hi team,
  
   I have 2 brokers (0 and 1) serving a topic mm-benchmark-test. I did
 some
   tests on the two brokers to verify how leader got elected. Here are
 the
   steps:
  
   1. started 2 brokers
   2. created a topic with partition=1 and replication-factor=2. Now
  brokers 1
   was elected as leader
   3. sent 1000 messages to the topic and consumed from a high level
  consumer
   using zk as the offset storage.
   4. shutdown broker 1 and now broker 0 was elected as leader
   5. sent another 1000 messages to topic and consumed again
   6. completely shutdown broker 0 and then started broker 1. now
 broker 1
   became the leader
   7. started broker 0 and ran ConsumerOffsetChecker which showed
 negative
  lag
   (-1000 in my case)
  
   I think this is because the consumed offset in zk was 2000 and
 logsize
   retrieved from the leader (broker 1) which missed 1000 messages in
 step
  5
   in this case was 1000 there -1000 = 1000 - 2000 was given.
  
   Is this a bug or expected behavior?
  
   --
   Regards,
   Tao
 
 
 
 
 --
 Regards,
 Tao




-- 
Regards,
Tao


Got negative offset lag after restarting brokers

2015-03-02 Thread tao xiao
Hi team,

I have 2 brokers (0 and 1) serving a topic mm-benchmark-test. I did some
tests on the two brokers to verify how leader got elected. Here are the
steps:

1. started 2 brokers
2. created a topic with partition=1 and replication-factor=2. Now brokers 1
was elected as leader
3. sent 1000 messages to the topic and consumed from a high level consumer
using zk as the offset storage.
4. shutdown broker 1 and now broker 0 was elected as leader
5. sent another 1000 messages to topic and consumed again
6. completely shutdown broker 0 and then started broker 1. now broker 1
became the leader
7. started broker 0 and ran ConsumerOffsetChecker which showed negative lag
(-1000 in my case)

I think this is because the consumed offset in zk was 2000, while the logsize
retrieved from the leader (broker 1), which missed the 1000 messages from step 5,
was only 1000; therefore the lag was reported as 1000 - 2000 = -1000.

Is this a bug or expected behavior?

-- 
Regards,
Tao


Re: How replicas catch up the leader

2015-02-28 Thread tao xiao
Thanks Harsha. In my case the replica doesn't catch up at all; the last log
date is 5 days ago. It seems the failed replica has been excluded from the
replication list. I am looking for a command that can add the replica back
to the ISR list or force it to start syncing again.

On Sat, Feb 28, 2015 at 4:27 PM, Harsha ka...@harsha.io wrote:

  You can increase num.replica.fetchers (by default it's 1) and also try
  increasing replica.fetch.max.bytes.
 -Harsha
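For reference, a sketch of those two broker settings in server.properties (the values here are only illustrative, not recommendations):

num.replica.fetchers=4
replica.fetch.max.bytes=10485760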

 On Fri, Feb 27, 2015, at 11:15 PM, tao xiao wrote:
  Hi team,
 
  I had a replica node that was shutdown improperly due to no disk space
  left. I managed to clean up the disk and restarted the replica but the
  replica since then never caught up the leader shown below
 
  Topic:test PartitionCount:1 ReplicationFactor:3 Configs:
 
  Topic: test Partition: 0 Leader: 5 Replicas: 1,5,6 Isr: 5,6
 
  broker 1 is the replica that failed before. Is there a way that I can
  force
  the replica to catch up the leader?
 
  --
  Regards,
  Tao




-- 
Regards,
Tao


How replicas catch up the leader

2015-02-27 Thread tao xiao
Hi team,

I had a replica node that was shut down improperly because there was no disk space
left. I managed to clean up the disk and restarted the replica, but the
replica has never caught up with the leader since then, as shown below:

Topic:test PartitionCount:1 ReplicationFactor:3 Configs:

Topic: test Partition: 0 Leader: 5 Replicas: 1,5,6 Isr: 5,6

Broker 1 is the replica that failed before. Is there a way that I can force
the replica to catch up with the leader?

-- 
Regards,
Tao


Re: kafka partitions api

2015-02-26 Thread tao xiao
Alex,

You can get the partition from MessageAndMetadata, as the partition is exposed as
a constructor parameter.
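A minimal sketch of reading it from a high-level consumer stream (stream is assumed to be a KafkaStream[Array[Byte], Array[Byte]] obtained from createMessageStreams):

val it = stream.iterator()
while (it.hasNext()) {
  val msg = it.next()   // MessageAndMetadata[Array[Byte], Array[Byte]]
  println("topic=%s partition=%d offset=%d".format(msg.topic, msg.partition, msg.offset))
}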

On Fri, Feb 27, 2015 at 2:12 PM, Alex Melville amelvi...@g.hmc.edu wrote:

 Tao and Gaurav,


 After looking through the source code in Kafka v8.2.0, I don't see any
 partition() function on the MessageAndMetadata object. Here's the class's
 source:


 package kafka.message

 import kafka.serializer.Decoder
 import kafka.utils.Utils

 case class MessageAndMetadata[K, V](topic: String, partition: Int,
 private val rawMessage: Message,
 offset: Long,
 keyDecoder: Decoder[K],
 valueDecoder: Decoder[V]) {

   /**
* Return the decoded message key and payload
*/
   def key(): K = if(rawMessage.key == null) null.asInstanceOf[K] else
 keyDecoder.fromBytes(Utils.readBytes(rawMessage.key))

   def message(): V = if(rawMessage.isNull) null.asInstanceOf[V] else
 valueDecoder.fromBytes(Utils.readBytes(rawMessage.payload))



  -Alex M.


 On Thu, Feb 26, 2015 at 9:54 PM, Gaurav Agarwal gaurav130...@gmail.com
 wrote:

  that's fine to me , you can open a separate thread , But the original
  question when the consumerconnector got connected to a separate topic ,
  Whether KafkaStream will have all the information of the partitions for
  that corresponding topic , Please confirm
 
  Thanks
 
  On Fri, Feb 27, 2015 at 11:20 AM, Alex Melville amelvi...@g.hmc.edu
  wrote:
 
   I was going to make a separate email thread for this question but this
   thread's topic echoes what my own would have been.
  
  
   How can I query a broker or zookeeper for the number of partitions in a
   given topic? I'm trying to write a custom partitioner that sends a
  message
   to every partition within a topic, and so I need to know the total
 number
   of partitions before I call Producer.send().
  
  
   Alex
  
   On Thu, Feb 26, 2015 at 7:32 PM, tao xiao xiaotao...@gmail.com
 wrote:
  
Gaurav,
   
You can get the partition number the message belongs to via
MessageAndMetadata.partition()
   
On Fri, Feb 27, 2015 at 5:16 AM, Jun Rao j...@confluent.io wrote:
   
 The partition api is exposed to the consumer in 0.8.2.

 Thanks,

 Jun

 On Thu, Feb 26, 2015 at 10:53 AM, Gaurav Agarwal 
   gaurav130...@gmail.com

 wrote:

  After retrieving a kafka stream or kafka message how to get the
  corresponding partition number to which it belongs ? I am using
  kafka
  version 0.8.1.
  More specifically kafka.consumer.KafkaStream and
  kafka.message.MessageAndMetaData classes, does not provide API to
 retrieve
  partition number. Are there any other API's to get the partition
number?
 
  IF there are multiple partitions of a topic ,Do i need to declare
   from
 java
  code how many partitions the topic contains or i can leave it
 topic
  Kafkastream will take the partition information from kafka broker
  at
  runtime.?
 

   
   
   
--
Regards,
Tao
   
  
 




-- 
Regards,
Tao


Re: kafka partitions api

2015-02-26 Thread tao xiao
Gaurav,

You can get the partition number the message belongs to via
MessageAndMetadata.partition()

On Fri, Feb 27, 2015 at 5:16 AM, Jun Rao j...@confluent.io wrote:

 The partition api is exposed to the consumer in 0.8.2.

 Thanks,

 Jun

 On Thu, Feb 26, 2015 at 10:53 AM, Gaurav Agarwal gaurav130...@gmail.com
 wrote:

  After retrieving a kafka stream or kafka message how to get the
  corresponding partition number to which it belongs ? I am using kafka
  version 0.8.1.
  More specifically kafka.consumer.KafkaStream and
  kafka.message.MessageAndMetaData classes, does not provide API to
 retrieve
  partition number. Are there any other API's to get the partition number?
 
  IF there are multiple partitions of a topic ,Do i need to declare from
 java
  code how many partitions the topic contains or i can leave it topic
  Kafkastream will take the partition information from kafka broker at
  runtime.?
 




-- 
Regards,
Tao


Re: Default MirrorMaker not copying over from source to target

2015-02-19 Thread tao xiao
Looks like you only have 4 messages in your topic and no more messages got
sent

[2015-02-19 20:09:34,661] DEBUG initial fetch offset of consolemm:0: fetched
offset = 4: consumed offset = 4 is 4 (kafka.consumer.PartitionTopicInfo)

You can try sending more messages to the topic, or give the MM a different
consumer group id and set auto.offset.reset=smallest.
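A minimal sketch of the two changes in mirrorconsumer.properties (the group id is just a placeholder):

group.id=mm-debug-group
auto.offset.reset=smallest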

On Friday, February 20, 2015, Alex Melville amelvi...@g.hmc.edu wrote:

 Tao,


 I updated the mirrorconsumer.properties config file as you suggested, and
 upped the MM's log level to DEBUG. I have the output of the DEBUG logger
 here in this pastebin, if you could take a minute to look for anything in
 its contents that would indicate a problem that would be extremely helpful.
 Note that my servers hostnames are of the form ad-010X or ba-0X where X is
 some integer between 1 and 4.

 http://pastebin.com/rBsxx15A

 When I run the mirrormaker and then spin up a console consumer to read from
 the source cluster, I get 0 messages consumed.


 Alex

 On Sun, Feb 15, 2015 at 3:00 AM, tao xiao xiaotao...@gmail.com
 javascript:; wrote:

  Alex,
 
  Are you sure you have data continually being sent to the topic in source
  cluster after you bring up MM? By default auto.offset.reset=largest in MM
  consumer config which means MM only fetches the largest offset if the
  consumer group has no initial offset in zookeeper.
 
  You can have MM print more log by changing the log level in
  config/tools-log4j.properties
 
  On Sun, Feb 15, 2015 at 8:39 AM, Alex Melville amelvi...@g.hmc.edu
 javascript:;
  wrote:
 
   Hi Kafka'ers,
  
  
   I am trying to get the Mirrormaker working with two separate clusters,
  one
   as the source and the other as the target. The topic I'm trying to copy
   over exists on both the source and target clusters. Here are the
 relevant
   entries in my consumer and producer properties files, which I'm
  specifying
   the command I run to start the MM:
  
   *mirrorconsumer.properties:*
   zookeeper.connect=ad-0104:2181
   zookeeper.connection.timeout.ms=6000
   group.id=test-consumer-group
  
  
   *mirrorproducer.properties:*
   metadata.broker.list=ba-02:9092,ba-03:9092
   producer.type=sync
   compression.codec=none
   serializer.class=kafka.serializer.DefaultEncoder
  
  
   Then I run the following command:
   bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config
../config/mirrorconsumer.properties --producer.config
   ../config/mirrorproducer.properties --whitelist consolemm
  
   so consolemm is the topic I'm trying to copy over. I've created
 consolemm
   and have used to console-consumer to verify that there are messages in
  the
   topic.
  
   When I run this command... nothing happens. The process keeps running
 and
   prints nothing to the Terminal. If I look in the output of the
 zookeeper
  on
   the source cluster I get only the following:
  
   [2015-02-15 00:34:06,102] INFO Accepted socket connection from /
   10.7.162.75:42819 (org.apache.zookeeper.server.NIOServerCnxnFactory)
   [2015-02-15 00:34:06,104] INFO Client attempting to establish new
 session
   at /10.7.162.75:42819 (org.apache.zookeeper.server.ZooKeeperServer)
   [2015-02-15 00:34:06,106] INFO Established session 0x14b668b0fbe0033
 with
   negotiated timeout 6000 for client /10.7.162.75:42819
   (org.apache.zookeeper.server.ZooKeeperServer)
  
  
   and when I look at the output of one of the brokers on the source
  cluster I
   get:
  
   [2015-02-15 00:32:14,382] INFO Closing socket connection to /
 10.7.162.75
  .
   (kafka.network.Processor)
  
   and there is no output on the zookeeper on the target cluster.
  
  
  
   Any advice on what is causing MM to not properly copy over data to the
   target cluster would be extremely helpful.
  
   -Alex
  
 
 
 
  --
  Regards,
  Tao
 



-- 
Regards,
Tao


Re: consumer lag metric

2015-02-17 Thread tao xiao
Thanks Todd. that will work

On Tue, Feb 17, 2015 at 10:31 PM, Todd Palino tpal...@gmail.com wrote:

 In order to do that, you'll need to run it and parse the output, and then
 emit it to your metrics system of choice. This is essentially what I do - I
 have a monitoring application which runs every minute and pulls the offsets
 for a select set of topics and consumers, and then packages up the metrics
 and sends them to our internal system.

 It's not ideal. We're working on a script to calculate lag efficiently for
 all consumers who commit offsets to Kafka, rather than a select set.

 -Todd
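For reference, a sketch of the kind of command such a monitor runs and parses (host, group and topic names are placeholders); the Lag column of its output is what gets emitted to the metrics system:

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zkconnect zkhost:2181 \
  --group my-consumer-group --topic my-topic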


 On Mon, Feb 16, 2015 at 12:27 AM, tao xiao xiaotao...@gmail.com wrote:

  Thank you Todd for your detailed explanation. Currently I export all
  metrics to graphite using the reporter configuration. is there a way I
 can
  do similar thing with offset checker?
 
  On Mon, Feb 16, 2015 at 4:21 PM, Todd Palino tpal...@gmail.com wrote:
 
   The reason for this is the mechanic by which each of the lags are
   calculated. MaxLag (and the FetcherLagMetric) are calculated by the
   consumer itself using the difference between the offset it knows it is
  at,
   and the offset that the broker has as the end of the partition. The
  offset
   checker, however, uses the last offset that the consumer committed.
   Depending on your configuration, this is somewhere behind where the
   consumer actually is. For example, if your commit interval is set to 10
   minutes, the number used by the offset checker can be up to 10 minutes
   behind where it actually is.
  
   So while MaxLag may be more up to date at any given time, it's actually
   less accurate. Because MaxLag relies on the consumer to report it, if
 the
   consumer breaks, you will not see an accurate lag number. This is why
  when
   we are checking consumer lag, we use an external process that uses the
   committed consumer offsets. This allows us to catch a broken consumer,
 as
   well as an active consumer that is just falling behind.
  
   -Todd
  
  
   On Fri, Feb 13, 2015 at 9:34 PM, tao xiao xiaotao...@gmail.com
 wrote:
  
Thanks Joel. But I discover that both MaxLag and FetcherLagMetrics
 are
always
much smaller than the lag shown in offset checker. any reason?
   
On Sat, Feb 14, 2015 at 7:22 AM, Joel Koshy jjkosh...@gmail.com
  wrote:
   
 There are FetcherLagMetrics that you can take a look at. However,
 it
 is probably easiest to just monitor MaxLag as that reports the
  maximum
 of all the lag metrics.

 On Fri, Feb 13, 2015 at 05:03:28PM +0800, tao xiao wrote:
  Hi team,
 
  Is there a metric that shows the consumer lag of a particular
   consumer
  group? similar to what offset checker provides
 
  --
  Regards,
  Tao


   
   
--
Regards,
Tao
   
  
 
 
 
  --
  Regards,
  Tao
 




-- 
Regards,
Tao


Re: consumer lag metric

2015-02-16 Thread tao xiao
Thank you Todd for your detailed explanation. Currently I export all
metrics to Graphite using the reporter configuration. Is there a way I can
do a similar thing with the offset checker?

On Mon, Feb 16, 2015 at 4:21 PM, Todd Palino tpal...@gmail.com wrote:

 The reason for this is the mechanic by which each of the lags are
 calculated. MaxLag (and the FetcherLagMetric) are calculated by the
 consumer itself using the difference between the offset it knows it is at,
 and the offset that the broker has as the end of the partition. The offset
 checker, however, uses the last offset that the consumer committed.
 Depending on your configuration, this is somewhere behind where the
 consumer actually is. For example, if your commit interval is set to 10
 minutes, the number used by the offset checker can be up to 10 minutes
 behind where it actually is.

 So while MaxLag may be more up to date at any given time, it's actually
 less accurate. Because MaxLag relies on the consumer to report it, if the
 consumer breaks, you will not see an accurate lag number. This is why when
 we are checking consumer lag, we use an external process that uses the
 committed consumer offsets. This allows us to catch a broken consumer, as
 well as an active consumer that is just falling behind.

 -Todd


 On Fri, Feb 13, 2015 at 9:34 PM, tao xiao xiaotao...@gmail.com wrote:

  Thanks Joel. But I discover that both MaxLag and FetcherLagMetrics are
  always
  much smaller than the lag shown in offset checker. any reason?
 
  On Sat, Feb 14, 2015 at 7:22 AM, Joel Koshy jjkosh...@gmail.com wrote:
 
   There are FetcherLagMetrics that you can take a look at. However, it
   is probably easiest to just monitor MaxLag as that reports the maximum
   of all the lag metrics.
  
   On Fri, Feb 13, 2015 at 05:03:28PM +0800, tao xiao wrote:
Hi team,
   
Is there a metric that shows the consumer lag of a particular
 consumer
group? similar to what offset checker provides
   
--
Regards,
Tao
  
  
 
 
  --
  Regards,
  Tao
 




-- 
Regards,
Tao


Re: API to get the partition number

2015-02-15 Thread tao xiao
You can get the partition number and offset of the message via
MessageAndMetadata.partition() and MessageAndMetadata.offset().

For your scenario you can turn off auto commit (auto.commit.enable=false) and
then commit the offsets yourself after finishing message consumption.
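A minimal sketch with the high-level consumer (connector is the ConsumerConnector, stream is one of its KafkaStreams, and process() is a placeholder for your handling logic; committing per message is shown only for clarity and is expensive in practice):

val it = stream.iterator()
while (it.hasNext()) {
  val msg = it.next()
  process(msg.message())        // handle the message first
  connector.commitOffsets()     // then commit, so a crash before this line replays the message
}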

On Mon, Feb 16, 2015 at 1:40 PM, Arunkumar Srambikkal (asrambik) 
asram...@cisco.com wrote:

 Hi,


 Is there a way to get the current partition number and current offset,
 when using the *high level consumer* in 0.8.2?

 I went through the previous messages and in the previous version I think
 there are none.

 The reason we want to do this, is that I  plan to have a consumer without
 the default commit of offsets, to avoid the scenario of consumers going
 down before updating the accurate offset.

 Rgds
 Arun




-- 
Regards,
Tao


Re: consumer lag metric

2015-02-14 Thread tao xiao
Thanks Joel. But I have found that both MaxLag and FetcherLagMetrics are always
much smaller than the lag shown in the offset checker. Any reason?

On Sat, Feb 14, 2015 at 7:22 AM, Joel Koshy jjkosh...@gmail.com wrote:

 There are FetcherLagMetrics that you can take a look at. However, it
 is probably easiest to just monitor MaxLag as that reports the maximum
 of all the lag metrics.

 On Fri, Feb 13, 2015 at 05:03:28PM +0800, tao xiao wrote:
  Hi team,
 
  Is there a metric that shows the consumer lag of a particular consumer
  group? similar to what offset checker provides
 
  --
  Regards,
  Tao




-- 
Regards,
Tao


Re: offset migration from kafka to zookeeper

2015-02-14 Thread tao xiao
Thanks Jiangjie for your help

On Sat, Feb 14, 2015 at 5:59 AM, Joel Koshy jjkosh...@gmail.com wrote:

 Thanks for looking into that!

 On Fri, Feb 13, 2015 at 05:31:39AM +, Jiangjie Qin wrote:
  I think this is the offset checker bug.
  The offset checker will
  1. first check if the offset exists in offset topic on broker or not.
  2. If it is on broker then it will just return that offset.
  3. Otherwise it goes to zookeeper.
 
  So the problem you saw was actually following this logic.
  After dual commit, offset topic already had the offsets for this consumer
  and topic.
  Then you switched to zookeeper commit.
  Because the offset topic has the offsets already, offset checker will use
  that and skip checking zookeeper. So the offset will not change anymore
  because you are no longer committing to offset topic on broker, while
  offset checker always use that offset.
 
  On 2/12/15, 7:30 PM, tao xiao xiaotao...@gmail.com wrote:
 
  I used the one shipped with 0.8.2. It is pretty straightforward to
  reproduce the issue.
  
  Here are the steps to reproduce:
  1. I have a consumer using high level consumer API with initial settings
  offsets.storage=kafka and dual.commit.enabled=false.
  2. After consuming messages for a while shutdown the consumer and change
  setting dual.commit.enabled=true
  3. bounce the consumer and run for while. The lag looks good
  4. change setting offsets.storage=zookeeper and bounce the consumer.
  Starting from now the lag remain unchanged
  
  On Fri, Feb 13, 2015 at 11:01 AM, Joel Koshy jjkosh...@gmail.com
 wrote:
  
   That is weird. Are you by any chance running an older version of the
   offset checker? Is this straightforward to reproduce?
  
   On Fri, Feb 13, 2015 at 09:57:31AM +0800, tao xiao wrote:
Joel,
   
No, the metric was not increasing. It was 0 all the time.
   
On Fri, Feb 13, 2015 at 12:18 AM, Joel Koshy jjkosh...@gmail.com
   wrote:
   
 Actually I meant to say check that is not increasing.

 On Thu, Feb 12, 2015 at 08:15:01AM -0800, Joel Koshy wrote:
  Possibly a bug - can you also look at the MaxLag mbean in the
   consumer
  to verify that the maxlag is zero?
 
  On Thu, Feb 12, 2015 at 11:24:42PM +0800, tao xiao wrote:
   Hi Joel,
  
   When I set dual.commit.enabled=true the count value of both
   metrics got
   increased. After I set offsets.storage=zookeeper only
 ZooKeeperCommitsPerSec
   changed but not KafkaCommitsPerSec. I think this is expected
 as
   kafka
   offset storage was turned off.
  
   But when I looked up the consumer lag via
 kafka.tools.ConsumerOffsetChecker
   the lag still remained unchanged.
  
   I scanned through the source code of ConsumerOffsetChecker it
   doesn't
   check the offset in zk unless offsetFetchResponse returns
  NoOffset.
 Since
   the consumer used kafka as the offset storage before I don't
  think
   offsetFetchResponse would return NoOffset
  
    offsetFetchResponse.requestInfo.foreach { case (topicAndPartition, offsetAndMetadata) =>
      if (offsetAndMetadata == OffsetMetadataAndError.NoOffset) {
        val topicDirs = new ZKGroupTopicDirs(group, topicAndPartition.topic)
        // this group may not have migrated off zookeeper for offsets storage (we don't expose the dual-commit option in this tool)
        // (meaning the lag may be off until all the consumers in the group have the same setting for offsets storage)
        try {
          val offset = ZkUtils.readData(zkClient, topicDirs.consumerOffsetDir + "/%d".format(topicAndPartition.partition))._1.toLong
          offsetMap.put(topicAndPartition, offset)
        } catch {
          case z: ZkNoNodeException =>
            if (ZkUtils.pathExists(zkClient, topicDirs.consumerOffsetDir))
              offsetMap.put(topicAndPartition, -1)
            else
              throw z
        }
      }
      else if (offsetAndMetadata.error == ErrorMapping.NoError)
        offsetMap.put(topicAndPartition, offsetAndMetadata.offset)
      else {
        println("Could not fetch offset for %s due to %s.".format(topicAndPartition, ErrorMapping.exceptionFor(offsetAndMetadata.error)))
      }
    }
  
   On Thu, Feb 12, 2015 at 10:03 PM, Joel Koshy
  jjkosh...@gmail.com
 wrote:
  
There are mbeans named KafkaCommitsPerSec and
   ZooKeeperCommitsPerSec
 -
can you look those up and see what they report?
   
On Thu, Feb 12, 2015 at 07:32:39PM +0800, tao xiao wrote:
 Hi team,

 I

offset migration from kafka to zookeeper

2015-02-12 Thread tao xiao
Hi team,

I was trying to migrate my consumer offset from kafka to zookeeper.

Here is the original settings of my consumer

props.put("offsets.storage", "kafka");

props.put("dual.commit.enabled", "false");

Here are the steps:

1. set dual.commit.enabled=true
2. restart my consumer and monitor offset lag with
kafka.tools.ConsumerOffsetChecker
3. set offsets.storage=zookeeper
4. restart my consumer and monitor offset lag with
kafka.tools.ConsumerOffsetChecker

After step 4 my consumer was able to continue consuming data from the topic,
but the offset lag remained unchanged. Did I do anything wrong?

-- 
Regards,
Tao


Re: offset migration from kafka to zookeeper

2015-02-12 Thread tao xiao
Thanks for the explanation. Is there a way that I can wipe out the offset
stored in Kafka so that the checker can continue to work again?

On Fri, Feb 13, 2015 at 1:31 PM, Jiangjie Qin j...@linkedin.com.invalid
wrote:

 I think this is the offset checker bug.
 The offset checker will
 1. first check if the offset exists in offset topic on broker or not.
 2. If it is on broker then it will just return that offset.
 3. Otherwise it goes to zookeeper.

 So the problem you saw was actually following this logic.
 After dual commit, offset topic already had the offsets for this consumer
 and topic.
 Then you switched to zookeeper commit.
 Because the offset topic has the offsets already, offset checker will use
 that and skip checking zookeeper. So the offset will not change anymore
 because you are no longer committing to offset topic on broker, while
 offset checker always use that offset.

 On 2/12/15, 7:30 PM, tao xiao xiaotao...@gmail.com wrote:

 I used the one shipped with 0.8.2. It is pretty straightforward to
 reproduce the issue.
 
 Here are the steps to reproduce:
 1. I have a consumer using high level consumer API with initial settings
 offsets.storage=kafka and dual.commit.enabled=false.
 2. After consuming messages for a while shutdown the consumer and change
 setting dual.commit.enabled=true
 3. bounce the consumer and run for while. The lag looks good
 4. change setting offsets.storage=zookeeper and bounce the consumer.
 Starting from now the lag remain unchanged
 
 On Fri, Feb 13, 2015 at 11:01 AM, Joel Koshy jjkosh...@gmail.com wrote:
 
  That is weird. Are you by any chance running an older version of the
  offset checker? Is this straightforward to reproduce?
 
  On Fri, Feb 13, 2015 at 09:57:31AM +0800, tao xiao wrote:
   Joel,
  
   No, the metric was not increasing. It was 0 all the time.
  
   On Fri, Feb 13, 2015 at 12:18 AM, Joel Koshy jjkosh...@gmail.com
  wrote:
  
Actually I meant to say check that is not increasing.
   
On Thu, Feb 12, 2015 at 08:15:01AM -0800, Joel Koshy wrote:
 Possibly a bug - can you also look at the MaxLag mbean in the
  consumer
 to verify that the maxlag is zero?

 On Thu, Feb 12, 2015 at 11:24:42PM +0800, tao xiao wrote:
  Hi Joel,
 
  When I set dual.commit.enabled=true the count value of both
  metrics got
  increased. After I set offsets.storage=zookeeper only
ZooKeeperCommitsPerSec
  changed but not KafkaCommitsPerSec. I think this is expected as
  kafka
  offset storage was turned off.
 
  But when I looked up the consumer lag via
kafka.tools.ConsumerOffsetChecker
  the lag still remained unchanged.
 
  I scanned through the source code of ConsumerOffsetChecker it
  doesn't
  check the offset in zk unless offsetFetchResponse returns
 NoOffset.
Since
  the consumer used kafka as the offset storage before I don't
 think
  offsetFetchResponse would return NoOffset
 
   offsetFetchResponse.requestInfo.foreach { case (topicAndPartition, offsetAndMetadata) =>
     if (offsetAndMetadata == OffsetMetadataAndError.NoOffset) {
       val topicDirs = new ZKGroupTopicDirs(group, topicAndPartition.topic)
       // this group may not have migrated off zookeeper for offsets storage (we don't expose the dual-commit option in this tool)
       // (meaning the lag may be off until all the consumers in the group have the same setting for offsets storage)
       try {
         val offset = ZkUtils.readData(zkClient, topicDirs.consumerOffsetDir + "/%d".format(topicAndPartition.partition))._1.toLong
         offsetMap.put(topicAndPartition, offset)
       } catch {
         case z: ZkNoNodeException =>
           if (ZkUtils.pathExists(zkClient, topicDirs.consumerOffsetDir))
             offsetMap.put(topicAndPartition, -1)
           else
             throw z
       }
     }
     else if (offsetAndMetadata.error == ErrorMapping.NoError)
       offsetMap.put(topicAndPartition, offsetAndMetadata.offset)
     else {
       println("Could not fetch offset for %s due to %s.".format(topicAndPartition, ErrorMapping.exceptionFor(offsetAndMetadata.error)))
     }
   }
 
  On Thu, Feb 12, 2015 at 10:03 PM, Joel Koshy
 jjkosh...@gmail.com
wrote:
 
   There are mbeans named KafkaCommitsPerSec and
  ZooKeeperCommitsPerSec
-
   can you look those up and see what they report?
  
   On Thu, Feb 12, 2015 at 07:32:39PM +0800, tao xiao wrote:
Hi team,
   
I was trying to migrate my consumer offset from kafka to
  zookeeper.
   
Here is the original settings of my consumer

Re: createMessageStreams vs createMessageStreamsByFilter

2015-02-10 Thread tao xiao
Thank you Guozhang for your detailed explanation. In your example
createMessageStreamsByFilter(*C = 3), since threads are shared among
topics, there may be a situation where all 3 threads get stuck with
topic AC, e.g. the topic is empty, which will keep holding the consuming threads
(with consumer.timeout.ms=-1 set), hence there is no thread left to serve topic
BC. Do you think this situation can happen?

On Wed, Feb 11, 2015 at 2:15 AM, Guozhang Wang wangg...@gmail.com wrote:

 I was not clear before .. for createMessageStreamsByFilter each matched
 topic will have num-threads, but shared: i.e. there will be totally
 num-threads created, but each thread will be responsible for fetching all
 matched topics.

 A more concrete example: say you have topic AC: 3 partitions, topic BC: 6
 partitions.

 With createMessageStreams(AC = 3, BC = 2) a total of 5 threads will
 be created, and consuming AC-1,AC-2,AC-3,BC-1/2/3,BC-4/5/6 respectively;

 With createMessageStreamsByFilter(*C = 3) a total of 3 threads will be
 created, and consuming AC-1/BC-1/BC-2, AC-2/BC-3/BC-4, AC-3/BC-5/BC-6
 respectively.

 Guozhang
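For illustration, a minimal sketch of the two calls for that example (decoder arguments are left at their byte-array defaults; topic names follow the example above):

val perTopic = connector.createMessageStreams(Map("AC" -> 3, "BC" -> 2))        // 5 streams, one topic each
val shared   = connector.createMessageStreamsByFilter(new Whitelist(".*C"), 3)  // 3 streams shared by AC and BC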

 On Tue, Feb 10, 2015 at 8:37 AM, tao xiao xiaotao...@gmail.com wrote:

  Guozhang,
 
  Do you mean that each regex matched topic owns number of threads that get
  passed in to createMessageStreamsByFilter ? For example in below code If
 I
  have 3 matched topics each of which has 2 partitions then I should have
 3 *
  2 = 6 threads in total with each topic owning 2 threads.
 
  TopicFilter filter = new Whitelist(".*");

  int threadTotal = 2;

  List<KafkaStream<byte[], byte[]>> streams = connector
      .createMessageStreamsByFilter(filter, threadTotal);
 
 
  But what I observed from the log is different
 
  2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 - Consumer
  test1234dd5_localhost-1423585444070-82f23758 rebalancing the following
  partitions: ArrayBuffer(0, 1) for topic kafkatopic-5 with consumers:
  List(test1234dd5_localhost-1423585444070-82f23758-0,
  test1234dd5_localhost-1423585444070-82f23758-1)
 
  2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
  test1234dd5_localhost-1423585444070-82f23758-1 attempting to claim
  partition 1
 
  2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
  test1234dd5_localhost-1423585444070-82f23758-0 attempting to claim
  partition 0
 
  2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 - Consumer
  test1234dd5_localhost-1423585444070-82f23758 rebalancing the following
  partitions: ArrayBuffer(0) for topic kafkatopic-1 with consumers:
  List(test1234dd5_localhost-1423585444070-82f23758-0,
  test1234dd5_localhost-1423585444070-82f23758-1)
 
  2015-02-11 00:24:04 WARN  kafka.utils.Logging$class:83 - No broker
  partitions consumed by consumer thread
  test1234dd5_localhost-1423585444070-82f23758-1 for topic kafkatopic-1
 
  2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
  test1234dd5_localhost-1423585444070-82f23758-0 attempting to claim
  partition 0
 
  2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 - Consumer
  test1234dd5_localhost-1423585444070-82f23758 rebalancing the following
  partitions: ArrayBuffer(0, 1) for topic kafkatopic-4 with consumers:
  List(test1234dd5_localhost-1423585444070-82f23758-0,
  test1234dd5_localhost-1423585444070-82f23758-1)
 
  2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
  test1234dd5_localhost-1423585444070-82f23758-1 attempting to claim
  partition 1
 
  2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
  test1234dd5_localhost-1423585444070-82f23758-0 attempting to claim
  partition 0
 
 
  As you can see from the log there are only 2 threads created and shared
  among 3 topics. With this setting I think the parallelism is degraded
 and a
  slow topic may impact other topics' consumption performance. Any
 thoughts?
 
  On Wed, Feb 11, 2015 at 12:00 AM, Guozhang Wang wangg...@gmail.com
  wrote:
 
   createMessageStreams is used for consuming from specific topic(s),
 where
   you can put a map of [topic-name, num-threads] as its input parameters;
  
   createMessageStreamsByFilter is used for consuming from wildcard
 topics,
   where you can put a (regex, num-threads) as its input parameters, and
 for
   each regex matched topic num-threads will be created.
  
    The difference between these two is not really about throughput / latency,
    but rather consumption semantics.
  
   Guozhang
  
  
   On Tue, Feb 10, 2015 at 3:02 AM, tao xiao xiaotao...@gmail.com
 wrote:
  
Hi team,
   
I am comparing the differences between
ConsumerConnector.createMessageStreams
and ConsumerConnector.createMessageStreamsByFilter. My understanding
 is
that createMessageStreams creates x number of threads (x is the
 number
  of
threads passed in to the method) dedicated to the specified topic
while createMessageStreamsByFilter creates x number of threads shared
  by
topics specified by TopicFilter. Is it correct?
   
If this is the case I assume

Re: createMessageStreams vs createMessageStreamsByFilter

2015-02-10 Thread tao xiao
Do you know when the new consumer API will be publicly available?

On Wed, Feb 11, 2015 at 10:43 AM, Guozhang Wang wangg...@gmail.com wrote:

 Yes, it can get stuck. For example, AC and BC are processed by two
 different processes and the AC processor gets stuck; AC messages will then
 fill up the consumer's buffer and eventually prevent the fetcher thread from
 putting more data into it, so the fetcher thread will be blocked on that and
 not be able to fetch BC.

 This issue has been addressed in the new consumer client, which is
 single-threaded with non-blocking APIs.

 Guozhang
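
 For reference, a minimal poll-loop sketch against the new consumer as it was later
 released (org.apache.kafka.clients.consumer.KafkaConsumer); the bootstrap servers,
 group id and topic names below are placeholders:

 import java.util.Properties
 import scala.collection.JavaConverters._
 import org.apache.kafka.clients.consumer.KafkaConsumer

 object PollLoopSketch extends App {
   val props = new Properties()
   props.put("bootstrap.servers", "localhost:9092") // placeholder
   props.put("group.id", "poll-sketch")             // placeholder
   props.put("key.deserializer",
     "org.apache.kafka.common.serialization.ByteArrayDeserializer")
   props.put("value.deserializer",
     "org.apache.kafka.common.serialization.ByteArrayDeserializer")

   val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
   consumer.subscribe(java.util.Arrays.asList("AC", "BC")) // placeholder topics

   // A single thread drives all fetching; poll() returns after the timeout even
   // when one topic is empty, so an idle topic cannot tie up threads needed by
   // another topic.
   while (true) {
     val records = consumer.poll(1000)
     for (record <- records.asScala)
       println(record.topic + "-" + record.partition + " @ " + record.offset)
   }
 }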

 On Tue, Feb 10, 2015 at 6:24 PM, tao xiao xiaotao...@gmail.com wrote:

   Thank you Guozhang for your detailed explanation. In your example
   createMessageStreamsByFilter("*C" -> 3), since threads are shared among
   topics there may be a situation where all 3 threads get stuck on topic AC,
   e.g. the topic is empty, which will hold on to the connecting threads (with
   consumer.timeout.ms=-1), so there is no thread left to serve topic BC. Do
   you think this situation can happen?
 
  On Wed, Feb 11, 2015 at 2:15 AM, Guozhang Wang wangg...@gmail.com
 wrote:
 
   I was not clear before .. for createMessageStreamsByFilter each matched
   topic will have num-threads, but shared: i.e. there will be totally
   num-threads created, but each thread will be responsible for fetching
 all
   matched topics.
  
   A more concrete example: say you have topic AC: 3 partitions, topic
 BC: 6
   partitions.
  
    With createMessageStreams("AC" -> 3, "BC" -> 2) a total of 5 threads will
    be created, and consuming AC-1, AC-2, AC-3, BC-1/2/3, BC-4/5/6 respectively;

    With createMessageStreamsByFilter("*C" -> 3) a total of 3 threads will be
    created, and consuming AC-1/BC-1/BC-2, AC-2/BC-3/BC-4, AC-3/BC-5/BC-6
    respectively.
  
   Guozhang
  
   On Tue, Feb 10, 2015 at 8:37 AM, tao xiao xiaotao...@gmail.com
 wrote:
  
Guozhang,
   
Do you mean that each regex matched topic owns number of threads that
  get
passed in to createMessageStreamsByFilter ? For example in below code
  If
   I
have 3 matched topics each of which has 2 partitions then I should
 have
   3 *
2 = 6 threads in total with each topic owning 2 threads.
   
 TopicFilter filter = new Whitelist(".*");

  int threadTotal = 2;

  List<KafkaStream<byte[], byte[]>> streams = connector
      .createMessageStreamsByFilter(filter, threadTotal);
   
   
But what I observed from the log is different
   
2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 - Consumer
test1234dd5_localhost-1423585444070-82f23758 rebalancing the
 following
partitions: ArrayBuffer(0, 1) for topic kafkatopic-5 with consumers:
List(test1234dd5_localhost-1423585444070-82f23758-0,
test1234dd5_localhost-1423585444070-82f23758-1)
   
2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
test1234dd5_localhost-1423585444070-82f23758-1 attempting to claim
partition 1
   
2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
test1234dd5_localhost-1423585444070-82f23758-0 attempting to claim
partition 0
   
2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 - Consumer
test1234dd5_localhost-1423585444070-82f23758 rebalancing the
 following
partitions: ArrayBuffer(0) for topic kafkatopic-1 with consumers:
List(test1234dd5_localhost-1423585444070-82f23758-0,
test1234dd5_localhost-1423585444070-82f23758-1)
   
2015-02-11 00:24:04 WARN  kafka.utils.Logging$class:83 - No broker
partitions consumed by consumer thread
test1234dd5_localhost-1423585444070-82f23758-1 for topic kafkatopic-1
   
2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
test1234dd5_localhost-1423585444070-82f23758-0 attempting to claim
partition 0
   
2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 - Consumer
test1234dd5_localhost-1423585444070-82f23758 rebalancing the
 following
partitions: ArrayBuffer(0, 1) for topic kafkatopic-4 with consumers:
List(test1234dd5_localhost-1423585444070-82f23758-0,
test1234dd5_localhost-1423585444070-82f23758-1)
   
2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
test1234dd5_localhost-1423585444070-82f23758-1 attempting to claim
partition 1
   
2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
test1234dd5_localhost-1423585444070-82f23758-0 attempting to claim
partition 0
   
   
As you can see from the log there are only 2 threads created and
 shared
among 3 topics. With this setting I think the parallelism is degraded
   and a
slow topic may impact other topics' consumption performance. Any
   thoughts?
   
On Wed, Feb 11, 2015 at 12:00 AM, Guozhang Wang wangg...@gmail.com
wrote:
   
 createMessageStreams is used for consuming from specific topic(s),
   where
 you can put a map of [topic-name, num-threads] as its input
  parameters;

 createMessageStreamsByFilter is used for consuming from wildcard

createMessageStreams vs createMessageStreamsByFilter

2015-02-10 Thread tao xiao
Hi team,

I am comparing the differences between
ConsumerConnector.createMessageStreams
and ConsumerConnector.createMessageStreamsByFilter. My understanding is
that createMessageStreams creates x number of threads (x is the number of
threads passed in to the method) dedicated to the specified topic
while createMessageStreamsByFilter creates x number of threads shared by
topics specified by TopicFilter. Is it correct?

If this is the case I assume createMessageStreams is the preferred way to
create streams for each topic when I have high throughput and low latency
demands. Is my assumption correct?

-- 
Regards,
Tao


Re: Got ClosedByInterruptException when closing ConsumerConnector

2015-02-09 Thread tao xiao
It happens every time I shut down the connector. It doesn't block the
shutdown process though.

On Tue, Feb 10, 2015 at 1:09 AM, Guozhang Wang wangg...@gmail.com wrote:

 Is this exception transient or consistent and blocking the shutdown
 process?

 On Mon, Feb 9, 2015 at 3:07 AM, tao xiao xiaotao...@gmail.com wrote:

  Hi team,
 
  I got java.nio.channels.ClosedByInterruptException when
  closing ConsumerConnector using kafka 0.8.2
 
  Here is the exception
 
  2015-02-09 19:04:19 INFO  kafka.utils.Logging$class:68 -
  [test12345_localhost], ZKConsumerConnector shutting down
 
  2015-02-09 19:04:19 INFO  kafka.utils.Logging$class:68 -
  [ConsumerFetcherManager-1423479848796] Stopping leader finder thread
 
  2015-02-09 19:04:19 INFO  kafka.utils.Logging$class:68 -
  [test12345_localhost], Shutting down
 
  2015-02-09 19:04:19 INFO  kafka.utils.Logging$class:68 -
  [test12345_localhost], Shutdown completed
 
  2015-02-09 19:04:19 INFO  kafka.utils.Logging$class:68 -
  [ConsumerFetcherManager-1423479848796] Stopping all fetchers
 
  2015-02-09 19:04:19 INFO  kafka.utils.Logging$class:68 -
  [test12345_localhost], Stopped
 
  2015-02-09 19:04:19 INFO  kafka.utils.Logging$class:68 -
  [ConsumerFetcherThread-test12345_localhost], Shutting down
 
  2015-02-09 19:04:19 INFO  kafka.utils.Logging$class:68 - Reconnect due to
  socket error: java.nio.channels.ClosedByInterruptException
 
  2015-02-09 19:04:19 INFO  kafka.utils.Logging$class:68 -
  [ConsumerFetcherThread-test12345_localhost], Stopped
 
  2015-02-09 19:04:19 INFO  kafka.utils.Logging$class:68 -
  [ConsumerFetcherThread-test12345_localhost], Shutdown completed
 
  2015-02-09 19:04:19 INFO  kafka.utils.Logging$class:68 -
  [ConsumerFetcherManager-1423479848796] All connections stopped
 
  2015-02-09 19:04:19 INFO  org.I0Itec.zkclient.ZkEventThread:82 -
 Terminate
  ZkClient event thread.
 
  2015-02-09 19:04:19 INFO  org.apache.zookeeper.ZooKeeper:684 - Session:
  0x14b6dd8fcf80011 closed
 
  2015-02-09 19:04:19 INFO
 org.apache.zookeeper.ClientCnxn$EventThread:512 -
  EventThread shut down
 
  2015-02-09 19:04:19 INFO  kafka.utils.Logging$class:68 -
  [test12345_localhost], ZKConsumerConnector shutdown completed in 86 ms
 
 
  --
  Regards,
  Tao
 



 --
 -- Guozhang




-- 
Regards,
Tao


Got ClosedByInterruptException when closing ConsumerConnector

2015-02-09 Thread tao xiao
Hi team,

I got java.nio.channels.ClosedByInterruptException when
closing ConsumerConnector using kafka 0.8.2

Here is the exception

2015-02-09 19:04:19 INFO  kafka.utils.Logging$class:68 -
[test12345_localhost], ZKConsumerConnector shutting down

2015-02-09 19:04:19 INFO  kafka.utils.Logging$class:68 -
[ConsumerFetcherManager-1423479848796] Stopping leader finder thread

2015-02-09 19:04:19 INFO  kafka.utils.Logging$class:68 -
[test12345_localhost], Shutting down

2015-02-09 19:04:19 INFO  kafka.utils.Logging$class:68 -
[test12345_localhost], Shutdown completed

2015-02-09 19:04:19 INFO  kafka.utils.Logging$class:68 -
[ConsumerFetcherManager-1423479848796] Stopping all fetchers

2015-02-09 19:04:19 INFO  kafka.utils.Logging$class:68 -
[test12345_localhost], Stopped

2015-02-09 19:04:19 INFO  kafka.utils.Logging$class:68 -
[ConsumerFetcherThread-test12345_localhost], Shutting down

2015-02-09 19:04:19 INFO  kafka.utils.Logging$class:68 - Reconnect due to
socket error: java.nio.channels.ClosedByInterruptException

2015-02-09 19:04:19 INFO  kafka.utils.Logging$class:68 -
[ConsumerFetcherThread-test12345_localhost], Stopped

2015-02-09 19:04:19 INFO  kafka.utils.Logging$class:68 -
[ConsumerFetcherThread-test12345_localhost], Shutdown completed

2015-02-09 19:04:19 INFO  kafka.utils.Logging$class:68 -
[ConsumerFetcherManager-1423479848796] All connections stopped

2015-02-09 19:04:19 INFO  org.I0Itec.zkclient.ZkEventThread:82 - Terminate
ZkClient event thread.

2015-02-09 19:04:19 INFO  org.apache.zookeeper.ZooKeeper:684 - Session:
0x14b6dd8fcf80011 closed

2015-02-09 19:04:19 INFO  org.apache.zookeeper.ClientCnxn$EventThread:512 -
EventThread shut down

2015-02-09 19:04:19 INFO  kafka.utils.Logging$class:68 -
[test12345_localhost], ZKConsumerConnector shutdown completed in 86 ms


-- 
Regards,
Tao


Is auto.commit.enable still applicable when setting offsets.storage to kafka

2015-02-09 Thread tao xiao
Hi team,

If I set offsets.storage=kafka can I still use auto.commit.enable to turn
off auto commit and auto.commit.interval.ms to control the commit interval?
The documentation mentions that the above two properties are used to control
offset commits to zookeeper.
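
For reference, a consumer.properties sketch with the settings in question (the
property names are from the 0.8.2 consumer config; the values below are only
illustrative):

offsets.storage=kafka
# optional while migrating from zookeeper-based offsets: commit to both stores
dual.commit.enabled=false
# the two auto-commit settings asked about above
auto.commit.enable=false
auto.commit.interval.ms=60000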

-- 
Regards,
Tao


Re: Console Producer Throwing LeaderNotAvailableException Despite Existing Leader for Partition

2015-02-08 Thread tao xiao
Alex,

I got a similar error before due to incorrect network binding of my laptop's
wireless interface. You can try setting advertised.host.name to the Kafka
server's hostname in server.properties and run it again.
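
For example, a server.properties fragment along those lines (the hostname below is
only a placeholder for the broker's actual hostname):

# host name the broker advertises to producers, consumers and other brokers
advertised.host.name=kafka-broker-01.example.com
advertised.port=9092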

On Sun, Feb 8, 2015 at 8:38 AM, Alex Melville amelvi...@g.hmc.edu wrote:

 Howdy all,

 I recently upgraded to Kafka 0.8.2.0 and am trying to verify that
 everything still works as expected. I spin up two brokers, one zk instance,
 and then create a topic using

 kafka-topics.sh --create --zookeeper ad-0104:2181 --topic deleteme
 --partitions 2 --replication-factor 1

 Then I run --describe to check if the partitions have leaders. I get


 kafka-topics.sh --describe --zookeeper ad-0104:2181 --topic deleteme

 Topic:deleteme PartitionCount:2 ReplicationFactor:1 Configs:
 Topic: deleteme Partition: 0 Leader: 0 Replicas: 0 Isr: 0
 Topic: deleteme Partition: 1 Leader: 1 Replicas: 1 Isr: 1


 Finally, I run the console producer

 kafka-console-producer.sh --broker-list ad-0102:9092 --topic deleteme

 I get the following warning

 [2015-02-08 00:36:24,244] WARN Property topic is not valid
 (kafka.utils.VerifiableProperties)

 and then it waits for console input. When I try to send a message I get the
 following list of error messages

 [2015-02-08 00:37:04,735] WARN Error while fetching metadata
 [{TopicMetadata for topic deleteme -
 No partition metadata for topic deleteme due to
 kafka.common.LeaderNotAvailableException}] for topic [deleteme]: class
 kafka.common.LeaderNotAvailableException
  (kafka.producer.BrokerPartitionInfo)
 [2015-02-08 00:37:04,751] WARN Error while fetching metadata
 [{TopicMetadata for topic deleteme -
 No partition metadata for topic deleteme due to
 kafka.common.LeaderNotAvailableException}] for topic [deleteme]: class
 kafka.common.LeaderNotAvailableException
  (kafka.producer.BrokerPartitionInfo)
 [2015-02-08 00:37:04,752] ERROR Failed to collate messages by topic,
 partition due to: Failed to fetch topic metadata for topic: deleteme
 (kafka.producer.async.DefaultEventHandler)
 [2015-02-08 00:37:04,859] WARN Error while fetching metadata
 [{TopicMetadata for topic deleteme -
 No partition metadata for topic deleteme due to
 kafka.common.LeaderNotAvailableException}] for topic [deleteme]: class
 kafka.common.LeaderNotAvailableException
  (kafka.producer.BrokerPartitionInfo)
 [2015-02-08 00:37:04,863] WARN Error while fetching metadata
 [{TopicMetadata for topic deleteme -
 No partition metadata for topic deleteme due to
 kafka.common.LeaderNotAvailableException}] for topic [deleteme]: class
 kafka.common.LeaderNotAvailableException
  (kafka.producer.BrokerPartitionInfo)
 [2015-02-08 00:37:04,863] ERROR Failed to collate messages by topic,
 partition due to: Failed to fetch topic metadata for topic: deleteme
 (kafka.producer.async.DefaultEventHandler)
 [2015-02-08 00:37:04,968] WARN Error while fetching metadata
 [{TopicMetadata for topic deleteme -
 No partition metadata for topic deleteme due to
 kafka.common.LeaderNotAvailableException}] for topic [deleteme]: class
 kafka.common.LeaderNotAvailableException
  (kafka.producer.BrokerPartitionInfo)
 [2015-02-08 00:37:04,974] WARN Error while fetching metadata
 [{TopicMetadata for topic deleteme -
 No partition metadata for topic deleteme due to
 kafka.common.LeaderNotAvailableException}] for topic [deleteme]: class
 kafka.common.LeaderNotAvailableException
  (kafka.producer.BrokerPartitionInfo)
 [2015-02-08 00:37:04,974] ERROR Failed to collate messages by topic,
 partition due to: Failed to fetch topic metadata for topic: deleteme
 (kafka.producer.async.DefaultEventHandler)
 [2015-02-08 00:37:05,079] WARN Error while fetching metadata
 [{TopicMetadata for topic deleteme -
 No partition metadata for topic deleteme due to
 kafka.common.LeaderNotAvailableException}] for topic [deleteme]: class
 kafka.common.LeaderNotAvailableException
  (kafka.producer.BrokerPartitionInfo)
 [2015-02-08 00:37:05,084] WARN Error while fetching metadata
 [{TopicMetadata for topic deleteme -
 No partition metadata for topic deleteme due to
 kafka.common.LeaderNotAvailableException}] for topic [deleteme]: class
 kafka.common.LeaderNotAvailableException
  (kafka.producer.BrokerPartitionInfo)
 [2015-02-08 00:37:05,084] ERROR Failed to collate messages by topic,
 partition due to: Failed to fetch topic metadata for topic: deleteme
 (kafka.producer.async.DefaultEventHandler)
 [2015-02-08 00:37:05,189] WARN Error while fetching metadata
 [{TopicMetadata for topic deleteme -
 No partition metadata for topic deleteme due to
 kafka.common.LeaderNotAvailableException}] for topic [deleteme]: class
 kafka.common.LeaderNotAvailableException
  (kafka.producer.BrokerPartitionInfo)
 [2015-02-08 00:37:05,191] ERROR Failed to send requests for topics deleteme
 with correlation ids in [0,8] (kafka.producer.async.DefaultEventHandler)
 [2015-02-08 00:37:05,192] ERROR Error in handling batch of 1 events
 (kafka.producer.async.ProducerSendThread)
 

Got IOException when writing metrics to csv file

2015-02-04 Thread tao xiao
Hi team,

I was running the mirror maker off the trunk code and got IOException when
configuring the mirror maker to use KafkaCSVMetricsReporter as the metric
reporter

Here is the exception I got

java.io.IOException: Unable to create /tmp/csv1/BytesPerSec.csv

at
com.yammer.metrics.reporting.CsvReporter.createStreamForMetric(CsvReporter.java:141)

at
com.yammer.metrics.reporting.CsvReporter.getPrintStream(CsvReporter.java:257)

at com.yammer.metrics.reporting.CsvReporter.access$000(CsvReporter.java:22)

at
com.yammer.metrics.reporting.CsvReporter$1.getStream(CsvReporter.java:156)

at
com.yammer.metrics.reporting.CsvReporter.processTimer(CsvReporter.java:212)

at
com.yammer.metrics.reporting.CsvReporter.processTimer(CsvReporter.java:22)

at com.yammer.metrics.core.Timer.processWith(Timer.java:214)

at com.yammer.metrics.reporting.CsvReporter.run(CsvReporter.java:163)

at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)

at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)

at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)

at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)

at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

at java.lang.Thread.run(Thread.java:745)


Here is the configuration I put in the consumer.properties

zookeeper.connect=127.0.0.1:2181

# timeout in ms for connecting to zookeeper

zookeeper.connection.timeout.ms=100

#consumer group id

group.id=kafka-topic

auto.offset.reset=smallest

#consumer timeout

#consumer.timeout.ms=5000

# Metrics

kafka.metrics.polling.interval.secs=5

kafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter

kafka.csv.metrics.dir=/tmp/csv1

kafka.csv.metrics.reporter.enabled=true


I debugged the code and discovered that the issue was caused by different
metrics sharing the same metric name. I diffed the trunk code against 0.8.1
on ConsumerTopicStats.scala

from 0.8.1

val byteRate = newMeter(metricId + "BytesPerSec", "bytes",
TimeUnit.SECONDS)

from trunk

val byteRate = newMeter("BytesPerSec", "bytes", TimeUnit.SECONDS, tags)


trunk removes the metricId from the name, which results in the same metric name
"BytesPerSec" being used by multiple metrics.


Is this a bug or by intention?
-- 
Regards,
Tao


Re: Kafka producer perf script throw java.io.IOException

2015-02-04 Thread tao xiao
Hi,

In order to get it working you can turn off the csv reporter.
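
That is, rerun the same command as quoted below without the --csv-reporter-enabled
and --metrics-dir flags, for example (<broker list> is a placeholder):

bin/kafka-producer-perf-test.sh --broker-list <broker list> --topics PerfTopic22 \
  --sync --initial-message-id 1 --messages 20 --message-send-gap-ms 20 \
  --request-num-acks -1 --batch-size 1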

On Thu, Feb 5, 2015 at 1:06 PM, Xinyi Su xiny...@gmail.com wrote:

 Hi,

 Today I updated Kafka cluster from 0.8.2-beta to 0.8.2.0 and run kafka
 producer performance test.

 The test cannot continue because of some exceptions being thrown which did
 not occur on 0.8.2-beta. My perf library is kafka-perf_2.9.2-0.8.0.jar, which
 is the latest version in the Maven repository.

 -bash-4.1$ bin/kafka-producer-perf-test.sh   --broker-list broker list
 --topics PerfTopic22 --sync --initial-message-id 1 --messages 20
 --csv-reporter-enabled --metrics-dir /tmp/PerfTopic22_1
 --message-send-gap-ms 20 --request-num-acks -1 --batch-size 1

 java.io.IOException: Unable to create
 /tmp/PerfTopic22_1/ProducerRequestSize.csv
 at

 com.yammer.metrics.reporting.CsvReporter.createStreamForMetric(CsvReporter.java:141)
 at

 com.yammer.metrics.reporting.CsvReporter.getPrintStream(CsvReporter.java:257)
 at com.yammer.metrics.reporting.CsvReporter.access$000(CsvReporter.java:22)
 at
 com.yammer.metrics.reporting.CsvReporter$1.getStream(CsvReporter.java:156)
 at

 com.yammer.metrics.reporting.CsvReporter.processHistogram(CsvReporter.java:194)
 at

 com.yammer.metrics.reporting.CsvReporter.processHistogram(CsvReporter.java:22)
 at com.yammer.metrics.core.Histogram.processWith(Histogram.java:231)
 at com.yammer.metrics.reporting.CsvReporter.run(CsvReporter.java:163)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
 at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
 at

 java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
 at

 java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
 at

 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at

 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)




-- 
Regards,
Tao


Got NPE when running the latest mirror maker that is in trunk

2015-01-23 Thread tao xiao
Hi team,

I got NPE when running the latest mirror maker that is in trunk

[2015-01-23 18:55:20,229] INFO
[kafkatopic-1_LM-SHC-00950667-1422010513674-cb0bb562], exception during
rebalance  (kafka.consumer.ZookeeperConsumerConnector)

java.lang.NullPointerException

at
kafka.tools.MirrorMaker$InternalRebalanceListener.beforeReleasingPartitions(MirrorMaker.scala:631)

at
kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:674)

at
kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1$$anonfun$apply$mcV$sp$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:625)

at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)

at
kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcV$sp(ZookeeperConsumerConnector.scala:616)

at
kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply(ZookeeperConsumerConnector.scala:616)

at
kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply(ZookeeperConsumerConnector.scala:616)

at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)

at
kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:615)

at
kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:931)

at
kafka.consumer.ZookeeperConsumerConnector$WildcardStreamsHandler.init(ZookeeperConsumerConnector.scala:965)

at
kafka.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:163)

at kafka.tools.MirrorMaker$.main(MirrorMaker.scala:273)

at kafka.tools.MirrorMaker.main(MirrorMaker.scala)


Here is the command I run


bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config
~/Downloads/kafka/kafka_2.9.2-0.8.1.1/config/consumer.properties
--producer.config
~/Downloads/kafka/kafka_2.9.2-0.8.1.1/config/producer.properties
--num.streams 2 --num.producers 2 --no.data.loss --whitelist .*

-- 
Regards,
Tao


New mirror maker consumer.config question

2015-01-22 Thread tao xiao
Hi,

I discovered that the new mirror maker implementation in trunk now only
accepts one consumer.config property instead of a list of them, which means
we can only supply one source per mirror maker process. Is there a reason for
this? If I have multiple source kafka clusters do I need to set up multiple
mirror maker processes?

-- 
Regards,
Tao


Inter Mirror maker processes offset sync

2015-01-21 Thread tao xiao
Hi all,

I have two mirror maker processes running on two different machines,
fetching messages from the same topic and mirroring them from one data center
to another. These two processes are assigned to the same consumer group. If I
want no data loss or duplication even when one of the mirror maker processes
dies, I need a way to inform the other process of the offset of the last
message successfully sent by the dead process. However, the offset stored in
zookeeper is the offset of the last message consumed by the mirror maker, not
of the last message successfully sent. Is there a way to configure mirror
maker to achieve exactly-once semantics?

-- 
Regards,
Tao


<    1   2