Replica manager exception in broker
Hi team,

One of the brokers keeps getting the exception below.

[2015-05-21 23:56:52,687] ERROR [Replica Manager on Broker 15]: Error when processing fetch request for partition [test1,0] offset 206845418 from consumer with correlation id 93748260. Possible cause: Request for offset 206845418 but we only have log segments in the range 207804287 to 207804287. (kafka.server.ReplicaManager)

This is the follower broker of topic test1, and the ISR of that topic has only 1 broker left right now. I just wanted to know what causes this issue and how I can prevent it?

-- Regards, Tao
Re: Mirrormaker stops consuming
It is possible that the rate at which the mirror maker produces messages is slower than the rate at which it consumes them, which leaves insufficient space in the internal data channel the mirror maker uses to buffer data between consumer and producer. You can check the histogram MirrorMaker-DataChannel-Size to see if any space is left (see the JMX sketch at the end of this message).

On Fri, May 22, 2015 at 11:35 PM, Rajasekar Elango rela...@salesforce.com wrote:

We recently upgraded to kafka 0.8.2.1 and found issues with mirrormaker randomly stopping consuming. We had to restart the mirrormaker process to resolve the problem. This problem has occurred several times in the past two weeks. Here is what I found in my analysis. When this problem happens:

The mirrormaker log stops rolling (i.e., nothing appears in the logs). The last couple of messages in the mirrormaker log are ProducerSendThread producing to the destination. No errors or exceptions.
The mirrormaker consumer offset doesn't increase. ConsumerOffsetChecker shows the mirrormaker consumer offset stops incrementing.
The mirrormaker consumer MinFetch rate JMX metric drops to zero. ConsumerTopicMetric.BytesPerSec drops to zero.

So the mirrormaker consumer must have stopped accepting new data. Can someone provide input on how to troubleshoot this problem further and identify the root cause? I got a thread dump before restarting; it looks OK to me, with no blocked threads. Here is the thread dump output:

2015-05-21 18:59:09
Full thread dump Java HotSpot(TM) 64-Bit Server VM (24.76-b04 mixed mode):

Attach Listener daemon prio=10 tid=0x7f7248002000 nid=0x2d53 waiting on condition [0x]
   java.lang.Thread.State: RUNNABLE
   Locked ownable synchronizers:
   - None

ConsumerFetcherThread-mirrormakerProd_ops-mmrs1-2-tyo.ops.sfdc.net-1431458688650-fb15f395-0-2 prio=10 tid=0x7f71e407e000 nid=0x3425 waiting on condition [0x7f72833f2000]
   java.lang.Thread.State: WAITING (parking)
   at sun.misc.Unsafe.park(Native Method)
   - parking to wait for 0x00042cd15cc8 (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
   at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
   at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:349)
   at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
   at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
   at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:129)
   at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:110)
   at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
   at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
   at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
   at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:110)
   at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:110)
   at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:110)
   at kafka.utils.Utils$.inLock(Utils.scala:535)
   at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:109)
   at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:87)
   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
   Locked ownable synchronizers:
   - 0x00042ea62eb0 (a java.util.concurrent.locks.ReentrantLock$NonfairSync)

ConsumerFetcherThread-mirrormakerProd_ops-mmrs1-2-tyo.ops.sfdc.net-1431458688650-fb15f395-0-3 prio=10 tid=0x7f71e407b000 nid=0x3424 waiting on condition [0x7f7281f99000]
   java.lang.Thread.State: WAITING (parking)
   at sun.misc.Unsafe.park(Native Method)
   - parking to wait for 0x00042ccece80 (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
   at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
   at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:349)
   at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
   at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
   at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:129)
   at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:110) at
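If you want to check that histogram programmatically rather than through a JMX GUI, a minimal sketch follows. The JMX port and the exact MBean name pattern are assumptions; the code scans all MBeans for names containing "DataChannel" rather than guessing the precise ObjectName, and assumes the yammer histogram exposes a Mean attribute.

    import java.util.Set;
    import javax.management.MBeanServerConnection;
    import javax.management.ObjectName;
    import javax.management.remote.JMXConnector;
    import javax.management.remote.JMXConnectorFactory;
    import javax.management.remote.JMXServiceURL;

    public class DataChannelSizeCheck {
        public static void main(String[] args) throws Exception {
            // Hypothetical JMX endpoint; start the mirror maker JVM with remote JMX enabled.
            JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi");
            JMXConnector connector = JMXConnectorFactory.connect(url);
            try {
                MBeanServerConnection mbs = connector.getMBeanServerConnection();
                // Scan all registered MBeans for the data channel histogram.
                Set<ObjectName> names = mbs.queryNames(null, null);
                for (ObjectName name : names) {
                    if (name.toString().contains("DataChannel")) {
                        // Yammer histograms typically expose Count/Min/Max/Mean attributes.
                        System.out.println(name + " Mean=" + mbs.getAttribute(name, "Mean"));
                    }
                }
            } finally {
                connector.close();
            }
        }
    }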
Re: Replica manager exception in broker
Hi Joel,

The error offset 206845418 didn't change. The only thing that changed is the correlation id, which kept incrementing. The broker is the follower, and I saw similar error messages for other topics the broker is a follower for. As indicated by the log, this is a request coming from a consumer, not a follower. One thing I don't quite understand is that consumer requests for the topic (test1) should go to the leader, not the follower, so why were there consumer requests connecting to this broker? The other issue I noticed is that the replica fetcher threads on the follower didn't fetch any data at all from the leader; the log file size on the follower didn't grow for several hours.

On Sat, May 23, 2015 at 12:40 AM, Joel Koshy jjkosh...@gmail.com wrote:

When you say "keeps getting below exception" I'm assuming that the error offset (206845418) keeps changing - right? We saw a similar issue in the past and it turned out to be due to a NIC issue - i.e., it negotiated at a low speed, so the replica fetcher couldn't keep up with the leader. I.e., while it caught up within the first segment, the leader's log would roll (i.e., the segment would get deleted) and we would see the out-of-range error. Is this broker a follower for other partitions? Do those partitions show up in these error messages?

On Fri, May 22, 2015 at 03:11:09PM +0800, tao xiao wrote:

Hi team, One of the brokers keeps getting below exception. [2015-05-21 23:56:52,687] ERROR [Replica Manager on Broker 15]: Error when processing fetch request for partition [test1,0] offset 206845418 from consumer with correlation id 93748260. Possible cause: Request for offset 206845418 but we only have log segments in the range 207804287 to 207804287. (kafka.server.ReplicaManager) This is the follower broker of topic test1 and ISR of that topic has only 1 broker left right now. Just wanted to know what cause this issue and how I can prevent it? -- Regards, Tao

-- Regards, Tao
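To verify the offset range a broker actually holds for a partition (the range named in the error), you can ask it directly with the 0.8 SimpleConsumer. A minimal sketch, assuming a broker at localhost:9092 and the [test1,0] partition from the log above; the client id is arbitrary:

    import java.util.Collections;
    import kafka.api.PartitionOffsetRequestInfo;
    import kafka.common.TopicAndPartition;
    import kafka.javaapi.OffsetResponse;
    import kafka.javaapi.consumer.SimpleConsumer;

    public class OffsetRangeCheck {
        public static void main(String[] args) {
            // Point this at the follower reporting the out-of-range errors.
            SimpleConsumer consumer = new SimpleConsumer("localhost", 9092, 100000, 64 * 1024, "offset-check");
            try {
                TopicAndPartition tp = new TopicAndPartition("test1", 0);
                System.out.println("earliest=" + fetch(consumer, tp, kafka.api.OffsetRequest.EarliestTime()));
                System.out.println("latest=" + fetch(consumer, tp, kafka.api.OffsetRequest.LatestTime()));
            } finally {
                consumer.close();
            }
        }

        private static long fetch(SimpleConsumer consumer, TopicAndPartition tp, long time) {
            // Ask for one offset at the given logical time (earliest or latest).
            kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
                    Collections.singletonMap(tp, new PartitionOffsetRequestInfo(time, 1)),
                    kafka.api.OffsetRequest.CurrentVersion(), "offset-check");
            OffsetResponse response = consumer.getOffsetsBefore(request);
            return response.offsets(tp.topic(), tp.partition())[0];
        }
    }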
Re: unclean.leader.election.enable question
Thank you Mayuresh for the quick reply. If my producer has acks=all set, would the producer get a callback indicating unsuccessful delivery of the missing 2000 messages, assuming the new Java producer is used?

On Wednesday, May 20, 2015, gharatmayures...@gmail.com wrote:

This is not unclean leader election since the follower is still in the ISR. Yes, we will lose those 2000 messages.

Mayuresh

Sent from my iPhone

On May 20, 2015, at 8:31 AM, tao xiao xiaotao...@gmail.com wrote:

Hi team, I know that if a broker is behind the leader by no more than replica.lag.max.messages, the broker is considered in sync with the leader. Considering a situation where I have unclean.leader.election.enable=true set on the brokers and the follower is now 2000 messages behind (the default replica.lag.max.messages is 4000), will the follower be elected as the leader if the current leader is down? If yes, do we lose those 2000 messages?

-- Regards, Tao

-- Regards, Tao
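For reference, this is how delivery failures surface in the new Java producer: the Callback (or the Future returned by send()) carries the exception. A minimal sketch with an assumed broker address and topic; it only shows the mechanism, not what the broker would actually report in the specific failover scenario above:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    public class AcksAllProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
            props.put("acks", "all"); // wait for the full ISR to acknowledge the write
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
            producer.send(new ProducerRecord<String, String>("test", "value"), new Callback() {
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception != null) {
                        // The send was not acknowledged; the exception says why.
                        exception.printStackTrace();
                    } else {
                        System.out.println("acked at offset " + metadata.offset());
                    }
                }
            });
            producer.close();
        }
    }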
unclean.leader.election.enable question
Hi team,

I know that if a broker is behind the leader by no more than replica.lag.max.messages, the broker is considered in sync with the leader. Considering a situation where I have unclean.leader.election.enable=true set on the brokers and the follower is now 2000 messages behind (the default replica.lag.max.messages is 4000), will the follower be elected as the leader if the current leader is down? If yes, do we lose those 2000 messages?

-- Regards, Tao
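For context, these are the two broker settings the question turns on, as they would appear in server.properties (the values shown are the ones from the question, not recommendations):

    # If true, a replica outside the ISR may be elected leader (risking data loss).
    unclean.leader.election.enable=true
    # A follower no more than this many messages behind the leader stays in the ISR (0.8.x default).
    replica.lag.max.messages=4000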
Re: Experiences testing new producer performance across multiple threads/producer counts
Garry,

Do you mind sharing the source code you used for the profiling?

On Sun, May 17, 2015 at 4:59 PM, Garry Turkington g.turking...@improvedigital.com wrote:

Hi Guozhang/Jay/Becket,

Thanks for the responses. Regarding my point on performance dropping when the number of partitions was increased, that surprised me too, as on another cluster I had done just this to help with the issue of lots of ISR churn and it had been a straight win. I mentioned in my last mail that I had simplified the code that generates test messages, with the effect that it greatly reduced the CPU load per thread. After doing this, the performance on the higher partition-count topic was consistent with the lower partition-count one and showed no degradation. So the sender threads were becoming CPU bound; I'm assuming possibly due to the additional locks involved with more partitions, but that needs validation.

I've been running my clients with acks=1 and linger.ms floating between 0 and 1, because I want to convince myself of it making a difference, but so far I've not really seen it; similar to Jay's experiences, I settled on 64K for batch.size because I just didn't see any benefit beyond that, and even the jump from 32K wasn't proved beneficial. For this particular application I've already hit the needed performance (around 700K/sec at peak), but my workload can be quite a sawtooth, moving from peak to idle and back again. So peak becomes the new norm, and understanding the headroom in the current setup and how to grow beyond that is important. I've had a few more test boxes put on the same 10Gb network as the cluster in question, so I'll revisit this, do deeper profiling over the next week, and will revert here with findings.

Regards,
Garry

-----Original Message-----
From: Guozhang Wang [mailto:wangg...@gmail.com]
Sent: 14 May 2015 18:57
To: users@kafka.apache.org
Subject: Re: Experiences testing new producer performance across multiple threads/producer counts

Regarding the issue that adding more partitions kills the performance: I would suspect it may be due to insufficient batching. Note that in the new producer batching is done per partition, and if the linger.ms setting is low, partition data may not be batched enough before it gets sent to the brokers. Also, since the new producer drains all partitions that belong to the same broker when one of them hits either the linger time or the batch size, having only one or a few brokers will further exaggerate the insufficient-batching issue. So monitoring the average batch size would be a good idea.

Guozhang

On Wed, May 13, 2015 at 7:47 PM, Jay Kreps jay.kr...@gmail.com wrote:

Hey Garry,

Super interesting. We honestly never did a ton of performance tuning on the producer. I checked the profiles early on in development and we fixed a few issues that popped up in deployment, but I don't think anyone has done a really scientific look. If you (or anyone else) want to dive into things I suspect it could be improved.

Becket is exactly right. There are two possible bottlenecks you can hit in the producer: the single background sender thread and the per-partition lock. You can check utilization on the background thread with jvisualvm (it's named something like kafka-producer-network-thread). The locking is fairly hard to improve. It's a little surprising that adding partitions caused a large decrease in performance. Generally this is only the case if you override the flush settings on the broker to force fsync more frequently.

The ISR issues under heavy load are probably fixable; the issue is discussed a bit here: http://blog.confluent.io/2015/04/07/hands-free-kafka-replication-a-lesson-in-operational-simplicity/

The producer settings that may matter for performance are:
acks
batch.size (though beyond 32k I didn't see much improvement)
linger.ms (setting = 1 may help a bit)
send.buffer.bytes (maybe, but probably not)

Cheers, -Jay

On Wed, May 13, 2015 at 3:42 PM, Jiangjie Qin j...@linkedin.com.invalid wrote:

Thanks for sharing this, Garry. I actually did similar tests before but unfortunately lost the test data because my laptop rebooted and I forgot to save the data. Anyway, several things to verify:

1. Remember KafkaProducer holds a lock per partition. So if you have only one partition in the target topic and many application threads, lock contention could be an issue.
2. It matters how frequently the sender thread wakes up and runs. You can take a look at the following sensors to further verify whether the sender thread really becomes a bottleneck or not: Select-rate, Io-wait-time-ns-avg, Io-time-ns-avg.
3. Batch size matters, so take a look at the sensor batch-size-avg and see if the average batch size makes sense or not.

Looking forward to your further profiling. My thinking is that
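Pulling the settings discussed in this thread together, a starting-point configuration for the new producer might look like the sketch below. The broker address and topic are placeholders, and the values are the ones mentioned above rather than recommendations:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class TunedProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // placeholder broker list
            props.put("acks", "1");                           // acks=1 as used in Garry's tests
            props.put("batch.size", "65536");                 // 64K; little gain was seen beyond 32K
            props.put("linger.ms", "1");                      // small linger to improve batching
            props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
            KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(props);
            producer.send(new ProducerRecord<byte[], byte[]>("test", new byte[100]));
            producer.close();
            // Verify the effect via the batch-size-avg and select-rate sensors mentioned above.
        }
    }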
Re: Getting NotLeaderForPartitionException in kafka broker
I will try to reproduce this problem later this week. Bouncing the broker fixed the issue, but the issue surfaced again after a period of time. A little more context: the cluster was deployed on VMs, and I discovered that the issue appeared whenever CPU wait time was extremely high, e.g. 90% of CPU time spent on I/O wait. I am more interested in understanding under what circumstances this issue would happen, so that I can take appropriate action.

On Fri, May 15, 2015 at 8:04 AM, Jiangjie Qin j...@linkedin.com.invalid wrote:

If you can reproduce this problem steadily, once you see this issue, can you grep the controller log for the topic partition in question and see if there is anything interesting? Thanks.

Jiangjie (Becket) Qin

On 5/14/15, 3:43 AM, tao xiao xiaotao...@gmail.com wrote:

Yes, it does exist in ZK and the node that had the NotLeaderForPartitionException is the leader of the topic.

On Thu, May 14, 2015 at 6:12 AM, Jiangjie Qin j...@linkedin.com.invalid wrote:

Does this topic exist in Zookeeper?

On 5/12/15, 11:35 PM, tao xiao xiaotao...@gmail.com wrote:

Hi, Any updates on this issue? I keep seeing this issue happening over and over again.

On Thu, May 7, 2015 at 7:28 PM, tao xiao xiaotao...@gmail.com wrote:

Hi team, I have a 12 nodes cluster that has 800 topics and each of which has only 1 partition. I observed that one of the node keeps generating NotLeaderForPartitionException that causes the node to be unresponsive to all requests. Below is the exception [2015-05-07 04:16:01,014] ERROR [ReplicaFetcherThread-1-12], Error for partition [topic1,0] to broker 12:class kafka.common.NotLeaderForPartitionException (kafka.server.ReplicaFetcherThread) All other nodes in the cluster generate lots of replication error too as shown below due to unresponsiveness of above node. [2015-05-07 04:17:34,917] WARN [Replica Manager on Broker 1]: Fetch request with correlation id 3630911 from client ReplicaFetcherThread-0-1 on partition [topic1,0] failed due to Leader not local for partition [cg22_user.item_attr_info.lcr,0] on broker 1 (kafka.server.ReplicaManager) Any suggestion why the node runs into the unstable stage and any configuration I can set to prevent this? I use kafka 0.8.2.1 And here is the server.properties broker.id=5 port=9092 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=1048576 socket.receive.buffer.bytes=1048576 socket.request.max.bytes=104857600 log.dirs=/mnt/kafka num.partitions=1 num.recovery.threads.per.data.dir=1 log.retention.hours=1 log.segment.bytes=1073741824 log.retention.check.interval.ms=30 log.cleaner.enable=false zookeeper.connect=ip:2181 zookeeper.connection.timeout.ms=6000 unclean.leader.election.enable=false delete.topic.enable=true default.replication.factor=3 num.replica.fetchers=3 delete.topic.enable=true kafka.metrics.reporters=report.KafkaMetricsCollector straas.hubble.conf.file=/etc/kafka/report.conf -- Regards, Tao

-- Regards, Tao -- Regards, Tao -- Regards, Tao
Re: Getting NotLeaderForPartitionException in kafka broker
Yes, it does exist in ZK and the node that had the NotLeaderForPartitionException is the leader of the topic On Thu, May 14, 2015 at 6:12 AM, Jiangjie Qin j...@linkedin.com.invalid wrote: Does this topic exist in Zookeeper? On 5/12/15, 11:35 PM, tao xiao xiaotao...@gmail.com wrote: Hi, Any updates on this issue? I keep seeing this issue happening over and over again On Thu, May 7, 2015 at 7:28 PM, tao xiao xiaotao...@gmail.com wrote: Hi team, I have a 12 nodes cluster that has 800 topics and each of which has only 1 partition. I observed that one of the node keeps generating NotLeaderForPartitionException that causes the node to be unresponsive to all requests. Below is the exception [2015-05-07 04:16:01,014] ERROR [ReplicaFetcherThread-1-12], Error for partition [topic1,0] to broker 12:class kafka.common.NotLeaderForPartitionException (kafka.server.ReplicaFetcherThread) All other nodes in the cluster generate lots of replication error too as shown below due to unresponsiveness of above node. [2015-05-07 04:17:34,917] WARN [Replica Manager on Broker 1]: Fetch request with correlation id 3630911 from client ReplicaFetcherThread-0-1 on partition [topic1,0] failed due to Leader not local for partition [cg22_user.item_attr_info.lcr,0] on broker 1 (kafka.server.ReplicaManager) Any suggestion why the node runs into the unstable stage and any configuration I can set to prevent this? I use kafka 0.8.2.1 And here is the server.properties broker.id=5 port=9092 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=1048576 socket.receive.buffer.bytes=1048576 socket.request.max.bytes=104857600 log.dirs=/mnt/kafka num.partitions=1 num.recovery.threads.per.data.dir=1 log.retention.hours=1 log.segment.bytes=1073741824 log.retention.check.interval.ms=30 log.cleaner.enable=false zookeeper.connect=ip:2181 zookeeper.connection.timeout.ms=6000 unclean.leader.election.enable=false delete.topic.enable=true default.replication.factor=3 num.replica.fetchers=3 delete.topic.enable=true kafka.metrics.reporters=report.KafkaMetricsCollector straas.hubble.conf.file=/etc/kafka/report.conf -- Regards, Tao -- Regards, Tao -- Regards, Tao
Re: Getting NotLeaderForPartitionException in kafka broker
Hi, Any updates on this issue? I keep seeing this issue happening over and over again On Thu, May 7, 2015 at 7:28 PM, tao xiao xiaotao...@gmail.com wrote: Hi team, I have a 12 nodes cluster that has 800 topics and each of which has only 1 partition. I observed that one of the node keeps generating NotLeaderForPartitionException that causes the node to be unresponsive to all requests. Below is the exception [2015-05-07 04:16:01,014] ERROR [ReplicaFetcherThread-1-12], Error for partition [topic1,0] to broker 12:class kafka.common.NotLeaderForPartitionException (kafka.server.ReplicaFetcherThread) All other nodes in the cluster generate lots of replication error too as shown below due to unresponsiveness of above node. [2015-05-07 04:17:34,917] WARN [Replica Manager on Broker 1]: Fetch request with correlation id 3630911 from client ReplicaFetcherThread-0-1 on partition [topic1,0] failed due to Leader not local for partition [cg22_user.item_attr_info.lcr,0] on broker 1 (kafka.server.ReplicaManager) Any suggestion why the node runs into the unstable stage and any configuration I can set to prevent this? I use kafka 0.8.2.1 And here is the server.properties broker.id=5 port=9092 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=1048576 socket.receive.buffer.bytes=1048576 socket.request.max.bytes=104857600 log.dirs=/mnt/kafka num.partitions=1 num.recovery.threads.per.data.dir=1 log.retention.hours=1 log.segment.bytes=1073741824 log.retention.check.interval.ms=30 log.cleaner.enable=false zookeeper.connect=ip:2181 zookeeper.connection.timeout.ms=6000 unclean.leader.election.enable=false delete.topic.enable=true default.replication.factor=3 num.replica.fetchers=3 delete.topic.enable=true kafka.metrics.reporters=report.KafkaMetricsCollector straas.hubble.conf.file=/etc/kafka/report.conf -- Regards, Tao -- Regards, Tao
Getting NotLeaderForPartitionException in kafka broker
Hi team,

I have a 12-node cluster that has 800 topics, each of which has only 1 partition. I observed that one of the nodes keeps generating NotLeaderForPartitionException, which causes the node to be unresponsive to all requests. Below is the exception:

[2015-05-07 04:16:01,014] ERROR [ReplicaFetcherThread-1-12], Error for partition [topic1,0] to broker 12:class kafka.common.NotLeaderForPartitionException (kafka.server.ReplicaFetcherThread)

All other nodes in the cluster generate lots of replication errors too, as shown below, due to the unresponsiveness of the above node:

[2015-05-07 04:17:34,917] WARN [Replica Manager on Broker 1]: Fetch request with correlation id 3630911 from client ReplicaFetcherThread-0-1 on partition [topic1,0] failed due to Leader not local for partition [cg22_user.item_attr_info.lcr,0] on broker 1 (kafka.server.ReplicaManager)

Any suggestion why the node runs into this unstable state, and any configuration I can set to prevent it? I use kafka 0.8.2.1. And here is the server.properties:

broker.id=5
port=9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
log.dirs=/mnt/kafka
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=1
log.segment.bytes=1073741824
log.retention.check.interval.ms=30
log.cleaner.enable=false
zookeeper.connect=ip:2181
zookeeper.connection.timeout.ms=6000
unclean.leader.election.enable=false
delete.topic.enable=true
default.replication.factor=3
num.replica.fetchers=3
delete.topic.enable=true
kafka.metrics.reporters=report.KafkaMetricsCollector
straas.hubble.conf.file=/etc/kafka/report.conf

-- Regards, Tao
Re: MultiThreaded HLConsumer Exits before events are all consumed
The log suggests that the shutdown method was still called:

Thread 0: 2015-04-29 12:55:54.292|3|13|Normal|-74.1892627|41.33900999753
Last Shutdown via example.shutDown called!
15/04/29 13:09:38 INFO consumer.ZookeeperConsumerConnector:, ZKConsumerConnector shutting down

Please ensure consumer.shutdown() and executor.shutdown() are not called during the course of your program.

On Thu, Apr 30, 2015 at 2:23 AM, christopher palm cpa...@gmail.com wrote:

Commenting out the example shutdown did not seem to make a difference; I added the print statement below to highlight the fact. The other threads still shut down, and only one thread lives on; eventually that dies after a few minutes as well. Could it be that the producer's default partitioner isn't balancing data across all partitions?

Thanks, Chris

Thread 0: 2015-04-29 12:55:54.292|3|13|Normal|-74.1892627|41.33900999753
Last Shutdown via example.shutDown called!
15/04/29 13:09:38 INFO consumer.ZookeeperConsumerConnector:, ZKConsumerConnector shutting down
15/04/29 13:09:38 INFO utils.KafkaScheduler: Forcing shutdown of Kafka scheduler
15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager: [ConsumerFetcherManager-1430330968420] Stopping leader finder thread
15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager$LeaderFinderThread: -leader-finder-thread], Shutting down
15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager$LeaderFinderThread: -leader-finder-thread], Stopped
15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager$LeaderFinderThread: -leader-finder-thread], Shutdown completed
15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager: [ConsumerFetcherManager-1430330968420] Stopping all fetchers
15/04/29 13:09:38 INFO consumer.ConsumerFetcherThread: [ConsumerFetcherThread-consumergroup], Shutting down
15/04/29 13:09:38 INFO consumer.ConsumerFetcherThread: [ConsumerFetcherThread-], Stopped
15/04/29 13:09:38 INFO consumer.ConsumerFetcherThread: [ConsumerFetcherThread-], Shutdown completed
15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager: [ConsumerFetcherManager-] All connections stopped
15/04/29 13:09:38 INFO zkclient.ZkEventThread: Terminate ZkClient event thread.
Shutting down Thread: 2
Shutting down Thread: 1
Shutting down Thread: 3
15/04/29 13:09:38 INFO consumer.ZookeeperConsumerConnector: [consumergroup], ZKConsumerConnector shut down completed
Thread 0: 2015-04-29 12:55:55.302|4|14|Unsafe tail distance|-73.99021500035|40.6636611
15/04/29 13:09:39 INFO consumer.ZookeeperConsumerConnector: [consumergroup], stopping watcher executor thread for consumer consumergroup
Thread 0: 2015-04-29 12:55:56.313|1|11|Normal|-79.74165300042|42.1304580009

On Wed, Apr 29, 2015 at 10:11 AM, tao xiao xiaotao...@gmail.com wrote:

example.shutdown(); in ConsumerGroupExample closes all consumer connections to Kafka. Remove this line and the consumer threads will run forever.

On Wed, Apr 29, 2015 at 9:42 PM, christopher palm cpa...@gmail.com wrote:

Hi All,

I am trying to get a multi-threaded HL consumer working against a 2-broker Kafka cluster with a 4-partition, 2-replica topic. The consumer code is set to run with 4 threads, one for each partition. The producer code uses the default partitioner and loops indefinitely feeding events into the topic. (I excluded the while loop in the paste below.) What I see is that the threads eventually all exit, even though the producer is still sending events into the topic. My understanding is that one consumer thread per partition is the correct setup. Any ideas why this code doesn't continue to consume events as they are pushed to the topic? I suspect I am configuring something wrong here, but am not sure what.

Thanks, Chris

Topic Configuration

Topic:kafkatopicRep4Part4 PartitionCount:4 ReplicationFactor:2 Configs:
Topic: kafkatopicRep4Part4 Partition: 0 Leader: 1 Replicas: 2,1 Isr: 1,2
Topic: kafkatopicRep4Part4 Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: kafkatopicRep4Part4 Partition: 2 Leader: 1 Replicas: 2,1 Isr: 1,2
Topic: kafkatopicRep4Part4 Partition: 3 Leader: 1 Replicas: 1,2 Isr: 1,2

Producer Code:

    Properties props = new Properties();
    props.put("metadata.broker.list", args[0]);
    props.put("zk.connect", args[1]);
    props.put("serializer.class", "kafka.serializer.StringEncoder");
    props.put("request.required.acks", "1");
    String TOPIC = args[2];
    ProducerConfig config = new ProducerConfig(props);
    Producer<String, String> producer = new Producer<String, String>(config);
    finalEvent = new Timestamp(new Date().getTime()) + "|" + truckIds[0] + "|" + driverIds[0] + "|" + events[random.nextInt(evtCnt
Re: MultiThreaded HLConsumer Exits before events are all consumed
example.shutdown(); in ConsumerGroupExample closes all consumer connections to Kafka. Remove this line and the consumer threads will run forever.

On Wed, Apr 29, 2015 at 9:42 PM, christopher palm cpa...@gmail.com wrote:

Hi All,

I am trying to get a multi-threaded HL consumer working against a 2-broker Kafka cluster with a 4-partition, 2-replica topic. The consumer code is set to run with 4 threads, one for each partition. The producer code uses the default partitioner and loops indefinitely feeding events into the topic. (I excluded the while loop in the paste below.) What I see is that the threads eventually all exit, even though the producer is still sending events into the topic. My understanding is that one consumer thread per partition is the correct setup. Any ideas why this code doesn't continue to consume events as they are pushed to the topic? I suspect I am configuring something wrong here, but am not sure what.

Thanks, Chris

Topic Configuration

Topic:kafkatopicRep4Part4 PartitionCount:4 ReplicationFactor:2 Configs:
Topic: kafkatopicRep4Part4 Partition: 0 Leader: 1 Replicas: 2,1 Isr: 1,2
Topic: kafkatopicRep4Part4 Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: kafkatopicRep4Part4 Partition: 2 Leader: 1 Replicas: 2,1 Isr: 1,2
Topic: kafkatopicRep4Part4 Partition: 3 Leader: 1 Replicas: 1,2 Isr: 1,2

Producer Code:

    Properties props = new Properties();
    props.put("metadata.broker.list", args[0]);
    props.put("zk.connect", args[1]);
    props.put("serializer.class", "kafka.serializer.StringEncoder");
    props.put("request.required.acks", "1");
    String TOPIC = args[2];
    ProducerConfig config = new ProducerConfig(props);
    Producer<String, String> producer = new Producer<String, String>(config);
    finalEvent = new Timestamp(new Date().getTime()) + "|" + truckIds[0] + "|" + driverIds[0] + "|" + events[random.nextInt(evtCnt)] + "|" + getLatLong(arrayroute17[i]);
    try {
        KeyedMessage<String, String> data = new KeyedMessage<String, String>(TOPIC, finalEvent);
        LOG.info("Sending Message #: " + routeName[0] + ": " + i + ", msg: " + finalEvent);
        producer.send(data);
        Thread.sleep(1000);
    } catch (Exception e) {
        e.printStackTrace();
    }

Consumer Code:

    public class ConsumerTest implements Runnable {
        private KafkaStream m_stream;
        private int m_threadNumber;

        public ConsumerTest(KafkaStream a_stream, int a_threadNumber) {
            m_threadNumber = a_threadNumber;
            m_stream = a_stream;
        }

        public void run() {
            ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
            while (it.hasNext()) {
                System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message()));
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println("Shutting down Thread: " + m_threadNumber);
        }
    }

    public class ConsumerGroupExample {
        private final ConsumerConnector consumer;
        private final String topic;
        private ExecutorService executor;

        public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic) {
            consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
                    createConsumerConfig(a_zookeeper, a_groupId));
            this.topic = a_topic;
        }

        public void shutdown() {
            if (consumer != null) consumer.shutdown();
            if (executor != null) executor.shutdown();
            try {
                if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
                    System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly");
                }
            } catch (InterruptedException e) {
                System.out.println("Interrupted during shutdown, exiting uncleanly");
            }
        }

        public void run(int a_numThreads) {
            Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
            topicCountMap.put(topic, new Integer(a_numThreads));
            Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
            List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
            executor = Executors.newFixedThreadPool(a_numThreads);
            int threadNumber = 0;
            for (final KafkaStream stream : streams) {
                executor.submit(new ConsumerTest(stream, threadNumber));
                threadNumber++;
            }
        }

        private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
            Properties props = new Properties();
            props.put("zookeeper.connect", a_zookeeper);
            props.put("group.id", a_groupId);
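For completeness, a sketch of a driver applying the advice in this thread: run the consumer group and keep the main thread alive instead of sleeping briefly and calling shutdown(). The argument handling is hypothetical; ConsumerGroupExample is the class quoted above.

    public class ConsumerGroupMain {
        public static void main(String[] args) throws InterruptedException {
            String zooKeeper = args[0];
            String groupId = args[1];
            String topic = args[2];
            int threads = Integer.parseInt(args[3]);

            final ConsumerGroupExample example = new ConsumerGroupExample(zooKeeper, groupId, topic);
            example.run(threads);

            // Only shut down when the JVM itself is asked to exit; do not call
            // example.shutdown() in the normal flow, or the KafkaStream iterators
            // terminate and every ConsumerTest thread exits.
            Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
                public void run() { example.shutdown(); }
            }));
            Thread.currentThread().join(); // block forever; consumer threads keep running
        }
    }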
Getting java.lang.IllegalMonitorStateException in mirror maker when building fetch request
Hi team,

I observed java.lang.IllegalMonitorStateException thrown from AbstractFetcherThread in mirror maker when it is trying to build the fetch request. Below is the error:

[2015-04-23 16:16:02,049] ERROR [ConsumerFetcherThread-group_id_localhost-1429830778627-4519368f-0-7], Error due to (kafka.consumer.ConsumerFetcherThread)
java.lang.IllegalMonitorStateException
at java.util.concurrent.locks.ReentrantLock$Sync.tryRelease(ReentrantLock.java:155)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.release(AbstractQueuedSynchronizer.java:1260)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.fullyRelease(AbstractQueuedSynchronizer.java:1723)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2166)
at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:95)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)

I believe this is due to partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS) being called while the lock is not held. The code below should fix the issue:

    inLock(partitionMapLock) {
      partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
    }

Should I file a JIRA ticket and submit the patch? I use the latest version of mirror maker from trunk.

-- Regards, Tao
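To see why this fails, here is a small self-contained Java sketch (independent of the Kafka code) demonstrating that Condition.await() called without holding its ReentrantLock throws IllegalMonitorStateException through exactly the tryRelease/fullyRelease path in the stack trace above, while the lock/try/finally pattern, which is what the inLock(...) wrapper in the proposed fix provides, succeeds:

    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;

    public class AwaitWithoutLock {
        public static void main(String[] args) throws InterruptedException {
            ReentrantLock lock = new ReentrantLock();
            Condition cond = lock.newCondition();
            try {
                // Wrong: the calling thread does not own the lock.
                cond.await(100, TimeUnit.MILLISECONDS);
            } catch (IllegalMonitorStateException e) {
                System.out.println("await() without the lock: " + e);
            }
            // Right: acquire the lock first and release it in finally.
            lock.lock();
            try {
                cond.await(100, TimeUnit.MILLISECONDS);
            } finally {
                lock.unlock();
            }
        }
    }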
Re: Got NPE during partition rebalancing in high level consumer
Guozhang,

No, I don't think the patch for KAFKA-2056 would fix this problem; the NPE is thrown at a line that is called before the fix executes. But I do notice that the code in trunk did fix the issue by ensuring the size of the map returned from ctx.consumersForTopic is greater than 0. So the code in trunk is safe.

On Wed, Apr 15, 2015 at 3:45 PM, Guozhang Wang wangg...@gmail.com wrote:

Hello Tao,

Do you think the solution to KAFKA-2056 will resolve this issue? It will be included in the 0.8.3 release.

Guozhang

On Wed, Apr 15, 2015 at 2:21 PM, tao xiao xiaotao...@gmail.com wrote:

Hi team,

I discovered an issue: when a high-level consumer with the roundrobin assignment strategy consumes a topic that hasn't been created on the broker, an NPE is thrown during the partition rebalancing phase. I use Kafka 0.8.2.1. Here are the steps to reproduce:

1. Create a high-level consumer with roundrobin.
2. Use connector.createMessageStreamsByFilter to create a message stream in the consumer for a topic that has yet to be created on the broker.

Below is the exception:

2015-04-15 14:16:46 INFO kafka.utils.Logging$class:68 - [test12345667f_localhost], Committing all offsets after clearing the fetcher queues
2015-04-15 14:16:46 INFO kafka.utils.Logging$class:68 - [test12345667f_localhost], Releasing partition ownership
2015-04-15 14:16:46 INFO kafka.utils.Logging$class:76 - [test12345667f_localhost], exception during rebalance
java.lang.NullPointerException
at scala.collection.mutable.HashTable$$anon$1.next(HashTable.scala:210)
at scala.collection.mutable.HashTable$$anon$1.next(HashTable.scala:202)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.IterableLike$class.head(IterableLike.scala:91)
at scala.collection.AbstractIterable.head(Iterable.scala:54)
at kafka.consumer.RoundRobinAssignor.assign(PartitionAssignor.scala:75)
at kafka.consumer.RoundRobinAssignor.assign(PartitionAssignor.scala:69)
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:660)
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1$$anonfun$apply$mcV$sp$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:608)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcV$sp(ZookeeperConsumerConnector.scala:602)
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply(ZookeeperConsumerConnector.scala:599)
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply(ZookeeperConsumerConnector.scala:599)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:598)
at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:905)
at kafka.consumer.ZookeeperConsumerConnector$WildcardStreamsHandler.<init>(ZookeeperConsumerConnector.scala:939)
at kafka.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:160)
at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:101)
at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:105)
at com.ebay.kafka.demo.Consumer.main(Consumer.java:61)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)

-- Regards, Tao

-- -- Guozhang

-- Regards, Tao
Got NPE during partition rebalancing in high level consumer
Hi team,

I discovered an issue: when a high-level consumer with the roundrobin assignment strategy consumes a topic that hasn't been created on the broker, an NPE is thrown during the partition rebalancing phase. I use Kafka 0.8.2.1. Here are the steps to reproduce:

1. Create a high-level consumer with roundrobin.
2. Use connector.createMessageStreamsByFilter to create a message stream in the consumer for a topic that has yet to be created on the broker.

Below is the exception:

2015-04-15 14:16:46 INFO kafka.utils.Logging$class:68 - [test12345667f_localhost], Committing all offsets after clearing the fetcher queues
2015-04-15 14:16:46 INFO kafka.utils.Logging$class:68 - [test12345667f_localhost], Releasing partition ownership
2015-04-15 14:16:46 INFO kafka.utils.Logging$class:76 - [test12345667f_localhost], exception during rebalance
java.lang.NullPointerException
at scala.collection.mutable.HashTable$$anon$1.next(HashTable.scala:210)
at scala.collection.mutable.HashTable$$anon$1.next(HashTable.scala:202)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.IterableLike$class.head(IterableLike.scala:91)
at scala.collection.AbstractIterable.head(Iterable.scala:54)
at kafka.consumer.RoundRobinAssignor.assign(PartitionAssignor.scala:75)
at kafka.consumer.RoundRobinAssignor.assign(PartitionAssignor.scala:69)
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:660)
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1$$anonfun$apply$mcV$sp$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:608)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcV$sp(ZookeeperConsumerConnector.scala:602)
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply(ZookeeperConsumerConnector.scala:599)
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply(ZookeeperConsumerConnector.scala:599)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:598)
at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:905)
at kafka.consumer.ZookeeperConsumerConnector$WildcardStreamsHandler.<init>(ZookeeperConsumerConnector.scala:939)
at kafka.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:160)
at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:101)
at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:105)
at com.ebay.kafka.demo.Consumer.main(Consumer.java:61)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)

-- Regards, Tao
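A minimal sketch of the two reproduction steps against the 0.8.2 high-level consumer API; the ZooKeeper address, group id, and topic name are placeholders:

    import java.util.List;
    import java.util.Properties;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.KafkaStream;
    import kafka.consumer.Whitelist;
    import kafka.javaapi.consumer.ConsumerConnector;

    public class RoundRobinNpeRepro {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("zookeeper.connect", "localhost:2181"); // placeholder
            props.put("group.id", "npe-repro");
            // Step 1: high-level consumer with the roundrobin assignment strategy.
            props.put("partition.assignment.strategy", "roundrobin");
            ConsumerConnector connector =
                    kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
            // Step 2: subscribe by filter to a topic that has not been created on the
            // broker; the rebalance triggered here is where the NPE surfaces.
            List<KafkaStream<byte[], byte[]>> streams =
                    connector.createMessageStreamsByFilter(new Whitelist("topic-not-created-yet"));
        }
    }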
Re: Kafka server relocation
num.consumer.fetchers means the maximum number of fetcher threads that can be spawned; it doesn't necessarily mean you get as many fetcher threads as you specify (see the sketch after this message for how partitions map to fetcher threads). To me the metrics suggest a very slow consumption rate, only 18.21 bytes/minute. Here is the benchmark LinkedIn published: http://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines You should check with bin/kafka-consumer-perf-test.sh whether 18.21 bytes/minute is the max throughput you can get on your machine; if it is, you definitely need to tune it.

On Mon, Apr 13, 2015 at 12:43 PM, nitin sharma kumarsharma.ni...@gmail.com wrote:

hi Xiao, i have finally got JMX monitoring enabled for my kafka nodes in test envrionment and here is what i observed. i was monitoring mbeans under kafka.consumer domain of JVM running Kafka Mirror Maker process. = AllTopicsBytes === 18.21 bytes/minute FetchRequestRateAndTimeMs === 9.69 Request/min and 99th Percentile is 104.13ms. == Interesting thing is that I have specified num.consumer.fetchers=200 in my consumer property file but i can see only 8 threads of type : kafka.consumer:name=KafkaMaker1-ConsumerFetcherThread-KafkaMaker1_zkhost-1428952277321-5e044226-138-1-host_brokerhostname-port_9092-FetchRequestRateAndTimeMs,type=FetchRequestAndResponseMetrics Could this be the issue? note, my JVM is set to 1GB and only 30MB is utilized most of the time. Regards, Nitin Kumar Sharma.

On Wed, Apr 8, 2015 at 10:48 PM, tao xiao xiaotao...@gmail.com wrote:

Metrics like Bytepersec, FetchRequestRateAndTimeMs can help you to check if the consumer has problem processing messages

On Thu, Apr 9, 2015 at 2:40 AM, nitin sharma kumarsharma.ni...@gmail.com wrote:

thanks, but can you please tell which metrics could highlight the factor causing slow data migration by MirrorMaker? Regards, Nitin Kumar Sharma.

On Tue, Apr 7, 2015 at 10:10 PM, tao xiao xiaotao...@gmail.com wrote:

You may need to look into the consumer metrics and producer metrics to identify the root cause. metrics in kafka.consumer and kafka.producer categories will help you find out the problems. This link gives instruction how to read the metrics http://kafka.apache.org/documentation.html#monitoring

On Wed, Apr 8, 2015 at 3:39 AM, nitin sharma kumarsharma.ni...@gmail.com wrote:

hi, sorry for late response. ... i have been able to fix the issue .. problem was in my approach. I got confused between my source and target system while defining consumer producer property file .. it is fixed now Now new issue.. the rate at which data is migrated is very very slow... i mean it took 5 min to copy only 15Kb.. :( .. here are the property for producer and consumer.. there is no network latency between Source and Destination clusters as such. Producer ### metadata.broker.list=broker1IP:9092,broker2IP:9092 serializer.class=kafka.serializer.DefaultEncoder auto.create.topics.enable=true request.required.acks=1 request.required.acks=1 producer.type=async batch.num.messages=3000 queue.buffering.max.ms=5000 queue.buffering.max.messages=10 queue.enqueue.timeout.ms=-1 socket.send.buffer.bytes=5282880 # Consumer ### zookeeper.connect=zk1hostname:2181,zk2hostname:2181,zk3hostname:2181 group.id=KafkaMaker auto.create.topics.enable=true socket.receive.buffer.bytes=5243880 zookeeper.connection.timeout.ms=100 num.consumer.fetchers=20 fetch.message.max.bytes=5243880 Regards, Nitin Kumar Sharma.

On Tue, Mar 31, 2015 at 12:36 PM, tao xiao xiaotao...@gmail.com wrote:

Can you attach your mirror maker log?
On Wed, Apr 1, 2015 at 12:28 AM, nitin sharma kumarsharma.ni...@gmail.com wrote: i tried with auto.offset.reset=smallest, but still not working.. there is data in my source cluster Regards, Nitin Kumar Sharma. On Mon, Mar 30, 2015 at 10:30 PM, tao xiao xiaotao...@gmail.com wrote: Do you have data sending to *testtopic? *By default mirror maker only consumes data being sent after it taps into the topic. you need to keep sending data to the topic after mirror maker connection is established. If you want to change the behavior you can set auto.offset.reset=smallest so that any new mirror maker coming to the topic will start from the smallest offset On Tue, Mar 31, 2015 at 3:53 AM, nitin sharma kumarsharma.ni...@gmail.com wrote: thanks
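On the num.consumer.fetchers point: in the 0.8.x high-level consumer, each partition is hashed to a fetcher id, and a fetcher thread only exists for a (source broker, fetcher id) pair that owns at least one partition. A rough Java sketch of that mapping, modeled on the getFetcherId logic in AbstractFetcherManager (treat it as an approximation, not the exact broker code):

    // With a single 1-partition topic, every record comes through one fetcher id,
    // so raising num.consumer.fetchers cannot create more threads; the thread count
    // is bounded by the (broker, fetcher id) pairs that actually own partitions.
    public class FetcherIdSketch {
        static int getFetcherId(String topic, int partitionId, int numFetchers) {
            // Kafka's Utils.abs masks the sign bit rather than using Math.abs.
            return ((31 * topic.hashCode() + partitionId) & 0x7fffffff) % numFetchers;
        }

        public static void main(String[] args) {
            // Always prints the same id for the single partition, whatever numFetchers is.
            System.out.println(getFetcherId("testtopic", 0, 200));
        }
    }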
Re: Kafka server relocation
how about the consumer lag of mirror maker? On Mon, Apr 13, 2015 at 1:33 PM, nitin sharma kumarsharma.ni...@gmail.com wrote: i just tested that too and below is the stats.. it is clear that with kafka-consumer-perf-test.sh, i am able to get a high throughput. around 44.0213 MB/sec. Seriously some configuration needs to be tweaked in MirrorMaker configuration for speedy processing... Can you think of something ? start.time, end.time, fetch.size, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec 2015-04-13 20:29:29:070, 2015-04-13 20:29:34:596, 1048576, 23.1552, 44.0213, 16683, 31716.7300 Regards, Nitin Kumar Sharma. On Mon, Apr 13, 2015 at 3:51 PM, tao xiao xiaotao...@gmail.com wrote: num.consumer.fetchers means the max number of fetcher threads that can be spawned. it doesn't necessarily mean you can get as many fetcher threads as you specify. To me the metrics are suggesting a very slow consumption rate only 18.21 bytes/minute. Here is the benchmark Linkedin does http://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines You should check if 18.21 bytes/minute is the max throughput you can get on your machine with bin/kafka-consumer-perf-test.sh if this is case you definitely need to tune it On Mon, Apr 13, 2015 at 12:43 PM, nitin sharma kumarsharma.ni...@gmail.com wrote: hi Xiao, i have finally got JMX monitoring enabled for my kafka nodes in test envrionment and here is what i observed. i was monitoring mbeans under kafka.consumer domain of JVM running Kafka Mirror Maker process. = AllTopicsBytes === 18.21 bytes/minute FetchRequestRateAndTimeMs === 9.69 Request/min and 99th Percentile is 104.13ms. == Interesting thing is that I have specified num.consumer.fetchers=200 in my consumer property file but i can see only 8 threads of type : kafka.consumer:name=KafkaMaker1-ConsumerFetcherThread-KafkaMaker1_zkhost-1428952277321-5e044226-138-1-host_brokerhostname-port_9092-FetchRequestRateAndTimeMs,type=FetchRequestAndResponseMetrics Could this be the issue? note, my JVM is set to 1GB and only 30MB is utilized most of the time. Regards, Nitin Kumar Sharma. On Wed, Apr 8, 2015 at 10:48 PM, tao xiao xiaotao...@gmail.com wrote: Metrics like Bytepersec, FetchRequestRateAndTimeMs can help you to check if the consumer has problem processing messages On Thu, Apr 9, 2015 at 2:40 AM, nitin sharma kumarsharma.ni...@gmail.com wrote: thanks, but can you please tell which metrics could highlight the factor causing slow data migration by MirrorMaker? Regards, Nitin Kumar Sharma. On Tue, Apr 7, 2015 at 10:10 PM, tao xiao xiaotao...@gmail.com wrote: You may need to look into the consumer metrics and producer metrics to identify the root cause. metrics in kafka.consumer and kafka.producer categories will help you find out the problems. This link gives instruction how to read the metrics http://kafka.apache.org/documentation.html#monitoring On Wed, Apr 8, 2015 at 3:39 AM, nitin sharma kumarsharma.ni...@gmail.com wrote: hi, sorry for late response. ... i have been able to fix the issue .. problem was in my approach. I got confused between my source and target system while defining consumer producer property file .. it is fixed now Now new issue.. the rate at which data is migrated is very very slow... i mean it took 5 min to copy only 15Kb.. :( .. here are the property for producer and consumer.. there is no network latency between Source and Destination clusters as such. 
Producer ### metadata.broker.list=broker1IP:9092,broker2IP:9092 serializer.class=kafka.serializer.DefaultEncoder auto.create.topics.enable=true request.required.acks=1 request.required.acks=1 producer.type=async batch.num.messages=3000 queue.buffering.max.ms=5000 queue.buffering.max.messages=10 queue.enqueue.timeout.ms=-1 socket.send.buffer.bytes=5282880 # Consumer ### zookeeper.connect=zk1hostname:2181,zk2hostname:2181,zk3hostname:2181 group.id=KafkaMaker auto.create.topics.enable=true socket.receive.buffer.bytes=5243880 zookeeper.connection.timeout.ms=100 num.consumer.fetchers=20 fetch.message.max.bytes=5243880 Regards, Nitin Kumar Sharma. On Tue, Mar 31, 2015 at 12:36 PM, tao xiao xiaotao...@gmail.com wrote: Can you attach your mirror maker
Re: Kafka server relocation
Metrics like BytesPerSec and FetchRequestRateAndTimeMs can help you check whether the consumer has problems processing messages.

On Thu, Apr 9, 2015 at 2:40 AM, nitin sharma kumarsharma.ni...@gmail.com wrote:

thanks, but can you please tell which metrics could highlight the factor causing slow data migration by MirrorMaker? Regards, Nitin Kumar Sharma.

On Tue, Apr 7, 2015 at 10:10 PM, tao xiao xiaotao...@gmail.com wrote:

You may need to look into the consumer metrics and producer metrics to identify the root cause. metrics in kafka.consumer and kafka.producer categories will help you find out the problems. This link gives instruction how to read the metrics http://kafka.apache.org/documentation.html#monitoring

On Wed, Apr 8, 2015 at 3:39 AM, nitin sharma kumarsharma.ni...@gmail.com wrote:

hi, sorry for late response. ... i have been able to fix the issue .. problem was in my approach. I got confused between my source and target system while defining consumer producer property file .. it is fixed now Now new issue.. the rate at which data is migrated is very very slow... i mean it took 5 min to copy only 15Kb.. :( .. here are the property for producer and consumer.. there is no network latency between Source and Destination clusters as such. Producer ### metadata.broker.list=broker1IP:9092,broker2IP:9092 serializer.class=kafka.serializer.DefaultEncoder auto.create.topics.enable=true request.required.acks=1 request.required.acks=1 producer.type=async batch.num.messages=3000 queue.buffering.max.ms=5000 queue.buffering.max.messages=10 queue.enqueue.timeout.ms=-1 socket.send.buffer.bytes=5282880 # Consumer ### zookeeper.connect=zk1hostname:2181,zk2hostname:2181,zk3hostname:2181 group.id=KafkaMaker auto.create.topics.enable=true socket.receive.buffer.bytes=5243880 zookeeper.connection.timeout.ms=100 num.consumer.fetchers=20 fetch.message.max.bytes=5243880 Regards, Nitin Kumar Sharma.

On Tue, Mar 31, 2015 at 12:36 PM, tao xiao xiaotao...@gmail.com wrote:

Can you attach your mirror maker log?

On Wed, Apr 1, 2015 at 12:28 AM, nitin sharma kumarsharma.ni...@gmail.com wrote:

i tried with auto.offset.reset=smallest, but still not working.. there is data in my source cluster Regards, Nitin Kumar Sharma.

On Mon, Mar 30, 2015 at 10:30 PM, tao xiao xiaotao...@gmail.com wrote:

Do you have data sending to testtopic? By default mirror maker only consumes data being sent after it taps into the topic. you need to keep sending data to the topic after mirror maker connection is established. If you want to change the behavior you can set auto.offset.reset=smallest so that any new mirror maker coming to the topic will start from the smallest offset

On Tue, Mar 31, 2015 at 3:53 AM, nitin sharma kumarsharma.ni...@gmail.com wrote:

thanks Xiao I tried MirrorMaker option in my test environment but failed. I am not able to see the log getting copied to destination cluster. I see in the log of MirrorMaker process that connection is successfully established between source and destination cluster but still not sure what is causing the problem Env. Setup == I). Source Cluster (Qenv02) -- i have 2 broker(Qenv02kf01,Qenv02kf02) and 3 zk(Qenv02zk01,Qenv02zk02 and Qenv02zk03). Destination Cluster (Qenv05) -- i have 2 broker (Qenv05kf01,Qenv05kf02) and 3 zk(Qenv05zk01,Qenv05zk02 and Qenv05zk03). II). i have kept consumer and producer properties file in one of the source kafka broker config folder. III).i have executed following command from the same kafka broker to start the process..
log are attached : /app/kafka/bin/kafka-run-class.sh kafka.tools.MirrorMaker -consumer.config /app/kafka/config/consumer1.properties --num.streams=2 --producer.config /app/kafka/config/producer1.properties --whitelist testtopic IV). I tried Consumer offset tracker tool also, while Mirror Maker running . I tried by launching second session of same broker where mirror maker is running. I got error message that *NoNode for /consumers/KafkaMaker/offsets/testtopic/0* .Complete log attached. Regards, Nitin Kumar Sharma. On Thu, Mar 26, 2015 at 11:24 AM, tao xiao xiaotao...@gmail.com wrote: Both consumer-1 and consumer-2 are properties of source clusters mirror maker transfers data from. Mirror maker is designed to be able
Re: Kafka server relocation
You may need to look into the consumer metrics and producer metrics to identify the root cause. Metrics in the kafka.consumer and kafka.producer categories will help you find the problems. This link gives instructions on how to read the metrics: http://kafka.apache.org/documentation.html#monitoring

On Wed, Apr 8, 2015 at 3:39 AM, nitin sharma kumarsharma.ni...@gmail.com wrote:

hi, sorry for late response. ... i have been able to fix the issue .. problem was in my approach. I got confused between my source and target system while defining consumer producer property file .. it is fixed now Now new issue.. the rate at which data is migrated is very very slow... i mean it took 5 min to copy only 15Kb.. :( .. here are the property for producer and consumer.. there is no network latency between Source and Destination clusters as such. Producer ### metadata.broker.list=broker1IP:9092,broker2IP:9092 serializer.class=kafka.serializer.DefaultEncoder auto.create.topics.enable=true request.required.acks=1 request.required.acks=1 producer.type=async batch.num.messages=3000 queue.buffering.max.ms=5000 queue.buffering.max.messages=10 queue.enqueue.timeout.ms=-1 socket.send.buffer.bytes=5282880 # Consumer ### zookeeper.connect=zk1hostname:2181,zk2hostname:2181,zk3hostname:2181 group.id=KafkaMaker auto.create.topics.enable=true socket.receive.buffer.bytes=5243880 zookeeper.connection.timeout.ms=100 num.consumer.fetchers=20 fetch.message.max.bytes=5243880 Regards, Nitin Kumar Sharma.

On Tue, Mar 31, 2015 at 12:36 PM, tao xiao xiaotao...@gmail.com wrote:

Can you attach your mirror maker log?

On Wed, Apr 1, 2015 at 12:28 AM, nitin sharma kumarsharma.ni...@gmail.com wrote:

i tried with auto.offset.reset=smallest, but still not working.. there is data in my source cluster Regards, Nitin Kumar Sharma.

On Mon, Mar 30, 2015 at 10:30 PM, tao xiao xiaotao...@gmail.com wrote:

Do you have data sending to testtopic? By default mirror maker only consumes data being sent after it taps into the topic. you need to keep sending data to the topic after mirror maker connection is established. If you want to change the behavior you can set auto.offset.reset=smallest so that any new mirror maker coming to the topic will start from the smallest offset

On Tue, Mar 31, 2015 at 3:53 AM, nitin sharma kumarsharma.ni...@gmail.com wrote:

thanks Xiao I tried MirrorMaker option in my test environment but failed. I am not able to see the log getting copied to destination cluster. I see in the log of MirrorMaker process that connection is successfully established between source and destination cluster but still not sure what is causing the problem Env. Setup == I). Source Cluster (Qenv02) -- i have 2 broker(Qenv02kf01,Qenv02kf02) and 3 zk(Qenv02zk01,Qenv02zk02 and Qenv02zk03). Destination Cluster (Qenv05) -- i have 2 broker (Qenv05kf01,Qenv05kf02) and 3 zk(Qenv05zk01,Qenv05zk02 and Qenv05zk03). II). i have kept consumer and producer properties file in one of the source kafka broker config folder. III).i have executed following command from the same kafka broker to start the process.. log are attached : /app/kafka/bin/kafka-run-class.sh kafka.tools.MirrorMaker -consumer.config /app/kafka/config/consumer1.properties --num.streams=2 --producer.config /app/kafka/config/producer1.properties --whitelist testtopic IV). I tried Consumer offset tracker tool also, while Mirror Maker running . I tried by launching second session of same broker where mirror maker is running.
I got error message that *NoNode for /consumers/KafkaMaker/offsets/testtopic/0* .Complete log attached. Regards, Nitin Kumar Sharma. On Thu, Mar 26, 2015 at 11:24 AM, tao xiao xiaotao...@gmail.com wrote: Both consumer-1 and consumer-2 are properties of source clusters mirror maker transfers data from. Mirror maker is designed to be able to consume data from N sources (N = 1) and transfer data to one destination cluster. You are free to supply as many consumer properties as you want to instruct mirror maker where to consumer data from. On Thu, Mar 26, 2015 at 9:50 PM, nitin sharma kumarsharma.ni...@gmail.com wrote: thanks Mayuresh and Jiangjie for your response. I have actually not understood Mirror maker clearly and hence bit skeptical if i will be able to execute it effectively. Online i hv seen the following command to be execute, but not understood what is consumer-1 -2.properties here? do i need to copy from my consumer code? also, any reason why
Re: Kafka server relocation
Can you attach your mirror maker log?

On Wed, Apr 1, 2015 at 12:28 AM, nitin sharma kumarsharma.ni...@gmail.com wrote: ...

On Wed, Mar 25, 2015 at 8:57 PM, Mayuresh Gharat gharatmayures...@gmail.com wrote: You can use the Mirror maker to move data from one data center to the other and once all the data has been moved you can shut down the source data center by doing a controlled shutdown. Thanks, Mayuresh

On Wed, Mar 25, 2015 at 2:35 PM, Jiangjie Qin j...@linkedin.com.invalid wrote: If you want to do a seamless migration,
I think a better way is to build a cross-datacenter Kafka cluster temporarily. So the process is:
1. Add several new Kafka brokers in your new datacenter and add them to the old cluster.
2. Use the replica assignment tool to reassign all the partitions to brokers in the new datacenter.
3. Perform a controlled shutdown on the brokers in the old datacenter.
Jiangjie (Becket) Qin

On 3/25/15, 2:01 PM, nitin sharma kumarsharma.ni...@gmail.com wrote: Hi Team, in my project we have built a new datacenter for Kafka brokers and want to migrate from the current datacenter to the new one. Switching producers and consumers won't be a problem provided the new datacenter has all the messages of the existing datacenter. I have only 1 topic with 2 partitions that needs to be migrated, and it is a one-time activity. Kindly suggest the best way to deal with this situation. Regards, Nitin Kumar
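For step 2, a sketch using the partition reassignment tool that ships with 0.8.x; the ZooKeeper host, file names, and new-datacenter broker ids (4 and 5) are hypothetical:

# topics-to-move.json
{"version": 1, "topics": [{"topic": "my-topic"}]}

# Generate a candidate assignment onto the new brokers, then save the
# proposed JSON it prints to reassignment.json
bin/kafka-reassign-partitions.sh --zookeeper zkhost:2181 --topics-to-move-json-file topics-to-move.json --broker-list "4,5" --generate

# Execute, then re-run with --verify until every partition reports success
bin/kafka-reassign-partitions.sh --zookeeper zkhost:2181 --reassignment-json-file reassignment.json --execute
bin/kafka-reassign-partitions.sh --zookeeper zkhost:2181 --reassignment-json-file reassignment.json --verify

Only after --verify shows all reassignments completed should the old-datacenter brokers be taken through controlled shutdown.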
Re: Kafka server relocation
Both consumer-1 and consumer-2 are consumer properties for the source clusters mirror maker transfers data from. Mirror maker is designed to be able to consume data from N sources (N >= 1) and transfer the data to one destination cluster. You are free to supply as many consumer properties files as you want to instruct mirror maker where to consume data from.

On Thu, Mar 26, 2015 at 9:50 PM, nitin sharma kumarsharma.ni...@gmail.com wrote: ...

-- Regards, Tao
Re: lost messages -?
You can use kafka-console-consumer to consume the topic from the beginning:

kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

On Thu, Mar 26, 2015 at 12:17 AM, Victor L vlyamt...@gmail.com wrote: Can someone let me know how to dump the contents of topics? I have producers sending messages to 3 brokers, but about half of them don't seem to be consumed. I suppose they are getting stuck in queues, but how can I figure out where? Thanks, -- Regards, Tao
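To narrow down where the other half of the messages are, it may also help to compare the brokers' log-end offsets with the consumer group's committed offsets; a sketch with hypothetical group and topic names, using tools bundled with 0.8.x:

# committed offsets and lag per partition for a consumer group
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zkconnect localhost:2181 --group my-group

# latest (log-end) offset per partition of the topic
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic test --time -1

If the log-end offsets grow but the group's offsets do not, the messages reached the brokers and the problem is on the consuming side.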
Re: Mirror maker fetcher thread unexpectedly stopped
Hi community, I wanted to know if the solution I supplied can fix the IllegalMonitorStateException issue. Our work is pending on this and we'd like to proceed ASAP. Sorry for bothering.

On Mon, Mar 23, 2015 at 4:32 PM, tao xiao xiaotao...@gmail.com wrote: I think I worked out the answer to question 1. ...

-- Regards, Tao
Re: Mirror maker fetcher thread unexpectedly stopped
Thank you for the explanation. Patch submitted: https://issues.apache.org/jira/browse/KAFKA-2048

On Wed, Mar 25, 2015 at 8:29 AM, Jiangjie Qin j...@linkedin.com.invalid wrote: It should be another ticket. This is an AbstractFetcherThread issue rather than a mirror maker issue. I kind of think this case you saw was a special case, as it's not actually a runtime error but a coding bug. Fetcher threads should not die by design, so I don't think we have a way to restart fetchers without a code change if they die accidentally. One way to do this is to add a liveness check in LeaderFinderThread, but I don't know if this is a necessary change just because of the case you saw. Jiangjie (Becket) Qin

On 3/24/15, 5:05 PM, tao xiao xiaotao...@gmail.com wrote: ...
Re: Mirror maker fetcher thread unexpectedly stopped
Thanks Jiangjie. Can I reuse KAFKA-1997 or should I create a new ticket?

On Wed, Mar 25, 2015 at 7:58 AM, Jiangjie Qin j...@linkedin.com.invalid wrote: Hi Xiao, I think the fix for the IllegalMonitorStateException is correct. Can you also create a ticket and submit a patch? Thanks. Jiangjie (Becket) Qin

On 3/24/15, 4:31 PM, tao xiao xiaotao...@gmail.com wrote: ...

-- Regards, Tao
Re: Mirror maker fetcher thread unexpectedly stopped
The other question I have is that the consumer client is unaware of the health status of the underlying fetcher thread. If the fetcher thread dies, as in the case I encountered, is there a way that the consumer can restart the fetcher thread or release ownership of its partitions so that other consumers can pick them up while the fetcher thread is down?

On Wed, Mar 25, 2015 at 8:00 AM, tao xiao xiaotao...@gmail.com wrote: Thanks Jiangjie. Can I reuse KAFKA-1997 or should I create a new ticket? ...

-- Regards, Tao
Re: Mirror maker fetcher thread unexpectedly stopped
I think I worked out the answer to question 1. java.lang.IllegalMonitorStateException was thrown because the thread did not own the ReentrantLock when calling await() on the lock's condition. Here is the code snippet from AbstractFetcherThread.scala in trunk:

partitionMapLock synchronized {
  partitionsWithError ++= partitionMap.keys
  // there is an error occurred while fetching partitions, sleep a while
  partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
}

As shown above, partitionMapLock is not acquired before calling partitionMapCond.await. We can fix this by explicitly acquiring the lock via partitionMapLock.lock(); the code block below should work:

inLock(partitionMapLock) {
  partitionsWithError ++= partitionMap.keys
  // there is an error occurred while fetching partitions, sleep a while
  partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
}

On Mon, Mar 23, 2015 at 1:50 PM, tao xiao xiaotao...@gmail.com wrote: ...

-- Regards, Tao
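To see concretely why the original block throws, here is a minimal standalone Scala sketch (not Kafka code): synchronized acquires the Java object monitor of the lock instance, while Condition.await requires the ReentrantLock itself to be held, which is what lock() (and kafka.utils.Utils.inLock) provides.

import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock

object MonitorVsLock extends App {
  val lock = new ReentrantLock()
  val cond = lock.newCondition()

  // Broken: `synchronized` takes the object monitor of the lock instance;
  // the ReentrantLock itself is never locked, so await() throws.
  try {
    lock.synchronized {
      cond.await(10, TimeUnit.MILLISECONDS)
    }
  } catch {
    case e: IllegalMonitorStateException => println(s"broken version threw: $e")
  }

  // Correct: acquire the ReentrantLock before awaiting on its condition.
  lock.lock()
  try {
    cond.await(10, TimeUnit.MILLISECONDS)
  } finally {
    lock.unlock()
  }
  println("correct version returned after the timed wait")
}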
Re: kafka audit
LinkedIn has an excellent tool that monitors lag, data loss, data duplication, etc. Here is the reference: http://www.slideshare.net/JonBringhurst/kafka-audit-kafka-meetup-january-27th-2015 It is not open sourced though.

On Mon, Mar 23, 2015 at 3:26 PM, sunil kalva kalva.ka...@gmail.com wrote: Hi, What is the best practice for adding an audit feature to Kafka? Is there any framework available for enabling auditing at the producer and consumer level, and any UI frameworks for monitoring? tx SunilKalva

-- Regards, Tao
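The slides describe a count-based audit. Purely as an illustration of that idea (this is not LinkedIn's implementation, and every name below is made up): each tier tallies messages per topic and time bucket, and a comparison job flags buckets where the tiers disagree, a deficit suggesting loss and a surplus suggesting duplication.

import scala.collection.mutable

// Hypothetical count-based audit: bucket message counts by (topic, time window).
class AuditCounter(bucketMs: Long = 10 * 60 * 1000L) {
  private val counts = mutable.Map.empty[(String, Long), Long].withDefaultValue(0L)

  def record(topic: String, timestampMs: Long): Unit = {
    val bucket = timestampMs - (timestampMs % bucketMs)
    counts((topic, bucket)) += 1
  }

  // Snapshots from two tiers should match per bucket once the bucket closes.
  def snapshot(): Map[(String, Long), Long] = counts.toMap
}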
Mirror maker fetcher thread unexpectedly stopped
Hi, I was running a mirror maker and got a java.lang.IllegalMonitorStateException that caused the underlying fetcher thread to stop completely. Here is the log from the mirror maker:

[2015-03-21 02:11:53,069] INFO Reconnect due to socket error: java.io.EOFException: Received -1 when reading from channel, socket has likely been closed. (kafka.consumer.SimpleConsumer)
[2015-03-21 02:11:53,081] WARN [ConsumerFetcherThread-localhost-1426853968478-4ebaa354-0-6], Error in fetch Name: FetchRequest; Version: 0; CorrelationId: 2398588; ClientId: phx-slc-mm-user-3; ReplicaId: -1; MaxWait: 100 ms; MinBytes: 1 bytes; RequestInfo: [test.topic,0] - PartitionFetchInfo(3766065,1048576). Possible cause: java.nio.channels.ClosedChannelException (kafka.consumer.ConsumerFetcherThread)
[2015-03-21 02:11:53,083] ERROR [ConsumerFetcherThread-localhost-1426853968478-4ebaa354-0-6], Error due to (kafka.consumer.ConsumerFetcherThread)
java.lang.IllegalMonitorStateException
at java.util.concurrent.locks.ReentrantLock$Sync.tryRelease(ReentrantLock.java:155)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.release(AbstractQueuedSynchronizer.java:1260)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.fullyRelease(AbstractQueuedSynchronizer.java:1723)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2166)
at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:106)
at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:90)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
[2015-03-21 02:11:53,083] INFO [ConsumerFetcherThread-localhost-1426853968478-4ebaa354-0-6], Stopped (kafka.consumer.ConsumerFetcherThread)

I am still investigating what caused the connection error on the server side, but I have a couple of questions related to the mirror maker itself:

1. What is the root cause of java.lang.IllegalMonitorStateException? As shown in the AbstractFetcherThread source, the fetcher thread should catch the java.io.EOFException thrown from the underlying SimpleConsumer and sleep a while before the next run.

2. The mirror maker is unaware of the termination of the fetcher thread, which makes it unable to detect the failure and trigger rebalancing. I have 3 mirror maker instances running on 3 different machines listening to the same topic. I would expect the mirror maker to release partition ownership when the underlying fetcher thread terminates so that rebalancing can be triggered, but in fact this is not the case. Is this expected behavior, or did I misconfigure anything?

I am running the trunk version as of commit 82789e75199fdc1cae115c5c2eadfd0f1ece4d0d -- Regards, Tao
Re: New Java Producer Client handling case where Kafka is unreachable
You can set the producer property retries to a value other than 0. Details can be found here: http://kafka.apache.org/documentation.html#newproducerconfigs

On Fri, Mar 20, 2015 at 3:01 PM, Samuel Chase samebch...@gmail.com wrote: Hello Everyone, In the new Java Producer API, the callback code in KafkaProducer.send is run after there is a response from the Kafka server. This can be used if some error handling needs to be done based on the response. When using the new Java Kafka producer, I've noticed that when the Kafka server is down/unreachable, KafkaProducer.send blocks until the Kafka server is back up again. We've been using the older Scala producer, and when Kafka is unreachable it throws an exception after a few retries; this exception is caught and then some error handling code is run. What is the recommended way of using the new Java Producer API to handle the case where Kafka is unreachable temporarily? I don't want to wait until it is reachable again before I know that the send failed. Any help or advice shall be much appreciated. Thanks, Samuel -- Regards, Tao
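A minimal sketch against the 0.8.2 new producer API (broker address and topic name are hypothetical): send() hands the record to a buffer and the callback reports the final outcome, so failures can be handled without blocking the caller. Note that in 0.8.2 send() may still block briefly while fetching metadata for a topic the producer has not seen yet.

import java.util.Properties
import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}

object ProducerWithCallback extends App {
  val props = new Properties()
  props.put("bootstrap.servers", "broker1:9092") // hypothetical broker address
  props.put("retries", "3")                      // retry transient failures instead of failing immediately
  props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")

  val producer = new KafkaProducer[Array[Byte], Array[Byte]](props)
  val record = new ProducerRecord[Array[Byte], Array[Byte]]("test-topic", "hello".getBytes)

  producer.send(record, new Callback {
    override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit =
      if (exception != null) {
        // Fires when the send ultimately fails, e.g. broker unreachable after retries.
        System.err.println(s"send failed: $exception")
      } else {
        println(s"sent to ${metadata.topic}-${metadata.partition} @ ${metadata.offset}")
      }
  })
  producer.close()
}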
Re: Post on running Kafka at LinkedIn
Here is the slide deck: http://www.slideshare.net/JonBringhurst/kafka-audit-kafka-meetup-january-27th-2015

On Sat, Mar 21, 2015 at 2:36 AM, Xiao lixiao1...@gmail.com wrote: Hi James, Thank you for sharing it! The links for the videos and slides are the same. Could you check the link for the slides? Xiao Li

On Mar 20, 2015, at 11:30 AM, James Cheng jch...@tivo.com wrote: For those who missed it: the Kafka Audit tool was also presented at the 1/27 Kafka meetup: http://www.meetup.com/http-kafka-apache-org/events/219626780/ Recorded video is here, starting around the 40 minute mark: http://www.ustream.tv/recorded/58109076 Slides are here: http://www.ustream.tv/recorded/58109076 -James

On Mar 20, 2015, at 9:47 AM, Todd Palino tpal...@gmail.com wrote: For those who are interested in detail on how we've got Kafka set up at LinkedIn, I have just published a new post to our engineering blog titled "Running Kafka at Scale": https://engineering.linkedin.com/kafka/running-kafka-scale It's a general overview of our current Kafka install, tiered architecture, audit, and the libraries we use for producers and consumers. You'll also be seeing more posts from the SRE team here in the coming weeks with deeper looks into both Kafka and Samza. Additionally, I'll be giving a talk at ApacheCon next month on running tiered Kafka architectures. If you're in Austin for that, please come by and check it out. -Todd

-- Regards, Tao
No topic owner when using different assignment strategies
Hi team, I have two consumer instances with the same group id connecting to two different topics, with 1 partition created for each. One consumer uses partition.assignment.strategy=roundrobin and the other uses the default assignment strategy. Both consumers have 1 thread spawned internally and connect to Kafka using createMessageStreamsByFilter. The consumer with the roundrobin assignment strategy connected to Kafka first and had both topics assigned to itself; then I brought up the other consumer, which has the default assignment strategy configured. I saw rebalancing happen in both consumers, but in the end only one of the topics was assigned to the consumer configured with the roundrobin assignment strategy, and no topics were assigned to the other consumer. This left one topic without an owner. Here is the result from zk:

[zk: localhost:2181(CONNECTED) 0] get /consumers/test/owners/mm-benchmark-test/0
Node does not exist: /consumers/test12345667f/owners/mm-benchmark-test/0
[zk: localhost:2181(CONNECTED) 1] get /consumers/test/owners/mm-benchmark-test1/0
test-localhost-1426605370072-904d6fba-0

The Kafka version I use is 0.8.2.1 -- Regards, Tao
Re: No topic owner when using different assignment strategies
This is the corrected zk result:

[zk: localhost:2181(CONNECTED) 0] get /consumers/test/owners/mm-benchmark-test/0
Node does not exist: /consumers/test/owners/mm-benchmark-test/0
[zk: localhost:2181(CONNECTED) 1] get /consumers/test/owners/mm-benchmark-test1/0
test-localhost-1426605370072-904d6fba-0

On Tue, Mar 17, 2015 at 11:30 PM, tao xiao xiaotao...@gmail.com wrote: ...

-- Regards, Tao
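The symptom above is consistent with members of one group computing assignments with different algorithms: each 0.8.2 high-level consumer decides partition ownership locally, so every instance in a group should be configured identically. A sketch (group id illustrative):

# identical in every consumer instance of the group
group.id=test
partition.assignment.strategy=roundrobin

As documented for 0.8.2, the roundrobin strategy also expects identical subscriptions (same topics, same stream counts) across all instances in the group.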
Re: createMessageStreams vs createMessageStreamsByFilter
The number of fetchers is configurable via num.replica.fetchers. The description of num.replica.fetchers in the Kafka documentation is not quite accurate: num.replica.fetchers actually controls the max number of fetchers per broker. In your case, num.replica.fetchers=8 with 5 brokers means no more than 8 fetchers are created for each broker.

On Fri, Mar 13, 2015 at 1:21 PM, Zakee kzak...@netzero.net wrote: Is it always the case that there is only one fetcher per broker? Won't setting num.replica.fetchers greater than the number of brokers cause more fetchers per broker? Let's say I have 5 brokers, and the number of replica fetchers is 8; will there be 2 fetcher threads pulling from each broker? Thanks, Zakee

On Mar 12, 2015, at 11:15 AM, James Cheng jch...@tivo.com wrote: Ah, I understand now. I didn't realize that there was one fetcher thread per broker. Thanks Tao, Guozhang! -James

On Mar 11, 2015, at 5:00 PM, tao xiao xiaotao...@gmail.com wrote: The fetcher thread is per broker; there is at least one fetcher thread per broker. A fetcher thread sends its broker a fetch request asking for all of its partitions. So if A, B, and C are on the same broker, the fetcher thread is still able to fetch data from B and C even though A returns no data. The same logic applies across different brokers.

On Thu, Mar 12, 2015 at 6:25 AM, James Cheng jch...@tivo.com wrote:

On Mar 11, 2015, at 9:12 AM, Guozhang Wang wangg...@gmail.com wrote: Hi James, What I meant before is that a single fetcher may be responsible for putting fetched data into multiple queues according to how the streams were set up, where each queue may be consumed by a different thread, and the queues are bounded. Now say there are two queues that are getting data from the same fetcher F and are consumed by two different user threads A and B. If thread A for some reason gets slow or hangs while consuming data from queue 1, then queue 1 will eventually fill up, and F will block trying to put more data into it. Since F is parked trying to put data into queue 1, queue 2 will not get more data from it, and thread B may hence get starved. Does that make sense now?

Yes, that makes sense. That is the scenario where one thread of a consumer can cause a backup in a queue, which would cause other threads to not receive data. What about the situation I described, where a thread consumes a queue that is supposed to be filled with messages from multiple partitions? If partition A has no messages and partitions B and C do, how will the fetcher behave? Will the processing thread receive messages from partitions B and C? Thanks, -James

On Tue, Mar 10, 2015 at 5:15 PM, James Cheng jch...@tivo.com wrote: Hi, Sorry to bring up this old thread, but my question is about this exact thing. Guozhang, you said: "A more concrete example: say you have topic AC: 3 partitions, topic BC: 6 partitions. With createMessageStreams(AC = 3, BC = 2) a total of 5 threads will be created, consuming AC-1, AC-2, AC-3, BC-1/2/3, BC-4/5/6 respectively. With createMessageStreamsByFilter(*C = 3) a total of 3 threads will be created, consuming AC-1/BC-1/BC-2, AC-2/BC-3/BC-4, AC-3/BC-5/BC-6 respectively." You said that in the createMessageStreamsByFilter case, if topic AC had no messages in it and consumer.timeout.ms = -1, then the 3 threads might all be blocked waiting for data to arrive from topic AC, and so messages from BC would not be processed. createMessageStreamsByFilter(*C = 1) (a single stream) would have the same problem, just worse.
Behind the scenes, is there a single thread that consumes (round-robin?) messages from the different partitions and inserts them all into a single queue for the application code to process? And is that why a single partition with no messages will block the other messages from getting through? What about createMessageStreams(AC = 1)? That creates a single stream that contains messages from multiple partitions, which might be on different brokers. Does that also suffer the same problem, where if one partition has no messages, the application would not receive messages from the other partitions? Thanks, -James

On Feb 11, 2015, at 8:13 AM, Guozhang Wang wangg...@gmail.com wrote: The new consumer will be released in 0.9, which is targeted for the end of this quarter.

On Tue, Feb 10, 2015 at 7:11 PM, tao xiao xiaotao...@gmail.com wrote: Do you know when the new consumer API will be publicly available?

On Wed, Feb 11, 2015 at 10:43 AM, Guozhang Wang wangg...@gmail.com wrote: Yes, it can get stuck. For example, if AC and BC are processed by two different processes and the AC processor gets stuck, AC messages will fill up in the consumer's buffer ...
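To make the two calls being contrasted concrete, a sketch against the 0.8 high-level Scala consumer API (connection settings hypothetical): createMessageStreams takes an explicit per-topic stream count, while createMessageStreamsByFilter shares one pool of streams, and hence one set of bounded queues, across every matched topic, which is why an idle partition can stall the others as described above.

import java.util.Properties
import kafka.consumer.{Consumer, ConsumerConfig, Whitelist}

object StreamsDemo extends App {
  val props = new Properties()
  props.put("zookeeper.connect", "localhost:2181") // hypothetical
  props.put("group.id", "streams-demo")            // hypothetical
  val connector = Consumer.create(new ConsumerConfig(props))

  // Explicit stream counts per topic: 3 streams for AC, 2 for BC,
  // each stream fed only by its own topic's partitions.
  val perTopic = connector.createMessageStreams(Map("AC" -> 3, "BC" -> 2))

  // One shared pool of 3 streams across every topic matching the filter;
  // partitions of AC and BC feed the same bounded queues.
  val shared = connector.createMessageStreamsByFilter(new Whitelist(".*C"), 3)
}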
Re: Does consumer support combination of whitelist and blacklist topic filtering
Something like dynamic filtering that can be updated at runtime, or "deny all but allow a certain set of topics" that cannot be specified easily by regex.

On Thu, Mar 12, 2015 at 9:06 PM, Guozhang Wang wangg...@gmail.com wrote: Hmm, what kind of customized filtering do you have in mind? I thought with --whitelist you could already specify a regex to do filtering.

On Thu, Mar 12, 2015 at 5:56 AM, tao xiao xiaotao...@gmail.com wrote: Hi Guozhang, I meant topicfilter, not topic-count; sorry for the confusion. What I want to achieve is to pass my own customized topicfilter to MM so that I can filter out whatever topics I like. I know MM doesn't support this now; I am just thinking about whether this would be a good feature to add.

On Thu, Mar 12, 2015 at 8:24 PM, Guozhang Wang wangg...@gmail.com wrote: Hi Tao, Sorry, I was mistaken before. Yes, in MM you can only directly specify --whitelist, --blacklist, and the number of streams you want to create via --num.streams, but you cannot set a specific topic-count. This is because MM is mainly used for cross-DC replication and hence will usually pipe all topics, or a majority of the topics, from the source cluster to the destination, so you usually do not care that some topics should get X streams while other topics should get Y streams. Guozhang

On Wed, Mar 11, 2015 at 11:59 PM, tao xiao xiaotao...@gmail.com wrote: The topic list is not specified in consumer.properties, and I don't think there is any property in the consumer config that allows us to specify what topics we want to consume. Can you point me to the property if there is one?

On Thu, Mar 12, 2015 at 12:14 AM, Guozhang Wang wangg...@gmail.com wrote: Tao, In MM people can pass in consumer configs, in which people can specify consumption topics, either in regular topic-list format or as a whitelist / blacklist. So I think it already does what you need? Guozhang

On Tue, Mar 10, 2015 at 10:09 PM, tao xiao xiaotao...@gmail.com wrote: Thank you guys for answering. I think it would be good if we could pass a customised topicCount (I think this is the interface whitelist and blacklist implement, if I am not mistaken) to MM to achieve a similar thing.

On Wednesday, March 11, 2015, Guozhang Wang wangg...@gmail.com wrote: Hi Tao, Unfortunately MM does not support whitelist / blacklist at the same time, and you have to choose either one upon initialization. As for your case, I think it can be captured by some regex that excludes nothing else but 10, but I do not know the exact expression. Guozhang

On Tue, Mar 10, 2015 at 7:58 AM, tao xiao xiaotao...@gmail.com wrote: I actually meant whether we can achieve this in mirror maker.

On Tue, Mar 10, 2015 at 10:52 PM, tao xiao xiaotao...@gmail.com wrote: Hi, I have a use case where I need to consume a list of topics whose names match the pattern topic.*, except for topic.10. Is there a way to combine the use of whitelist and blacklist so that I can achieve something like "accept all topics matching regex topic.* but exclude topic.10"?

-- Regards, Tao
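On the original question of consuming topic.* while excluding topic.10: the whitelist is a Java regular expression, so a negative lookahead may already express this in a single pattern (a sketch, not verified against every mirror maker version's whitelist handling):

--whitelist 'topic\.(?!10$).*'

The lookahead (?!10$) rejects exactly topic.10 while still matching topic.1, topic.100, and so on.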
Re: Does consumer support combination of whitelist and blacklist topic filtering
A little more context about my needs: I have a requirement to start/stop consumption of a topic at runtime based on an event sent to MM. At the moment I need to bounce the MM and find a way to exclude the topic from the whitelist, which is not an easy job with a regex. If I could pass in a combination of blacklist and whitelist, I could easily achieve this with something like --whitelist topic.* --blacklist topic.1

On Thu, Mar 12, 2015 at 9:10 PM, tao xiao xiaotao...@gmail.com wrote: ...

-- Regards, Tao
Re: Does consumer support combination of whitelist and blacklist topic filtering
Thank you Guozhang for your advice. A dynamic topic filter is what I need, so that I can stop consumption of a topic at runtime when I need to.

On Thu, Mar 12, 2015 at 9:21 PM, Guozhang Wang wangg...@gmail.com wrote: 1. Dynamic: yeah, that is something we could think of; this could be useful operationally. 2. Regex: I think in terms of expressiveness it should be sufficient for almost any subset of topics. In practice the rule of thumb is to create topics belonging to the same group with some prefix / suffix, so that the regex does not get crazily long. Guozhang

On Thu, Mar 12, 2015 at 6:10 AM, tao xiao xiaotao...@gmail.com wrote: ...

-- Regards, Tao
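As a sketch of the dynamic filter idea only: MirrorMaker does not expose a pluggable TopicFilter, so a class like the one below (all names invented) could not be wired in without code changes, but it illustrates the runtime-mutable deny set being discussed, e.g. driven by JMX operations.

import java.util.concurrent.ConcurrentHashMap

// Illustration of a runtime-mutable topic filter; not a MirrorMaker API.
class DynamicTopicFilter {
  private val denied = ConcurrentHashMap.newKeySet[String]()

  def deny(topic: String): Unit = denied.add(topic)     // e.g. invoked from a JMX operation
  def allow(topic: String): Unit = denied.remove(topic)

  def isTopicAllowed(topic: String): Boolean = !denied.contains(topic)
}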
Re: Does consumer support combination of whitelist and blacklist topic filtering
Yes, that will work. A message handler can filter out messages sent from certain topics.

On Fri, Mar 13, 2015 at 6:30 AM, Jiangjie Qin j...@linkedin.com.invalid wrote: Not sure if it is an option, but does filtering out topics with a message handler work for you? Are you going to resume consuming from a topic after you stop consuming from it? Jiangjie (Becket) Qin

On 3/12/15, 8:05 AM, tao xiao xiaotao...@gmail.com wrote: Yes, you are right; a dynamic topicfilter is more appropriate, where I can filter topics at runtime via some kind of interface, e.g. JMX.

On Thu, Mar 12, 2015 at 11:03 PM, Guozhang Wang wangg...@gmail.com wrote: Tao, Based on your description I think the combination of whitelist / blacklist will not achieve your goal, since it is still static. Guozhang

On Thu, Mar 12, 2015 at 6:30 AM, tao xiao xiaotao...@gmail.com wrote: ...
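A sketch of the handler-based filtering Jiangjie suggests, using a simplified stand-in rather than the real MirrorMaker handler interface (which at the time of this thread existed only in trunk and has since changed shape): returning no records drops the message, so nothing is produced to the destination for a denied topic. The next thread discusses whether offsets for dropped records are still committed.

// Simplified stand-in for a MirrorMaker-style message handler; names invented.
case class Record(topic: String, key: Array[Byte], value: Array[Byte])

class TopicDroppingHandler(isDenied: String => Boolean) {
  // An empty result drops the record before it reaches the producer.
  def handle(record: Record): Seq[Record] =
    if (isDenied(record.topic)) Seq.empty else Seq(record)
}

// Usage: a Set[String] is a String => Boolean, or plug in the
// DynamicTopicFilter sketched earlier via filter-based predicates.
// val handler = new TopicDroppingHandler(Set("topic.1"))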
Re: Does consumer support combination of whitelist and blacklist topic filtering
I am not sure how MM is going to be rewritten. Based on the current implementation in trunk, an offset is not committed unless the message has been produced to the destination. Assuming this logic remains, MM will not acknowledge the offset back to the source for a filtered message. So I think it is safe to filter messages out while keeping the committed offset unchanged for that particular topic. Please correct me if I am wrong. On Fri, Mar 13, 2015 at 1:12 PM, Guozhang Wang wangg...@gmail.com wrote: Note that with filtering in the message handler, records from the source cluster are still considered consumed, since their offsets will be committed. If you later change the filtering back to whitelist these topics, you will lose the data that was consumed during the period of the blacklist. Guozhang
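(A minimal sketch of the kind of filtering handler discussed above. The trait name and signature here are illustrative, modeled on the pluggable message handler MM gains with the KAFKA-1997 rewrite; the real interface on trunk may differ.)

import java.util.{Collections, List => JList}
import org.apache.kafka.clients.producer.ProducerRecord

// Hypothetical stand-in for MirrorMaker's pluggable message handler.
trait MessageHandler {
  def handle(topic: String, key: Array[Byte], value: Array[Byte]): JList[ProducerRecord[Array[Byte], Array[Byte]]]
}

// Drops records from any denied topic and forwards everything else
// unchanged. Per the caveat in this thread: dropped records still have
// their offsets committed, so re-enabling a topic later will not replay
// the records skipped in between.
class TopicFilteringHandler(denied: Set[String]) extends MessageHandler {
  override def handle(topic: String, key: Array[Byte], value: Array[Byte]): JList[ProducerRecord[Array[Byte], Array[Byte]]] =
    if (denied.contains(topic))
      Collections.emptyList[ProducerRecord[Array[Byte], Array[Byte]]]()
    else
      Collections.singletonList(new ProducerRecord[Array[Byte], Array[Byte]](topic, key, value))
}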
Re: Does consumer support combination of whitelist and blacklist topic filtering
Yes, you are right. A dynamic topic filter is more appropriate, where I can filter topics at runtime via some kind of interface, e.g. JMX. On Thu, Mar 12, 2015 at 11:03 PM, Guozhang Wang wangg...@gmail.com wrote: Tao, Based on your description I think the combination of whitelist / blacklist will not achieve your goal, since it is still static. Guozhang On Thu, Mar 12, 2015 at 6:30 AM, tao xiao xiaotao...@gmail.com wrote: Thank you Guozhang for your advice. A dynamic topic filter is what I need, so that I can stop consumption of a topic at runtime when I need to. On Thu, Mar 12, 2015 at 9:21 PM, Guozhang Wang wangg...@gmail.com wrote: 1. Dynamic: yeah, that is something we could think of; it could be useful operationally. 2. Regex: I think in terms of expressiveness it should be sufficient for almost any subset of topics. In practice the rule of thumb is to create topics that belong to the same group with some prefix / suffix, so that the regex does not get crazily long. Guozhang On Thu, Mar 12, 2015 at 6:10 AM, tao xiao xiaotao...@gmail.com wrote: Something like dynamic filtering that can be updated at runtime, or deny-all-but-allow for a set of topics that cannot easily be specified by regex. On Thu, Mar 12, 2015 at 9:06 PM, Guozhang Wang wangg...@gmail.com wrote: Hmm, what kind of customized filtering do you have in mind? I thought with --whitelist you could already specify a regex to do filtering. -- Regards, Tao
Re: Does consumer support combination of whitelist and blacklist topic filtering
Hi Guozhang, I meant topicfilter, not topic-count. Sorry for the confusion. What I want to achieve is to pass my own customized topicfilter to MM so that I can filter out whatever topics I like. I know MM doesn't support this now; I am just wondering whether this would be a good feature to add. On Thu, Mar 12, 2015 at 8:24 PM, Guozhang Wang wangg...@gmail.com wrote: Hi Tao, Sorry, I was mistaken before. Yes, in MM you can only directly specify --whitelist, --blacklist, and the number of streams you want to create via --num.streams, but you cannot set a specific topic-count. This is because MM is mainly used for cross-DC replication and hence usually pipes all, or a majority of, the topics from the source cluster to the destination, so you usually do not care that some topics should get X streams while other topics get Y streams. Guozhang -- Regards, Tao
Re: Does consumer support combination of whitelist and blacklist topic filtering
The topic list is not specified in consumer.properties, and I don't think there is any property in the consumer config that lets us specify which topics to consume. Can you point me to the property if there is one? On Thu, Mar 12, 2015 at 12:14 AM, Guozhang Wang wangg...@gmail.com wrote: Tao, In MM people can pass in consumer configs, in which people can specify consumption topics, either in regular topic-list format or as a whitelist / blacklist. So I think it already does what you need? Guozhang On Tue, Mar 10, 2015 at 10:09 PM, tao xiao xiaotao...@gmail.com wrote: Thank you guys for answering. I think it would be good if we could pass a customised topicCount (I think this is the interface whitelist and blacklist implement, if I am not mistaken) to MM to achieve a similar thing. On Wednesday, March 11, 2015, Guozhang Wang wangg...@gmail.com wrote: Hi Tao, Unfortunately MM does not support whitelist / blacklist at the same time; you have to choose one upon initialization. As for your case, I think it can be captured by some regex that excludes nothing but 10, but I do not know the exact expression. Guozhang -- Regards, Tao
Re: Out of Disk Space - Infinite loop
Did you stop mirror maker? On Thu, Mar 12, 2015 at 8:27 AM, Saladi Naidu naidusp2...@yahoo.com.invalid wrote: We have 3 DCs and created a 5-node Kafka cluster in each DC, connecting the 3 DCs with Mirror Maker for replication. We were conducting performance testing, using the Kafka Producer Performance tool to load 100 million rows into 7 topics. We expected the data to be loaded evenly across the 7 topics, but 4 topics got loaded with ~2 million messages and the remaining 3 topics with 90 million messages. The nodes that were leaders of those 3 topics ran out of disk space and went down. We tried to bring these 2 nodes back by doing the following: 1. Stopped the Kafka service 2. Deleted a couple of topics that were taking up too much space, i.e. /var/kafka/logs/{topic$}/, after which the file system showed 47% available 3. Brought the Kafka nodes back. As soon as the nodes were back, we observed the file system growing, and in 15 minutes the mount point became full again. The deleted topics got recreated and are taking up space again. kafka.log shows many of the following messages, and ultimately the node goes down. We don't need to recover data now; we would just like to bring the nodes back. What are the steps to do that?
[2015-03-11 20:52:36,323] INFO Rolled new log segment for 'dc2-perf-topic5-0' in 3 ms. (kafka.log.Log)
[2015-03-11 15:58:07,321] INFO [Kafka Server 1021124614], started (kafka.server.KafkaServer)
[2015-03-11 15:58:07,882] INFO Completed load of log dc2-perf-topic5-0 with log end offset 0 (kafka.log.Log)
[2015-03-11 15:58:07,900] INFO Created log for partition [dc2-perf-topic5,0] in /var/kafka/log with properties {segment.index.bytes -> 10485760, file.delete.delay.ms -> 6, segment.bytes -> 1073741824, flush.ms -> 9223372036854775807, delete.retention.ms -> 360, index.interval.bytes -> 4096, retention.bytes -> -1, cleanup.policy -> delete, segment.ms -> 60480, max.message.bytes -> 112, flush.messages -> 9223372036854775807, min.cleanable.dirty.ratio -> 0.5, retention.ms -> 60480}. (kafka.log.LogManager)
[2015-03-11 15:58:07,914] INFO Completed load of log dc2-perf-topic2-0 with log end offset 0 (kafka.log.Log)
[2015-03-11 15:58:07,916] INFO Created log for partition [dc2-perf-topic2,0] in /var/kafka/log with properties {segment.index.bytes -> 10485760, file.delete.delay.ms -> 6, segment.bytes -> 1073741824, flush.ms -> 9223372036854775807, delete.retention.ms -> 360, index.interval.bytes -> 4096, retention.bytes -> -1, cleanup.policy -> delete, segment.ms -> 60480, max.message.bytes -> 112, flush.messages -> 9223372036854775807, min.cleanable.dirty.ratio -> 0.5, retention.ms -> 60480}. (kafka.log.LogManager)
[2015-03-11 15:58:07,935] INFO Completed load of log dc2-perf-topic9-0 with log end offset 0 (kafka.log.Log)
SP Naidu -- Regards, Tao
Re: createMessageStreams vs createMessageStreamsByFilter
Fetcher threads are created on a per-broker basis; there is at least one fetcher thread per broker. A fetcher thread sends the broker a single fetch request asking for all of its partitions. So if A, B, and C are on the same broker, the fetcher thread is still able to fetch data from B and C even if A returns no data, and the same logic applies across different brokers. On Thu, Mar 12, 2015 at 6:25 AM, James Cheng jch...@tivo.com wrote: On Mar 11, 2015, at 9:12 AM, Guozhang Wang wangg...@gmail.com wrote: Hi James, What I meant before is that a single fetcher may be responsible for putting fetched data into multiple queues according to how the streams were set up, where each queue may be consumed by a different thread. And the queues are bounded. Now say there are two queues that get data from the same fetcher F and are consumed by two different user threads A and B. If thread A for some reason gets slow or hangs consuming from queue 1, then queue 1 will eventually fill up, and F will block trying to put more data into it. Since F is parked trying to put data into queue 1, queue 2 will not get more data from it, and thread B may hence get starved. Does that make sense now? Yes, that makes sense. That is the scenario where one thread of a consumer can cause a backup in the queue, which would cause other threads to not receive data. What about the situation I described, where a thread consumes a queue that is supposed to be filled with messages from multiple partitions? If partition A has no messages and partitions B and C do, how will the fetcher behave? Will the processing thread receive messages from partitions B and C? Thanks, -James -- Regards, Tao
Re: createMessageStreams vs createMessageStreamsByFilter
consumer.timeout.ms only affects how the stream reads data from the internal chunk queue used to buffer received data. The actual data fetching is done by a separate fetcher thread, kafka.consumer.ConsumerFetcherThread. The fetcher thread keeps reading data from the broker and putting it into the queue, and the stream keeps polling the queue and passing data back to the consumer, if there is any. So for a case like createMessageStreams(AC = 1), the same stream (which means the same chunk queue) is shared by multiple partitions of topic AC. If one of the partitions has no data, the consumer is still able to read data from the other partitions, as the fetcher thread keeps feeding data from them into the queue. The only situation where the consumer will get stuck is when the fetcher thread is blocked by the network, e.g. high network latency between consumer and broker, or no data from the broker; this is because the fetcher thread is implemented using blocking I/O. On Wed, Mar 11, 2015 at 8:15 AM, James Cheng jch...@tivo.com wrote: Hi, Sorry to bring up this old thread, but my question is about this exact thing: Guozhang, you said: A more concrete example: say you have topic AC: 3 partitions, topic BC: 6 partitions. With createMessageStreams(AC = 3, BC = 2) a total of 5 threads will be created, consuming AC-1, AC-2, AC-3, BC-1/2/3, BC-4/5/6 respectively; with createMessageStreamsByFilter(*C = 3) a total of 3 threads will be created, consuming AC-1/BC-1/BC-2, AC-2/BC-3/BC-4, AC-3/BC-5/BC-6 respectively. You said that in the createMessageStreamsByFilter case, if topic AC had no messages in it and consumer.timeout.ms = -1, then the 3 threads might all be blocked waiting for data to arrive from topic AC, and so messages from BC would not be processed. createMessageStreamsByFilter(*C = 1) (a single stream) would have the same problem, just worse. Behind the scenes, is there a single thread that consumes (round-robin?) messages from the different partitions and inserts them all into a single queue for the application code to process? And is that why a single partition with no messages would block the other messages from getting through? What about createMessageStreams(AC = 1)? That creates a single stream that contains messages from multiple partitions, which might be on different brokers. Does that also suffer the same problem, where if one partition has no messages, the application would not receive messages from the other partitions? Thanks, -James On Feb 11, 2015, at 8:13 AM, Guozhang Wang wangg...@gmail.com wrote: The new consumer will be released in 0.9, which is targeted for the end of this quarter. On Tue, Feb 10, 2015 at 7:11 PM, tao xiao xiaotao...@gmail.com wrote: Do you know when the new consumer API will be publicly available? On Wed, Feb 11, 2015 at 10:43 AM, Guozhang Wang wangg...@gmail.com wrote: Yes, it can get stuck. For example, if AC and BC are processed by two different processes and the AC processors get stuck, AC messages will fill up the consumer's buffer and eventually prevent the fetcher thread from putting more data into it; the fetcher thread will block on that and not be able to fetch BC. This issue has been addressed in the new consumer client, which is single-threaded with non-blocking APIs. Guozhang On Tue, Feb 10, 2015 at 6:24 PM, tao xiao xiaotao...@gmail.com wrote: Thank you Guozhang for your detailed explanation. In your example createMessageStreamsByFilter(*C = 3), since threads are shared among topics, there may be a situation where all 3 threads get stuck on topic AC, e.g. the topic is empty, which holds up the connecting threads (with consumer.timeout.ms = -1), so there is no thread left to serve topic BC. Do you think this situation can happen? On Wed, Feb 11, 2015 at 2:15 AM, Guozhang Wang wangg...@gmail.com wrote: I was not clear before: for createMessageStreamsByFilter each matched topic will have num-threads, but shared; i.e. there will be num-threads in total, and each thread will be responsible for fetching all matched topics. A more concrete example: say you have topic AC: 3 partitions, topic BC: 6 partitions. With createMessageStreams(AC = 3, BC = 2) a total of 5 threads will be created, consuming AC-1, AC-2, AC-3, BC-1/2/3, BC-4/5/6 respectively; with createMessageStreamsByFilter(*C = 3) a total of 3 threads will be created, consuming AC-1/BC-1/BC-2, AC-2/BC-3/BC-4, AC-3/BC-5/BC-6 respectively. Guozhang On Tue, Feb 10, 2015 at 8:37 AM, tao xiao xiaotao...@gmail.com wrote: Guozhang, Do you mean that each regex-matched topic owns the number of threads passed in to createMessageStreamsByFilter? For example, in the code below, if I have 3 matched topics, each with 2 partitions, should I end up with 3 * 2 = 6 threads in total, with each topic owning 2 threads?
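(To make the consumer.timeout.ms behavior above concrete, here is a minimal sketch against the old high-level consumer; the ZooKeeper address, group id, and topic name are placeholders. With consumer.timeout.ms = -1 the iterator blocks indefinitely; with a positive value, hasNext() throws ConsumerTimeoutException once the chunk queue stays empty for that long.)

import java.util.Properties
import kafka.consumer.{Consumer, ConsumerConfig, ConsumerTimeoutException}

val props = new Properties()
props.put("zookeeper.connect", "localhost:2181") // placeholder
props.put("group.id", "stream-demo")             // placeholder
props.put("consumer.timeout.ms", "5000")         // block at most 5s per poll

val connector = Consumer.create(new ConsumerConfig(props))
// One stream backed by one internal chunk queue, shared by all
// partitions of topic AC; the fetcher threads fill the queue.
val stream = connector.createMessageStreams(Map("AC" -> 1))("AC").head
val it = stream.iterator()
try {
  while (it.hasNext()) {
    val msg = it.next()
    println(s"partition=${msg.partition} offset=${msg.offset}")
  }
} catch {
  case _: ConsumerTimeoutException => // no data arrived within the timeout
}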
Re: How replicas catch up the leader
I ended up running kafka-reassign-partitions.sh to reassign the partitions to different nodes. On Tue, Mar 10, 2015 at 11:31 AM, sy.pan shengyi@gmail.com wrote: Hi, tao xiao and Jiangjie Qin, I ran into the same issue; my node had just recovered from a high-load problem (caused by another application). This is what kafka-topics shows:
Topic:ad_click_sts PartitionCount:6 ReplicationFactor:2 Configs:
Topic: ad_click_sts Partition: 0 Leader: 1 Replicas: 1,0 Isr: 1
Topic: ad_click_sts Partition: 1 Leader: 0 Replicas: 0,1 Isr: 0
Topic: ad_click_sts Partition: 2 Leader: 1 Replicas: 1,0 Isr: 1
Topic: ad_click_sts Partition: 3 Leader: 0 Replicas: 0,1 Isr: 0
Topic: ad_click_sts Partition: 4 Leader: 1 Replicas: 1,0 Isr: 1
Topic: ad_click_sts Partition: 5 Leader: 0 Replicas: 0,1 Isr: 0
ReplicaFetcherThread info extracted from the Kafka server.log:
[2015-03-09 21:06:05,450] ERROR [ReplicaFetcherThread-0-0], Error in fetch Name: FetchRequest; Version: 0; CorrelationId: 7331; ClientId: ReplicaFetcherThread-0-0; ReplicaId: 1; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo: [ad_click_sts,5] -> PartitionFetchInfo(6149699,1048576), [ad_click_sts,3] -> PartitionFetchInfo(6147835,1048576), [ad_click_sts,1] -> PartitionFetchInfo(6235071,1048576) (kafka.server.ReplicaFetcherThread)
java.net.SocketTimeoutException
at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:201)
at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:86)
...
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
[2015-03-09 21:06:05,450] WARN Reconnect due to socket error: null (kafka.consumer.SimpleConsumer)
[2015-03-09 21:05:57,116] INFO Partition [ad_click_sts,4] on broker 1: Cached zkVersion [556] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
[2015-03-09 21:06:05,772] INFO Partition [ad_click_sts,2] on broker 1: Shrinking ISR for partition [ad_click_sts,2] from 1,0 to 1 (kafka.cluster.Partition)
How do I fix this ISR problem? Is there some command that can be run? Regards, sy.pan -- Regards, Tao
Does consumer support combination of whitelist and blacklist topic filtering
Hi, I have a use case where I need to consume a list of topics whose names match the pattern topic.*, except for one, topic.10. Is there a way to combine the use of whitelist and blacklist so that I can achieve something like accept all topics matching the regex topic.* but exclude topic.10? -- Regards, Tao
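(For what it's worth, the old consumer's topic filters compile down to java.util.regex patterns, so a negative lookahead can express this in a whitelist alone. A minimal sketch; the ZooKeeper address and group id are placeholders, and the lookahead trick is an assumption about what the filter regex will accept rather than a documented feature.)

import java.util.Properties
import kafka.consumer.{Consumer, ConsumerConfig, Whitelist}

val props = new Properties()
props.put("zookeeper.connect", "localhost:2181") // placeholder
props.put("group.id", "topic-filter-demo")       // placeholder
val connector = Consumer.create(new ConsumerConfig(props))

// (?!10$) is a negative lookahead: match every topic.<suffix> except
// the literal topic.10 (topic.100 etc. still match).
val streams = connector.createMessageStreamsByFilter(new Whitelist("""topic\.(?!10$).*"""), 1)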
Re: Batching at the socket layer
org.apache.kafka.clients.producer.Producer is the new-API producer. On Tue, Mar 10, 2015 at 11:22 PM, Corey Nolet cjno...@gmail.com wrote: Thanks Jiangjie! So what version is considered the new API? Is that the javaapi in version 0.8.2? On Mon, Mar 9, 2015 at 2:29 PM, Jiangjie Qin j...@linkedin.com.invalid wrote: The stickiness of partitions only applies to the old producer. In the new producer we have round robin for each message. Batching in the new producer is per topic partition; the batch size is controlled by both the max batch size and the linger time config. Jiangjie (Becket) Qin On 3/9/15, 10:10 AM, Corey Nolet cjno...@gmail.com wrote: I'm curious what type of batching Kafka producers do at the socket layer. For instance, if I have a partitioner that round-robins n messages to different partitions, am I guaranteed to get n different messages sent over the socket, or is there some micro-batching going on underneath? I am trying to understand the semantics of the default partitioner and why it sticks to partitions for 10 minutes. If I were to lower that interval to 1 sec, would I achieve better batching than if I were to completely round-robin each message to a different partition? -- Regards, Tao
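(A minimal sketch of the two per-partition batching knobs Jiangjie mentions, using the new producer API; the broker address and topic name are placeholders. batch.size and linger.ms are the standard new-producer configs: a batch is sent when it reaches batch.size bytes or has waited linger.ms milliseconds, whichever comes first.)

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092") // placeholder
props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
// Per topic-partition batching: send when a batch reaches 16 KB or has
// waited 5 ms, whichever happens first.
props.put("batch.size", "16384")
props.put("linger.ms", "5")

val producer = new KafkaProducer[Array[Byte], Array[Byte]](props)
producer.send(new ProducerRecord[Array[Byte], Array[Byte]]("test", "hello".getBytes("UTF-8")))
producer.close()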
Re: Does consumer support combination of whitelist and blacklist topic filtering
I actually meant: can we achieve this in mirror maker? On Tue, Mar 10, 2015 at 10:52 PM, tao xiao xiaotao...@gmail.com wrote: Hi, I have a use case where I need to consume a list of topics whose names match the pattern topic.*, except for one, topic.10. Is there a way to combine the use of whitelist and blacklist so that I can achieve something like accept all topics matching the regex topic.* but exclude topic.10? -- Regards, Tao
Topics are not evenly distributed to streams using Range partition assignment
Hi, I created a message stream in my consumer using connector.createMessageStreamsByFilter(new Whitelist("mm-benchmark-test\\w*"), 5); I have 5 topics in my cluster and each topic has only one partition. My understanding of wildcard streams is that the streams are shared between the selected topics; in my case 5 streams should be shared between 5 different topics. But when I looked at the log it showed a different story:
2015-03-09 19:02:36 INFO kafka.utils.Logging$class:68 - [test12345667_LM-SHC-00950667-1425898953590-d99e2d75], test12345667_LM-SHC-00950667-1425898953590-d99e2d75-0 successfully owned partition 0 for topic mm-benchmark-test2
2015-03-09 19:02:36 INFO kafka.utils.Logging$class:68 - [test12345667_LM-SHC-00950667-1425898953590-d99e2d75], test12345667_LM-SHC-00950667-1425898953590-d99e2d75-0 successfully owned partition 0 for topic mm-benchmark-test
2015-03-09 19:02:36 INFO kafka.utils.Logging$class:68 - [test12345667_LM-SHC-00950667-1425898953590-d99e2d75], test12345667_LM-SHC-00950667-1425898953590-d99e2d75-0 successfully owned partition 0 for topic mm-benchmark-test1
2015-03-09 19:02:36 INFO kafka.utils.Logging$class:68 - [test12345667_LM-SHC-00950667-1425898953590-d99e2d75], test12345667_LM-SHC-00950667-1425898953590-d99e2d75-0 successfully owned partition 0 for topic mm-benchmark-test4
2015-03-09 19:02:36 INFO kafka.utils.Logging$class:68 - [test12345667_LM-SHC-00950667-1425898953590-d99e2d75], test12345667_LM-SHC-00950667-1425898953590-d99e2d75-0 successfully owned partition 0 for topic mm-benchmark-test3
As the log indicates, only one stream was assigned to all the topics. Is this expected behavior? If yes, how do we evenly distribute topics across different streams, e.g. by using the roundrobin assigner? -- Regards, Tao
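(A sketch of the roundrobin route, assuming the partition.assignment.strategy consumer config available in 0.8.2, which defaults to range; connection details are placeholders. With range, each one-partition topic lands on the first stream, which matches the log above; roundrobin instead deals topic-partitions out across all streams.)

import java.util.Properties
import kafka.consumer.{Consumer, ConsumerConfig, Whitelist}

val props = new Properties()
props.put("zookeeper.connect", "localhost:2181") // placeholder
props.put("group.id", "mm-benchmark-group")      // placeholder
// Default is "range", which assigns partition 0 of every one-partition
// topic to the first stream; "roundrobin" spreads the topic-partitions
// across all streams in the group.
props.put("partition.assignment.strategy", "roundrobin")

val connector = Consumer.create(new ConsumerConfig(props))
val streams = connector.createMessageStreamsByFilter(new Whitelist("mm-benchmark-test\\w*"), 5)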
Re: kafka mirroring ...!
I don't think you can mirror messages to a different topic name in the current mirror maker implementation. Mirror maker sends each message to a destination topic with the same name as the topic it was read from at the source. On Mon, Mar 9, 2015 at 5:00 PM, sunil kalva sambarc...@gmail.com wrote: Can I configure a different topic name in the destination cluster, i.e. can I have different topic names for the source and destination clusters for mirroring? If yes, how can I map a source topic to a destination topic name? SunilKalva -- Regards, Tao
Re: kafka mirroring ...!
Ctrl+C is a clean shutdown; kill -9 is not. On Mon, Mar 9, 2015 at 2:32 AM, Alex Melville amelvi...@g.hmc.edu wrote: What does a clean shutdown of the MM entail? So far I've just been using Ctrl + C to send an interrupt to kill it. Alex On Sat, Mar 7, 2015 at 10:59 PM, Jiangjie Qin j...@linkedin.com.invalid wrote: If auto.offset.reset is set to smallest, it does not mean the consumer will always consume from the smallest offset. It means that if no previous offset commit is found for this consumer group, then it will consume from the smallest. So for mirror maker, you probably want to always use the same consumer group id. This can be configured in the consumer config file you pass into mirror maker. Another thing about duplicate messages: if mirror maker is shut down cleanly, then the next time you start it with the same consumer group id there should be no duplicates. But if mirror maker shuts down uncleanly (e.g. by a kill -9), then the next time it starts up you might still see duplicate messages after the last committed offsets. Jiangjie (Becket) Qin On 3/7/15, 11:45 PM, sunil kalva sambarc...@gmail.com wrote: Qin, The partition problem is solved by passing the --new.producer true option on the command line, but after adding the auto.offset.reset=smallest config, every time I restart the mirror tool it copies from the beginning, which ends up creating lots of duplicate messages in the destination cluster. Could you please tell me how to configure it so that the destination cluster is always in sync with the source cluster? SunilKalva On Sun, Mar 8, 2015 at 12:54 AM, Jiangjie Qin j...@linkedin.com.invalid wrote: For data not showing up, you need to make sure the mirror maker consumer's auto.offset.reset is set to smallest; otherwise, when you run mirror maker for the first time, the pre-existing messages won't be consumed. For partition sticking, can you verify whether your messages are keyed messages? If they are not keyed, can you check whether you are using the old producer or the new producer? For the old producer, the default behavior is to stick to one partition for 10 min and then move to the next partition. So if you wait for more than 10 min, you should see messages in two different partitions. Jiangjie (Becket) Qin On 3/7/15, 8:28 AM, sunil kalva sambarc...@gmail.com wrote: And I also observed that all the data is moving to one partition in the destination cluster, though I have multiple partitions for that topic in the source and destination clusters. SunilKalva On Sat, Mar 7, 2015 at 9:54 PM, sunil kalva sambarc...@gmail.com wrote: I ran the kafka mirroring tool after producing data in the source cluster, and this data is not copied to the destination cluster. If I produce data after running the tool, that data is copied to the destination cluster. Am I missing something? -- SunilKalva -- Regards, Tao
Re: Got java.util.IllegalFormatConversionException when running MirrorMaker off trunk code
Actually I was going to report another bug that was caused exactly by the UncheckedOffsets.removeOffset issue (offsets being removed before they are added). As the project I am working on relies heavily on the functionality MM offers, it would be good if you could put the fix on trunk, or give me some advice on how to fix the synchronization issue. BTW, can the synchronization issue be fixed by adding the unacked offset to the offset list before calling producer.send? On Sat, Mar 7, 2015 at 4:48 PM, Jiangjie Qin j...@linkedin.com.invalid wrote: Hi Tao, Thanks a lot for finding the bug. We are actually rewriting the mirror maker in KAFKA-1997 with a much simplified solution, using the newly added flush() call in the new Java producer. Mirror maker in current trunk is also missing one necessary synchronization: UncheckedOffsets.removeOffset is not synchronized. I am hesitating whether to fix those problems in current trunk or just wait for KAFKA-1997 to be checked in. If you have a strong opinion about this, we can probably fix those 2 issues in trunk. It should be a small patch, but I just don't want people to get distracted. Jiangjie (Becket) Qin -- Regards, Tao
How does num.consumer.fetchers get used
Hi team, After reading the source code of AbstractFetcherManager I found that the usage of num.consumer.fetchers may not match what is described in the Kafka docs. My interpretation of the docs is that the number of fetcher threads is controlled by the value of the property num.consumer.fetchers: if I set num.consumer.fetchers=4, there are 4 fetcher threads in total created after the consumer is initialized. But what I found in the source code tells me something different. The code below is copied from AbstractFetcherManager:

private def getFetcherId(topic: String, partitionId: Int): Int = {
  Utils.abs(31 * topic.hashCode() + partitionId) % numFetchers
}

def addFetcherForPartitions(partitionAndOffsets: Map[TopicAndPartition, BrokerAndInitialOffset]) {
  mapLock synchronized {
    val partitionsPerFetcher = partitionAndOffsets.groupBy { case (topicAndPartition, brokerAndInitialOffset) =>
      BrokerAndFetcherId(brokerAndInitialOffset.broker,
        getFetcherId(topicAndPartition.topic, topicAndPartition.partition))
    }
    for ((brokerAndFetcherId, partitionAndOffsets) <- partitionsPerFetcher) {
      var fetcherThread: AbstractFetcherThread = null
      fetcherThreadMap.get(brokerAndFetcherId) match {
        case Some(f) => fetcherThread = f
        case None =>
          fetcherThread = createFetcherThread(brokerAndFetcherId.fetcherId, brokerAndFetcherId.broker)
          fetcherThreadMap.put(brokerAndFetcherId, fetcherThread)
          fetcherThread.start
      }
      fetcherThreadMap(brokerAndFetcherId).addPartitions(partitionAndOffsets.map {
        case (topicAndPartition, brokerAndInitOffset) =>
          topicAndPartition -> brokerAndInitOffset.initOffset
      })
    }
  }
}

If I have one topic with one partition and num.consumer.fetchers set to 4, there is actually only one fetcher thread created, not 4. num.consumer.fetchers essentially sets the maximum number of fetcher threads, not the actual number. The actual number of fetcher threads is controlled by this line of code: Utils.abs(31 * topic.hashCode() + partitionId) % numFetchers. Is my assumption correct? -- Regards, Tao
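(A quick way to sanity-check that reading is to replicate the hashing locally; math.abs stands in for kafka.utils.Utils.abs here, and the topic name is made up.)

// Same mapping as getFetcherId above.
def fetcherId(topic: String, partitionId: Int, numFetchers: Int): Int =
  math.abs(31 * topic.hashCode() + partitionId) % numFetchers

// One topic, one partition, num.consumer.fetchers = 4: the lone
// (topic, partition) pair maps to a single fetcher id, so only one
// fetcher thread is created per source broker.
println(fetcherId("mytopic", 0, 4))

// With, say, 8 partitions the ids spread over up to 4 distinct fetchers:
println((0 until 8).map(fetcherId("mytopic", _, 4)).distinct.sorted)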
Got java.util.IllegalFormatConversionException when running MirrorMaker off trunk code
Hi team, I am having java.util.IllegalFormatConversionException when running MirrorMaker with log level set to trace. The code is off latest trunk, commit 8f0003f9b694b4da5fbd2f86db872d77a43eb63f. The way I bring it up is:
bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config ~/Downloads/kafka/kafka_2.10-0.8.2.0/config/consumer.properties --producer.config ~/Downloads/kafka/kafka_2.10-0.8.2.0/config/producer.properties --num.streams 1 --num.producers 1 --no.data.loss --whitelist mm-benchmark-test\\w* --offset.commit.interval.ms 1 --queue.byte.size 1024
and I set the log level to trace in tools-log4j.properties. Here is the log snippet:
[2015-03-07 02:04:27,211] TRACE [mirrormaker-producer-0] Sending message with value size 13 (kafka.tools.MirrorMaker$ProducerThread)
[2015-03-07 02:04:27,211] TRACE Sending record ProducerRecord(topic=mm-benchmark-test, partition=null, key=[B@130362d0, value=[B@434c4f70 with callback kafka.tools.MirrorMaker$MirrorMakerProducerCallback@46f36494 to topic mm-benchmark-test partition 0 (org.apache.kafka.clients.producer.KafkaProducer)
[2015-03-07 02:04:27,212] TRACE [mirrormaker-producer-0] Sending message with value size 13 (kafka.tools.MirrorMaker$ProducerThread)
[2015-03-07 02:04:27,212] TRACE Sending record ProducerRecord(topic=mm-benchmark-test, partition=null, key=[B@54957b67, value=[B@21d8d293 with callback kafka.tools.MirrorMaker$MirrorMakerProducerCallback@21e8c241 to topic mm-benchmark-test partition 0 (org.apache.kafka.clients.producer.KafkaProducer)
[2015-03-07 02:04:27,212] TRACE [mirrormaker-producer-0] Sending message with value size 13 (kafka.tools.MirrorMaker$ProducerThread)
[2015-03-07 02:04:27,212] TRACE Sending record ProducerRecord(topic=mm-benchmark-test, partition=null, key=[B@1eed723b, value=[B@1acd590b with callback kafka.tools.MirrorMaker$MirrorMakerProducerCallback@1f90eeec to topic mm-benchmark-test partition 0 (org.apache.kafka.clients.producer.KafkaProducer)
[2015-03-07 02:04:27,212] TRACE [mirrormaker-producer-0] Sending message with value size 13 (kafka.tools.MirrorMaker$ProducerThread)
[2015-03-07 02:04:27,212] TRACE Sending record ProducerRecord(topic=mm-benchmark-test, partition=null, key=[B@3ae8a936, value=[B@bd3671 with callback kafka.tools.MirrorMaker$MirrorMakerProducerCallback@6413518 to topic mm-benchmark-test partition 0 (org.apache.kafka.clients.producer.KafkaProducer)
[2015-03-07 02:04:27,212] ERROR Error executing user-provided callback on message for topic-partition mm-benchmark-test-0: (org.apache.kafka.clients.producer.internals.RecordBatch)
java.util.IllegalFormatConversionException: d != kafka.tools.MirrorMaker$UnackedOffset
at java.util.Formatter$FormatSpecifier.failConversion(Formatter.java:4045)
at java.util.Formatter$FormatSpecifier.printInteger(Formatter.java:2748)
at java.util.Formatter$FormatSpecifier.print(Formatter.java:2702)
at java.util.Formatter.format(Formatter.java:2488)
at java.util.Formatter.format(Formatter.java:2423)
at java.lang.String.format(String.java:2790)
at scala.collection.immutable.StringLike$class.format(StringLike.scala:266)
at scala.collection.immutable.StringOps.format(StringOps.scala:31)
at kafka.tools.MirrorMaker$MirrorMakerProducerCallback$$anonfun$onCompletion$2.apply(MirrorMaker.scala:592)
at kafka.tools.MirrorMaker$MirrorMakerProducerCallback$$anonfun$onCompletion$2.apply(MirrorMaker.scala:592)
at kafka.utils.Logging$class.trace(Logging.scala:36)
at kafka.tools.MirrorMaker$.trace(MirrorMaker.scala:57)
at kafka.tools.MirrorMaker$MirrorMakerProducerCallback.onCompletion(MirrorMaker.scala:592)
at org.apache.kafka.clients.producer.internals.RecordBatch.done(RecordBatch.java:91)
at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:267)
at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:235)
at org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:55)
at org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:312)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:225)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:199)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:124)
at java.lang.Thread.run(Thread.java:745)
[2015-03-07 02:04:27,212] TRACE [mirrormaker-producer-0] Sending message with value size 13 (kafka.tools.MirrorMaker$ProducerThread)
-- Regards, Tao
Re: Got java.util.IllegalFormatConversionException when running MirrorMaker off trunk code
A bit more context: I turned on async in producer.properties. -- Regards, Tao
Re: Got java.util.IllegalFormatConversionException when running MirrorMaker off trunk code
I think I worked out the root cause. Line 593 in MirrorMaker.scala,

trace("Updating offset for %s to %d".format(topicPartition, offset))

should be

trace("Updating offset for %s to %d".format(topicPartition, offset.element))

On Sat, Mar 7, 2015 at 2:12 AM, tao xiao xiaotao...@gmail.com wrote: A bit more context: I turned on async in producer.properties On Sat, Mar 7, 2015 at 2:09 AM, tao xiao xiaotao...@gmail.com wrote: Hi team, I am hitting java.util.IllegalFormatConversionException when running MirrorMaker with the log level set to trace. The code is off the latest trunk, commit 8f0003f9b694b4da5fbd2f86db872d77a43eb63f. The way I bring it up is bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config ~/Downloads/kafka/kafka_2.10-0.8.2.0/config/consumer.properties --producer.config ~/Downloads/kafka/kafka_2.10-0.8.2.0/config/producer.properties --num.streams 1 --num.producers 1 --no.data.loss --whitelist mm-benchmark-test\\w* --offset.commit.interval.ms 1 --queue.byte.size 1024, with the log level set to trace in tools-log4j.properties. Here is the log snippet:

[2015-03-07 02:04:27,211] TRACE [mirrormaker-producer-0] Sending message with value size 13 (kafka.tools.MirrorMaker$ProducerThread)
[2015-03-07 02:04:27,211] TRACE Sending record ProducerRecord(topic=mm-benchmark-test, partition=null, key=[B@130362d0, value=[B@434c4f70 with callback kafka.tools.MirrorMaker$MirrorMakerProducerCallback@46f36494 to topic mm-benchmark-test partition 0 (org.apache.kafka.clients.producer.KafkaProducer)
[2015-03-07 02:04:27,212] TRACE [mirrormaker-producer-0] Sending message with value size 13 (kafka.tools.MirrorMaker$ProducerThread)
[2015-03-07 02:04:27,212] TRACE Sending record ProducerRecord(topic=mm-benchmark-test, partition=null, key=[B@54957b67, value=[B@21d8d293 with callback kafka.tools.MirrorMaker$MirrorMakerProducerCallback@21e8c241 to topic mm-benchmark-test partition 0 (org.apache.kafka.clients.producer.KafkaProducer)
[2015-03-07 02:04:27,212] TRACE [mirrormaker-producer-0] Sending message with value size 13 (kafka.tools.MirrorMaker$ProducerThread)
[2015-03-07 02:04:27,212] TRACE Sending record ProducerRecord(topic=mm-benchmark-test, partition=null, key=[B@1eed723b, value=[B@1acd590b with callback kafka.tools.MirrorMaker$MirrorMakerProducerCallback@1f90eeec to topic mm-benchmark-test partition 0 (org.apache.kafka.clients.producer.KafkaProducer)
[2015-03-07 02:04:27,212] TRACE [mirrormaker-producer-0] Sending message with value size 13 (kafka.tools.MirrorMaker$ProducerThread)
[2015-03-07 02:04:27,212] TRACE Sending record ProducerRecord(topic=mm-benchmark-test, partition=null, key=[B@3ae8a936, value=[B@bd3671 with callback kafka.tools.MirrorMaker$MirrorMakerProducerCallback@6413518 to topic mm-benchmark-test partition 0 (org.apache.kafka.clients.producer.KafkaProducer)
[2015-03-07 02:04:27,212] ERROR Error executing user-provided callback on message for topic-partition mm-benchmark-test-0: (org.apache.kafka.clients.producer.internals.RecordBatch)
java.util.IllegalFormatConversionException: d != kafka.tools.MirrorMaker$UnackedOffset
at java.util.Formatter$FormatSpecifier.failConversion(Formatter.java:4045)
at java.util.Formatter$FormatSpecifier.printInteger(Formatter.java:2748)
at java.util.Formatter$FormatSpecifier.print(Formatter.java:2702)
at java.util.Formatter.format(Formatter.java:2488)
at java.util.Formatter.format(Formatter.java:2423)
at java.lang.String.format(String.java:2790)
at scala.collection.immutable.StringLike$class.format(StringLike.scala:266)
at scala.collection.immutable.StringOps.format(StringOps.scala:31)
at kafka.tools.MirrorMaker$MirrorMakerProducerCallback$$anonfun$onCompletion$2.apply(MirrorMaker.scala:592)
at kafka.tools.MirrorMaker$MirrorMakerProducerCallback$$anonfun$onCompletion$2.apply(MirrorMaker.scala:592)
at kafka.utils.Logging$class.trace(Logging.scala:36)
at kafka.tools.MirrorMaker$.trace(MirrorMaker.scala:57)
at kafka.tools.MirrorMaker$MirrorMakerProducerCallback.onCompletion(MirrorMaker.scala:592)
at org.apache.kafka.clients.producer.internals.RecordBatch.done(RecordBatch.java:91)
at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:267)
at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:235)
at org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:55)
at org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:312)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:225)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:199)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:124)
at java.lang.Thread.run(Thread.java:745)
[2015-03-07 02:04:27,212] TRACE [mirrormaker-producer-0] Sending message with value size 13 (kafka.tools.MirrorMaker$ProducerThread) -- Regards, Tao -- Regards, Tao
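For anyone curious why the %d conversion blows up here: java.util.Formatter only accepts integral types for %d, and the callback passes the UnackedOffset wrapper itself rather than its numeric value. A minimal, non-Kafka sketch of the same failure:

import java.util.IllegalFormatConversionException;

public class FormatDemo {
    public static void main(String[] args) {
        Object offset = new Object(); // stands in for the UnackedOffset wrapper
        // %s is safe for any object because it falls back to toString():
        System.out.println(String.format("Updating offset for %s to %s", "test-0", offset));
        try {
            // %d requires a numeric argument, so this throws, just like the trace call:
            System.out.println(String.format("Updating offset for %s to %d", "test-0", offset));
        } catch (IllegalFormatConversionException e) {
            System.out.println("caught: " + e); // e.g. "d != java.lang.Object"
        }
    }
}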
Re: Mirror maker end to end latency metric
Thanks Jon and Guozhang for the info. On Fri, Mar 6, 2015 at 1:10 AM, Jon Bringhurst jbringhu...@linkedin.com.invalid wrote: Hey Tao, Slides 27-30 of http://www.slideshare.net/JonBringhurst/kafka-audit-kafka-meetup-january-27th-2015 have a diagram that visually shows what Guozhang is talking about. -Jon On Mar 5, 2015, at 9:03 AM, Guozhang Wang wangg...@gmail.com wrote: There is no end-to-end latency metric in MM, since such a metric requires timestamp info on the source / dest Kafka clusters. For example, at LinkedIn we add a timestamp in the message header and let a separate consumer fetch the message on both ends to measure the latency. Guozhang On Wed, Mar 4, 2015 at 11:07 PM, tao xiao xiaotao...@gmail.com wrote: Hi team, Is there a built-in metric that can measure the end to end latency in MM? -- Regards, Tao -- -- Guozhang -- Regards, Tao
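A rough sketch of the approach Guozhang describes, assuming the 0.8.2 Java producer and a byte[] payload that carries the produce timestamp. The topic name, hostnames, and wiring are illustrative, not LinkedIn's actual audit code:

import java.nio.ByteBuffer;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class LatencyProbe {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "source-broker:9092"); // illustrative hostname
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(props);

        // Embed the produce-time wall clock in the payload.
        byte[] payload = ByteBuffer.allocate(8).putLong(System.currentTimeMillis()).array();
        producer.send(new ProducerRecord<byte[], byte[]>("latency-probe", payload));
        producer.close();
    }

    // A consumer on the destination cluster computes latency from the same payload.
    static long latencyMs(byte[] payload) {
        return System.currentTimeMillis() - ByteBuffer.wrap(payload).getLong();
    }
}

Note this only gives a meaningful number if the clocks on the producing and consuming hosts are reasonably in sync.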
Re: Kafka DefaultPartitioner is not behaved as expected.
The reason you need to use a.getBytes is that the default serializer.class is kafka.serializer.DefaultEncoder, which takes byte[] as input. An array's hash code is based on identity, not on the equality of its elements, so every new byte array (and your sample code creates a new one for each message) has a different hash code. If you really want the same key to stick to the same partition, you'd better use kafka.serializer.StringEncoder as the serializer.class. StringEncoder takes a String as input, and a String always returns the same hash code for the same value. On Fri, Mar 6, 2015 at 2:23 AM, Zijing Guo alter...@yahoo.com.invalid wrote: There is also something worth mentioning: when I call prod.send(KeyedMessage("foo", "a", "test message")), the data can't be delivered to the brokers; the only way to make it work is through prod.send(KeyedMessage("foo", "a".getBytes, "test message".getBytes)). When I convert the data and key to bytes, the data does not go to the proper partitions. Thanks On Thursday, March 5, 2015 12:59 PM, Zijing Guo alter...@yahoo.com.INVALID wrote: Hi Guozhang, I'm using kafka 0.8.2.0 Thanks On Thursday, March 5, 2015 12:57 PM, Guozhang Wang wangg...@gmail.com wrote: Zijing, Which version of Kafka client are you using? On Thu, Mar 5, 2015 at 8:50 AM, Zijing Guo alter...@yahoo.com.invalid wrote: Hi community, I have a 2-node test cluster with 2 zk instances and 2 broker instances running, and I'm experimenting with the kafka producer in a cluster environment. I created a topic foo with 2 partitions and replication 1, and an async Producer without defining partition.class (so the partitioner is the default one, kafka.producer.DefaultPartitioner, which I verified). Since I know there are 2 partitions for topic foo, I create 1000 KeyedMessages with key = "a": val msgs = (1 to 1000).map(e => KeyedMessage("foo", "test message" + e, "a")); prod.send(msgs) In theory "a".hashCode = 97 and 97 % 2 = 1, so I should expect all the messages to go to broker 1. However, after I send the messages, the kafka Web console shows the data evenly distributed across the 2 brokers. Any help will be appreciated. Thanks -- -- Guozhang -- Regards, Tao
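To see the identity-vs-value hashing difference concretely, here is a small stand-alone demo (plain Java, no Kafka required); the modulo line only approximates what a hash-based partitioner does with the key's hash code:

public class KeyHashDemo {
    public static void main(String[] args) {
        byte[] k1 = "a".getBytes();
        byte[] k2 = "a".getBytes();
        // Arrays inherit Object.hashCode(), which is identity-based:
        System.out.println(k1.hashCode() == k2.hashCode()); // false (almost certainly)
        // Strings hash by value, so equal keys always agree:
        System.out.println("a".hashCode() == new String("a").hashCode()); // true
        // Roughly what a hash partitioner computes for 2 partitions:
        int numPartitions = 2;
        System.out.println(Math.abs("a".hashCode()) % numPartitions); // always 1
        System.out.println(Math.abs(k1.hashCode()) % numPartitions);  // varies run to run
    }
}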
Mirror maker end to end latency metric
Hi team, Is there a built-in metric that can measure the end to end latency in MM? -- Regards, Tao
Re: Got negative offset lag after restarting brokers
Thanks guys. With unclean.leader.election.enable set to false the issue is fixed. On Tue, Mar 3, 2015 at 2:50 PM, Gwen Shapira gshap...@cloudera.com wrote: of course :) unclean.leader.election.enable On Mon, Mar 2, 2015 at 9:10 PM, tao xiao xiaotao...@gmail.com wrote: How do I achieve point 3? Is there a config that I can set? On Tue, Mar 3, 2015 at 1:02 PM, Jiangjie Qin j...@linkedin.com.invalid wrote: The scenario you mentioned is equivalent to an unclean leader election. The following settings will make sure there is no data loss: 1. Set the replication factor to 3 and the minimum ISR size to 2. 2. When producing, use acks=-1 or acks=all. 3. Disable unclean leader election. 1) and 2) guarantee committed messages will be in at least two brokers. 3) means that if a broker is not in the ISR, it cannot be elected as leader, so the log truncation mentioned earlier will not happen. Jiangjie (Becket) Qin On 3/2/15, 7:16 PM, tao xiao xiaotao...@gmail.com wrote: Since I reused the same consumer group to consume the messages after step 6, no data loss occurred. But if I create a new consumer group, for sure the new consumer will suffer data loss. I am more concerned about whether it is acceptable behavior for Kafka that an out-of-sync broker can be elected as the leader for a partition. Is there any mechanism built around Kafka to ensure that only an in-sync broker can be chosen as leader? If not, what is the best practice for restarting brokers when some of the replicas are out of sync? On Tue, Mar 3, 2015 at 2:35 AM, Jiangjie Qin j...@linkedin.com.invalid wrote: In this case you have data loss. In step 6, when broker 1 comes up, it becomes the leader and has log end offset 1000. When broker 0 comes up, it becomes a follower and will truncate its log to 1000, i.e. 1000 messages were lost. Next time the consumer starts, its offset will be reset to either the smallest or the largest depending on the setting. Jiangjie (Becket) Qin On 3/2/15, 9:32 AM, Stuart Reynolds s...@stureynolds.com wrote: Each topic has earliest and latest offsets (per partition). Each consumer group has a current offset (per topic, partition pair). I see -1 for the current offsets of new consumer groups that haven't yet committed an offset. I think it means that the offsets for that consumer group are undefined. Is it possible you generated new consumer groups when you restarted your broker? On Mon, Mar 2, 2015 at 3:15 AM, tao xiao xiaotao...@gmail.com wrote: Hi team, I have 2 brokers (0 and 1) serving a topic mm-benchmark-test. I did some tests on the two brokers to verify how the leader gets elected. Here are the steps: 1. started 2 brokers 2. created a topic with partition=1 and replication-factor=2; broker 1 was elected as leader 3. sent 1000 messages to the topic and consumed them with a high level consumer using zk as the offset storage 4. shut down broker 1, and broker 0 was elected as leader 5. sent another 1000 messages to the topic and consumed again 6. completely shut down broker 0 and then started broker 1; broker 1 became the leader 7. started broker 0 and ran ConsumerOffsetChecker, which showed negative lag (-1000 in my case) I think this is because the consumed offset in zk was 2000 while the logsize retrieved from the leader (broker 1), which missed the 1000 messages from step 5, was 1000; hence -1000 = 1000 - 2000. Is this a bug or expected behavior? -- Regards, Tao -- Regards, Tao -- Regards, Tao -- Regards, Tao
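Jiangjie's three points, gathered as configuration; the property names are as I understand them from the 0.8.2-era docs, and the values are illustrative:

# Broker / topic side
default.replication.factor=3
min.insync.replicas=2
unclean.leader.election.enable=false

# New (java) producer
acks=all

# Old scala producer equivalent
request.required.acks=-1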
Re: New subscriber offset
You can set the consumer config auto.offset.reset=largest Ref: http://kafka.apache.org/documentation.html#consumerconfigs On Tue, Mar 3, 2015 at 8:30 PM, Achanta Vamsi Subhash achanta.va...@flipkart.com wrote: Hi, We are using HighLevelConsumer and when a new subscription is added to the topic, the HighLevelConsumer for the same group starts from the start of the Kafka topic log. Is there anyway we could set the offset of the HighLevelConsumer to the end of the log instead? We don't want to move to LowLevelConsumer for this only case. Can we manually update the offset in the __consumer_offsets topic in 0.8.2? Please help. -- Regards Vamsi Subhash -- Regards, Tao
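As a sketch, assuming the 0.8.x high-level consumer property names; note that auto.offset.reset only applies when the group has no committed offset yet, which is exactly the brand-new-subscription case:

# consumer config for the new subscriber
group.id=brand-new-group     # no offsets stored for this group yet
auto.offset.reset=largest    # start from the log end instead of the beginning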
Re: Got negative offset lag after restarting brokers
Since I reused the same consumer group to consume the messages after step 6, no data loss occurred. But if I create a new consumer group, for sure the new consumer will suffer data loss. I am more concerned about whether it is acceptable behavior for Kafka that an out-of-sync broker can be elected as the leader for a partition. Is there any mechanism built around Kafka to ensure that only an in-sync broker can be chosen as leader? If not, what is the best practice for restarting brokers when some of the replicas are out of sync? On Tue, Mar 3, 2015 at 2:35 AM, Jiangjie Qin j...@linkedin.com.invalid wrote: In this case you have data loss. In step 6, when broker 1 comes up, it becomes the leader and has log end offset 1000. When broker 0 comes up, it becomes a follower and will truncate its log to 1000, i.e. 1000 messages were lost. Next time the consumer starts, its offset will be reset to either the smallest or the largest depending on the setting. Jiangjie (Becket) Qin On 3/2/15, 9:32 AM, Stuart Reynolds s...@stureynolds.com wrote: Each topic has earliest and latest offsets (per partition). Each consumer group has a current offset (per topic, partition pair). I see -1 for the current offsets of new consumer groups that haven't yet committed an offset. I think it means that the offsets for that consumer group are undefined. Is it possible you generated new consumer groups when you restarted your broker? On Mon, Mar 2, 2015 at 3:15 AM, tao xiao xiaotao...@gmail.com wrote: Hi team, I have 2 brokers (0 and 1) serving a topic mm-benchmark-test. I did some tests on the two brokers to verify how the leader gets elected. Here are the steps: 1. started 2 brokers 2. created a topic with partition=1 and replication-factor=2; broker 1 was elected as leader 3. sent 1000 messages to the topic and consumed them with a high level consumer using zk as the offset storage 4. shut down broker 1, and broker 0 was elected as leader 5. sent another 1000 messages to the topic and consumed again 6. completely shut down broker 0 and then started broker 1; broker 1 became the leader 7. started broker 0 and ran ConsumerOffsetChecker, which showed negative lag (-1000 in my case) I think this is because the consumed offset in zk was 2000 while the logsize retrieved from the leader (broker 1), which missed the 1000 messages from step 5, was 1000; hence -1000 = 1000 - 2000. Is this a bug or expected behavior? -- Regards, Tao -- Regards, Tao
Re: Got negative offset lag after restarting brokers
How do I achieve point 3? Is there a config that I can set? On Tue, Mar 3, 2015 at 1:02 PM, Jiangjie Qin j...@linkedin.com.invalid wrote: The scenario you mentioned is equivalent to an unclean leader election. The following settings will make sure there is no data loss: 1. Set the replication factor to 3 and the minimum ISR size to 2. 2. When producing, use acks=-1 or acks=all. 3. Disable unclean leader election. 1) and 2) guarantee committed messages will be in at least two brokers. 3) means that if a broker is not in the ISR, it cannot be elected as leader, so the log truncation mentioned earlier will not happen. Jiangjie (Becket) Qin On 3/2/15, 7:16 PM, tao xiao xiaotao...@gmail.com wrote: Since I reused the same consumer group to consume the messages after step 6, no data loss occurred. But if I create a new consumer group, for sure the new consumer will suffer data loss. I am more concerned about whether it is acceptable behavior for Kafka that an out-of-sync broker can be elected as the leader for a partition. Is there any mechanism built around Kafka to ensure that only an in-sync broker can be chosen as leader? If not, what is the best practice for restarting brokers when some of the replicas are out of sync? On Tue, Mar 3, 2015 at 2:35 AM, Jiangjie Qin j...@linkedin.com.invalid wrote: In this case you have data loss. In step 6, when broker 1 comes up, it becomes the leader and has log end offset 1000. When broker 0 comes up, it becomes a follower and will truncate its log to 1000, i.e. 1000 messages were lost. Next time the consumer starts, its offset will be reset to either the smallest or the largest depending on the setting. Jiangjie (Becket) Qin On 3/2/15, 9:32 AM, Stuart Reynolds s...@stureynolds.com wrote: Each topic has earliest and latest offsets (per partition). Each consumer group has a current offset (per topic, partition pair). I see -1 for the current offsets of new consumer groups that haven't yet committed an offset. I think it means that the offsets for that consumer group are undefined. Is it possible you generated new consumer groups when you restarted your broker? On Mon, Mar 2, 2015 at 3:15 AM, tao xiao xiaotao...@gmail.com wrote: Hi team, I have 2 brokers (0 and 1) serving a topic mm-benchmark-test. I did some tests on the two brokers to verify how the leader gets elected. Here are the steps: 1. started 2 brokers 2. created a topic with partition=1 and replication-factor=2; broker 1 was elected as leader 3. sent 1000 messages to the topic and consumed them with a high level consumer using zk as the offset storage 4. shut down broker 1, and broker 0 was elected as leader 5. sent another 1000 messages to the topic and consumed again 6. completely shut down broker 0 and then started broker 1; broker 1 became the leader 7. started broker 0 and ran ConsumerOffsetChecker, which showed negative lag (-1000 in my case) I think this is because the consumed offset in zk was 2000 while the logsize retrieved from the leader (broker 1), which missed the 1000 messages from step 5, was 1000; hence -1000 = 1000 - 2000. Is this a bug or expected behavior? -- Regards, Tao -- Regards, Tao -- Regards, Tao
Got negative offset lag after restarting brokers
Hi team, I have 2 brokers (0 and 1) serving a topic mm-benchmark-test. I did some tests on the two brokers to verify how the leader gets elected. Here are the steps:
1. started 2 brokers
2. created a topic with partition=1 and replication-factor=2; broker 1 was elected as leader
3. sent 1000 messages to the topic and consumed them with a high level consumer using zk as the offset storage
4. shut down broker 1, and broker 0 was elected as leader
5. sent another 1000 messages to the topic and consumed again
6. completely shut down broker 0 and then started broker 1; broker 1 became the leader
7. started broker 0 and ran ConsumerOffsetChecker, which showed negative lag (-1000 in my case)
I think this is because the consumed offset in zk was 2000 while the logsize retrieved from the leader (broker 1), which missed the 1000 messages from step 5, was 1000; hence -1000 = 1000 - 2000. Is this a bug or expected behavior? -- Regards, Tao
Re: How replicas catch up the leader
Thanks Harsha. In my case the replica doesn't catch up at all; the last log date is 5 days ago. It seems the failed replica has been excluded from the replication list. I am looking for a command that can add the replica back to the ISR list or force it to start syncing again. On Sat, Feb 28, 2015 at 4:27 PM, Harsha ka...@harsha.io wrote: You can increase num.replica.fetchers (by default it's 1) and also try increasing replica.fetch.max.bytes. -Harsha On Fri, Feb 27, 2015, at 11:15 PM, tao xiao wrote: Hi team, I had a replica node that was shut down improperly because there was no disk space left. I managed to clean up the disk and restarted the replica, but the replica has never caught up with the leader since then, as shown below: Topic:test PartitionCount:1 ReplicationFactor:3 Configs: Topic: test Partition: 0 Leader: 5 Replicas: 1,5,6 Isr: 5,6 Broker 1 is the replica that failed before. Is there a way I can force the replica to catch up with the leader? -- Regards, Tao -- Regards, Tao
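For reference, the two settings Harsha mentions are broker-side and go in server.properties; the values below are illustrative, not recommendations:

# server.properties on the lagging follower (restart required to take effect)
num.replica.fetchers=4            # default is 1; more threads pulling from leaders
replica.fetch.max.bytes=5242880   # max bytes fetched per partition per request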
How replicas catch up the leader
Hi team, I had a replica node that was shut down improperly because there was no disk space left. I managed to clean up the disk and restarted the replica, but the replica has never caught up with the leader since then, as shown below: Topic:test PartitionCount:1 ReplicationFactor:3 Configs: Topic: test Partition: 0 Leader: 5 Replicas: 1,5,6 Isr: 5,6 Broker 1 is the replica that failed before. Is there a way I can force the replica to catch up with the leader? -- Regards, Tao
Re: kafka partitions api
Alex, You can get the partition from MessageAndMetadata: since it is a case class, the partition constructor parameter is exposed as an accessor. On Fri, Feb 27, 2015 at 2:12 PM, Alex Melville amelvi...@g.hmc.edu wrote: Tao and Gaurav, After looking through the source code in Kafka v0.8.2.0, I don't see any partition() function on the MessageAndMetadata object. Here's the class's source:

package kafka.message

import kafka.serializer.Decoder
import kafka.utils.Utils

case class MessageAndMetadata[K, V](topic: String, partition: Int,
                                    private val rawMessage: Message, offset: Long,
                                    keyDecoder: Decoder[K], valueDecoder: Decoder[V]) {

  /**
   * Return the decoded message key and payload
   */
  def key(): K = if(rawMessage.key == null) null.asInstanceOf[K] else keyDecoder.fromBytes(Utils.readBytes(rawMessage.key))

  def message(): V = if(rawMessage.isNull) null.asInstanceOf[V] else valueDecoder.fromBytes(Utils.readBytes(rawMessage.payload))

-Alex M. On Thu, Feb 26, 2015 at 9:54 PM, Gaurav Agarwal gaurav130...@gmail.com wrote: That's fine with me, you can open a separate thread. But on the original question: when the ConsumerConnector connects to a topic, will the KafkaStream have all the partition information for that topic? Please confirm. Thanks On Fri, Feb 27, 2015 at 11:20 AM, Alex Melville amelvi...@g.hmc.edu wrote: I was going to make a separate email thread for this question but this thread's topic echoes what my own would have been. How can I query a broker or zookeeper for the number of partitions in a given topic? I'm trying to write a custom partitioner that sends a message to every partition within a topic, and so I need to know the total number of partitions before I call Producer.send(). Alex On Thu, Feb 26, 2015 at 7:32 PM, tao xiao xiaotao...@gmail.com wrote: Gaurav, You can get the partition number the message belongs to via MessageAndMetadata.partition() On Fri, Feb 27, 2015 at 5:16 AM, Jun Rao j...@confluent.io wrote: The partition api is exposed to the consumer in 0.8.2. Thanks, Jun On Thu, Feb 26, 2015 at 10:53 AM, Gaurav Agarwal gaurav130...@gmail.com wrote: After retrieving a kafka stream or kafka message, how do I get the partition number it belongs to? I am using kafka version 0.8.1. More specifically, the kafka.consumer.KafkaStream and kafka.message.MessageAndMetadata classes do not provide an API to retrieve the partition number. Are there any other APIs to get the partition number? If there are multiple partitions for a topic, do I need to declare in Java code how many partitions the topic contains, or can I leave it to the KafkaStream to take the partition information from the kafka broker at runtime? -- Regards, Tao -- Regards, Tao
Re: kafka partitions api
Gaurav, You can get the partition number the message belongs to via MessageAndMetadata.partition() On Fri, Feb 27, 2015 at 5:16 AM, Jun Rao j...@confluent.io wrote: The partition api is exposed to the consumer in 0.8.2. Thanks, Jun On Thu, Feb 26, 2015 at 10:53 AM, Gaurav Agarwal gaurav130...@gmail.com wrote: After retrieving a kafka stream or kafka message, how do I get the partition number it belongs to? I am using kafka version 0.8.1. More specifically, the kafka.consumer.KafkaStream and kafka.message.MessageAndMetadata classes do not provide an API to retrieve the partition number. Are there any other APIs to get the partition number? If there are multiple partitions for a topic, do I need to declare in Java code how many partitions the topic contains, or can I leave it to the KafkaStream to take the partition information from the kafka broker at runtime? -- Regards, Tao
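A small sketch using the 0.8.2 high-level consumer, reading the partition and offset from each MessageAndMetadata; the zookeeper address, group id, and topic name are assumed for illustration:

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class PartitionInfoDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181"); // illustrative
        props.put("group.id", "partition-info-demo");
        ConsumerConnector connector = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
            connector.createMessageStreams(Collections.singletonMap("my-topic", 1));
        ConsumerIterator<byte[], byte[]> it = streams.get("my-topic").get(0).iterator();
        while (it.hasNext()) {
            MessageAndMetadata<byte[], byte[]> mam = it.next();
            // partition() and offset() are the case-class accessors exposed in 0.8.2
            System.out.printf("topic=%s partition=%d offset=%d%n",
                              mam.topic(), mam.partition(), mam.offset());
        }
    }
}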
Re: Default MirrorMaker not copying over from source to target
Looks like you only have 4 messages in your topic and no more messages got sent: [2015-02-19 20:09:34,661] DEBUG initial fetch offset of consolemm:0: fetched offset = 4: consumed offset = 4 is 4 (kafka.consumer.PartitionTopicInfo) You can try sending more messages to the topic, or give the MM a different consumer group id and set auto.offset.reset=smallest. On Friday, February 20, 2015, Alex Melville amelvi...@g.hmc.edu wrote: Tao, I updated the mirrorconsumer.properties config file as you suggested, and upped the MM's log level to DEBUG. I have the output of the DEBUG logger here in this pastebin; if you could take a minute to look for anything in its contents that would indicate a problem, that would be extremely helpful. Note that my servers' hostnames are of the form ad-010X or ba-0X where X is some integer between 1 and 4. http://pastebin.com/rBsxx15A When I run the mirrormaker and then spin up a console consumer to read from the source cluster, I get 0 messages consumed. Alex On Sun, Feb 15, 2015 at 3:00 AM, tao xiao xiaotao...@gmail.com wrote: Alex, Are you sure you have data continually being sent to the topic in the source cluster after you bring up MM? By default auto.offset.reset=largest in the MM consumer config, which means MM only fetches from the largest offset if the consumer group has no initial offset in zookeeper. You can have MM print more logs by changing the log level in config/tools-log4j.properties On Sun, Feb 15, 2015 at 8:39 AM, Alex Melville amelvi...@g.hmc.edu wrote: Hi Kafka'ers, I am trying to get the MirrorMaker working with two separate clusters, one as the source and the other as the target. The topic I'm trying to copy over exists on both the source and target clusters. Here are the relevant entries in my consumer and producer properties files, which I specify in the command I run to start the MM:

*mirrorconsumer.properties:*
zookeeper.connect=ad-0104:2181
zookeeper.connection.timeout.ms=6000
group.id=test-consumer-group

*mirrorproducer.properties:*
metadata.broker.list=ba-02:9092,ba-03:9092
producer.type=sync
compression.codec=none
serializer.class=kafka.serializer.DefaultEncoder

Then I run the following command: bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config ../config/mirrorconsumer.properties --producer.config ../config/mirrorproducer.properties --whitelist consolemm so consolemm is the topic I'm trying to copy over. I've created consolemm and have used the console consumer to verify that there are messages in the topic. When I run this command... nothing happens. The process keeps running and prints nothing to the Terminal. If I look at the output of the zookeeper on the source cluster I get only the following:
[2015-02-15 00:34:06,102] INFO Accepted socket connection from /10.7.162.75:42819 (org.apache.zookeeper.server.NIOServerCnxnFactory)
[2015-02-15 00:34:06,104] INFO Client attempting to establish new session at /10.7.162.75:42819 (org.apache.zookeeper.server.ZooKeeperServer)
[2015-02-15 00:34:06,106] INFO Established session 0x14b668b0fbe0033 with negotiated timeout 6000 for client /10.7.162.75:42819 (org.apache.zookeeper.server.ZooKeeperServer)
and when I look at the output of one of the brokers on the source cluster I get:
[2015-02-15 00:32:14,382] INFO Closing socket connection to /10.7.162.75 . (kafka.network.Processor)
and there is no output on the zookeeper on the target cluster. Any advice on what is causing MM to not properly copy over data to the target cluster would be extremely helpful. 
-Alex -- Regards, Tao -- Regards, Tao
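If the messages were already in the topic before MM first started with group test-consumer-group, the committed offset may already sit at the log end. A hedged sketch of the consumer config change suggested above (the new group name is illustrative):

# mirrorconsumer.properties
zookeeper.connect=ad-0104:2181
group.id=mm-debug-group        # a brand-new group with no offset stored in zookeeper
auto.offset.reset=smallest     # with no stored offset, start from the earliest message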
Re: consumer lag metric
Thanks Todd. that will work On Tue, Feb 17, 2015 at 10:31 PM, Todd Palino tpal...@gmail.com wrote: In order to do that, you'll need to run it and parse the output, and then emit it to your metrics system of choice. This is essentially what I do - I have a monitoring application which runs every minute and pulls the offsets for a select set of topics and consumers, and then packages up the metrics and sends them to our internal system. It's not ideal. We're working on a script to calculate lag efficiently for all consumers who commit offsets to Kafka, rather than a select set. -Todd On Mon, Feb 16, 2015 at 12:27 AM, tao xiao xiaotao...@gmail.com wrote: Thank you Todd for your detailed explanation. Currently I export all metrics to graphite using the reporter configuration. is there a way I can do similar thing with offset checker? On Mon, Feb 16, 2015 at 4:21 PM, Todd Palino tpal...@gmail.com wrote: The reason for this is the mechanic by which each of the lags are calculated. MaxLag (and the FetcherLagMetric) are calculated by the consumer itself using the difference between the offset it knows it is at, and the offset that the broker has as the end of the partition. The offset checker, however, uses the last offset that the consumer committed. Depending on your configuration, this is somewhere behind where the consumer actually is. For example, if your commit interval is set to 10 minutes, the number used by the offset checker can be up to 10 minutes behind where it actually is. So while MaxLag may be more up to date at any given time, it's actually less accurate. Because MaxLag relies on the consumer to report it, if the consumer breaks, you will not see an accurate lag number. This is why when we are checking consumer lag, we use an external process that uses the committed consumer offsets. This allows us to catch a broken consumer, as well as an active consumer that is just falling behind. -Todd On Fri, Feb 13, 2015 at 9:34 PM, tao xiao xiaotao...@gmail.com wrote: Thanks Joel. But I discover that both MaxLag and FetcherLagMetrics are always much smaller than the lag shown in offset checker. any reason? On Sat, Feb 14, 2015 at 7:22 AM, Joel Koshy jjkosh...@gmail.com wrote: There are FetcherLagMetrics that you can take a look at. However, it is probably easiest to just monitor MaxLag as that reports the maximum of all the lag metrics. On Fri, Feb 13, 2015 at 05:03:28PM +0800, tao xiao wrote: Hi team, Is there a metric that shows the consumer lag of a particular consumer group? similar to what offset checker provides -- Regards, Tao -- Regards, Tao -- Regards, Tao -- Regards, Tao
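A minimal sketch of the run-and-parse approach Todd describes, assuming the 0.8.x ConsumerOffsetChecker output columns (Group, Topic, Pid, Offset, logSize, Lag, Owner); the group, zookeeper address, and output format are placeholders, and the exact columns are worth verifying against your version:

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker \
  --zookeeper zk-host:2181 --group my-group \
  | awk 'NR > 1 { print "consumer.lag", $1, $2, "partition=" $3, $6 }'

Run it periodically (for example once a minute from cron) and forward each line to graphite or whatever metrics system you use.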
Re: consumer lag metric
Thank you Todd for your detailed explanation. Currently I export all metrics to graphite using the reporter configuration. is there a way I can do similar thing with offset checker? On Mon, Feb 16, 2015 at 4:21 PM, Todd Palino tpal...@gmail.com wrote: The reason for this is the mechanic by which each of the lags are calculated. MaxLag (and the FetcherLagMetric) are calculated by the consumer itself using the difference between the offset it knows it is at, and the offset that the broker has as the end of the partition. The offset checker, however, uses the last offset that the consumer committed. Depending on your configuration, this is somewhere behind where the consumer actually is. For example, if your commit interval is set to 10 minutes, the number used by the offset checker can be up to 10 minutes behind where it actually is. So while MaxLag may be more up to date at any given time, it's actually less accurate. Because MaxLag relies on the consumer to report it, if the consumer breaks, you will not see an accurate lag number. This is why when we are checking consumer lag, we use an external process that uses the committed consumer offsets. This allows us to catch a broken consumer, as well as an active consumer that is just falling behind. -Todd On Fri, Feb 13, 2015 at 9:34 PM, tao xiao xiaotao...@gmail.com wrote: Thanks Joel. But I discover that both MaxLag and FetcherLagMetrics are always much smaller than the lag shown in offset checker. any reason? On Sat, Feb 14, 2015 at 7:22 AM, Joel Koshy jjkosh...@gmail.com wrote: There are FetcherLagMetrics that you can take a look at. However, it is probably easiest to just monitor MaxLag as that reports the maximum of all the lag metrics. On Fri, Feb 13, 2015 at 05:03:28PM +0800, tao xiao wrote: Hi team, Is there a metric that shows the consumer lag of a particular consumer group? similar to what offset checker provides -- Regards, Tao -- Regards, Tao -- Regards, Tao
Re: API to get the partition number
You can get the partition number and offset of the message via MessageAndMetadata.partition() and MessageAndMetadata.offset(). For your scenario, you can turn off auto commit (auto.commit.enable=false) and then commit yourself after finishing message consumption. On Mon, Feb 16, 2015 at 1:40 PM, Arunkumar Srambikkal (asrambik) asram...@cisco.com wrote: Hi, Is there a way to get the current partition number and current offset when using the *high level consumer* in 0.8.2? I went through the previous messages, and I think in the previous version there are none. The reason we want to do this is that I plan to have a consumer without the default offset commits, to avoid the scenario of consumers going down before updating the accurate offset. Rgds Arun -- Regards, Tao
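A hedged sketch of that pattern with the 0.8.2 high-level consumer: disable auto commit, process, then call commitOffsets() once the work is durable. The connection details, topic, and process() hook are illustrative:

import java.util.Collections;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class ManualCommitDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181"); // illustrative
        props.put("group.id", "manual-commit-demo");
        props.put("auto.commit.enable", "false");         // no background commits
        ConsumerConnector connector = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        KafkaStream<byte[], byte[]> stream = connector
            .createMessageStreams(Collections.singletonMap("my-topic", 1))
            .get("my-topic").get(0);
        for (ConsumerIterator<byte[], byte[]> it = stream.iterator(); it.hasNext(); ) {
            MessageAndMetadata<byte[], byte[]> mam = it.next();
            process(mam);              // your durable processing here
            connector.commitOffsets(); // commit only after processing succeeded
        }
    }

    static void process(MessageAndMetadata<byte[], byte[]> mam) { /* ... */ }
}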
Re: consumer lag metric
Thanks Joel. But I've discovered that both MaxLag and the FetcherLagMetrics are always much smaller than the lag shown by the offset checker. Any reason? On Sat, Feb 14, 2015 at 7:22 AM, Joel Koshy jjkosh...@gmail.com wrote: There are FetcherLagMetrics that you can take a look at. However, it is probably easiest to just monitor MaxLag as that reports the maximum of all the lag metrics. On Fri, Feb 13, 2015 at 05:03:28PM +0800, tao xiao wrote: Hi team, Is there a metric that shows the consumer lag of a particular consumer group? similar to what the offset checker provides -- Regards, Tao -- Regards, Tao
Re: offset migration from kafka to zookeeper
Thanks Jiangjie for your help On Sat, Feb 14, 2015 at 5:59 AM, Joel Koshy jjkosh...@gmail.com wrote: Thanks for looking into that! On Fri, Feb 13, 2015 at 05:31:39AM +, Jiangjie Qin wrote: I think this is the offset checker bug. The offset checker will 1. first check if the offset exists in the offset topic on the broker or not. 2. If it is on the broker then it will just return that offset. 3. Otherwise it goes to zookeeper. So the problem you saw was actually following this logic. After dual commit, the offset topic already had the offsets for this consumer and topic. Then you switched to zookeeper commit. Because the offset topic has the offsets already, the offset checker will use that and skip checking zookeeper. So the offset will not change anymore, because you are no longer committing to the offset topic on the broker while the offset checker always uses that offset. On 2/12/15, 7:30 PM, tao xiao xiaotao...@gmail.com wrote: I used the one shipped with 0.8.2. It is pretty straightforward to reproduce the issue. Here are the steps to reproduce: 1. I have a consumer using the high level consumer API with initial settings offsets.storage=kafka and dual.commit.enabled=false. 2. After consuming messages for a while, shut down the consumer and change the setting to dual.commit.enabled=true 3. bounce the consumer and run for a while. The lag looks good 4. change the setting to offsets.storage=zookeeper and bounce the consumer. From then on the lag remained unchanged On Fri, Feb 13, 2015 at 11:01 AM, Joel Koshy jjkosh...@gmail.com wrote: That is weird. Are you by any chance running an older version of the offset checker? Is this straightforward to reproduce? On Fri, Feb 13, 2015 at 09:57:31AM +0800, tao xiao wrote: Joel, No, the metric was not increasing. It was 0 all the time. On Fri, Feb 13, 2015 at 12:18 AM, Joel Koshy jjkosh...@gmail.com wrote: Actually I meant to say check that is not increasing. On Thu, Feb 12, 2015 at 08:15:01AM -0800, Joel Koshy wrote: Possibly a bug - can you also look at the MaxLag mbean in the consumer to verify that the maxlag is zero? On Thu, Feb 12, 2015 at 11:24:42PM +0800, tao xiao wrote: Hi Joel, When I set dual.commit.enabled=true the count value of both metrics got increased. After I set offsets.storage=zookeeper only ZooKeeperCommitsPerSec changed but not KafkaCommitsPerSec. I think this is expected as kafka offset storage was turned off. But when I looked up the consumer lag via kafka.tools.ConsumerOffsetChecker the lag still remained unchanged. I scanned through the source code of ConsumerOffsetChecker; it doesn't check the offset in zk unless offsetFetchResponse returns NoOffset. Since the consumer used kafka as the offset storage before, I don't think offsetFetchResponse would return NoOffset:

offsetFetchResponse.requestInfo.foreach { case (topicAndPartition, offsetAndMetadata) =>
  if (offsetAndMetadata == OffsetMetadataAndError.NoOffset) {
    val topicDirs = new ZKGroupTopicDirs(group, topicAndPartition.topic)
    // this group may not have migrated off zookeeper for offsets storage (we don't expose the dual-commit option in this tool
    // (meaning the lag may be off until all the consumers in the group have the same setting for offsets storage)
    try {
      val offset = ZkUtils.readData(zkClient, topicDirs.consumerOffsetDir + "/%d".format(topicAndPartition.partition))._1.toLong
      offsetMap.put(topicAndPartition, offset)
    } catch {
      case z: ZkNoNodeException =>
        if (ZkUtils.pathExists(zkClient, topicDirs.consumerOffsetDir))
          offsetMap.put(topicAndPartition, -1)
        else
          throw z
    }
  } else if (offsetAndMetadata.error == ErrorMapping.NoError)
    offsetMap.put(topicAndPartition, offsetAndMetadata.offset)
  else {
    println("Could not fetch offset for %s due to %s.".format(topicAndPartition, ErrorMapping.exceptionFor(offsetAndMetadata.error)))
  }
}

On Thu, Feb 12, 2015 at 10:03 PM, Joel Koshy jjkosh...@gmail.com wrote: There are mbeans named KafkaCommitsPerSec and ZooKeeperCommitsPerSec - can you look those up and see what they report? On Thu, Feb 12, 2015 at 07:32:39PM +0800, tao xiao wrote: Hi team, I
offset migration from kafka to zookeeper
Hi team, I was trying to migrate my consumer offsets from kafka to zookeeper. Here are the original settings of my consumer: props.put("offsets.storage", "kafka"); props.put("dual.commit.enabled", "false"); Here are the steps:
1. set dual.commit.enabled=true
2. restart my consumer and monitor the offset lag with kafka.tools.ConsumerOffsetChecker
3. set offsets.storage=zookeeper
4. restart my consumer and monitor the offset lag with kafka.tools.ConsumerOffsetChecker
After step 4 my consumer was able to continue consuming data from the topic, but the offset lag remained unchanged. Did I do anything wrong? -- Regards, Tao
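For reference, here is how I read the steps above as consumer configs at each phase (property names per the 0.8.2 docs; whether this ordering fully migrates the offsets is exactly what this thread questions):

# Steps 1-2: dual commit while still on kafka-based offset storage
offsets.storage=kafka
dual.commit.enabled=true

# Steps 3-4: flip the storage once offsets also exist in zookeeper
offsets.storage=zookeeper
dual.commit.enabled=true   # turn off only after every consumer in the group has switched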
Re: offset migration from kafka to zookeeper
Thanks for the explanation. Is there a way I can wipe out the offsets stored in kafka so that the checker can continue to work again? On Fri, Feb 13, 2015 at 1:31 PM, Jiangjie Qin j...@linkedin.com.invalid wrote: I think this is the offset checker bug. The offset checker will 1. first check if the offset exists in the offset topic on the broker or not. 2. If it is on the broker then it will just return that offset. 3. Otherwise it goes to zookeeper. So the problem you saw was actually following this logic. After dual commit, the offset topic already had the offsets for this consumer and topic. Then you switched to zookeeper commit. Because the offset topic has the offsets already, the offset checker will use that and skip checking zookeeper. So the offset will not change anymore, because you are no longer committing to the offset topic on the broker while the offset checker always uses that offset. On 2/12/15, 7:30 PM, tao xiao xiaotao...@gmail.com wrote: I used the one shipped with 0.8.2. It is pretty straightforward to reproduce the issue. Here are the steps to reproduce: 1. I have a consumer using the high level consumer API with initial settings offsets.storage=kafka and dual.commit.enabled=false. 2. After consuming messages for a while, shut down the consumer and change the setting to dual.commit.enabled=true 3. bounce the consumer and run for a while. The lag looks good 4. change the setting to offsets.storage=zookeeper and bounce the consumer. From then on the lag remained unchanged On Fri, Feb 13, 2015 at 11:01 AM, Joel Koshy jjkosh...@gmail.com wrote: That is weird. Are you by any chance running an older version of the offset checker? Is this straightforward to reproduce? On Fri, Feb 13, 2015 at 09:57:31AM +0800, tao xiao wrote: Joel, No, the metric was not increasing. It was 0 all the time. On Fri, Feb 13, 2015 at 12:18 AM, Joel Koshy jjkosh...@gmail.com wrote: Actually I meant to say check that is not increasing. On Thu, Feb 12, 2015 at 08:15:01AM -0800, Joel Koshy wrote: Possibly a bug - can you also look at the MaxLag mbean in the consumer to verify that the maxlag is zero? On Thu, Feb 12, 2015 at 11:24:42PM +0800, tao xiao wrote: Hi Joel, When I set dual.commit.enabled=true the count value of both metrics got increased. After I set offsets.storage=zookeeper only ZooKeeperCommitsPerSec changed but not KafkaCommitsPerSec. I think this is expected as kafka offset storage was turned off. But when I looked up the consumer lag via kafka.tools.ConsumerOffsetChecker the lag still remained unchanged. I scanned through the source code of ConsumerOffsetChecker; it doesn't check the offset in zk unless offsetFetchResponse returns NoOffset. Since the consumer used kafka as the offset storage before, I don't think offsetFetchResponse would return NoOffset:

offsetFetchResponse.requestInfo.foreach { case (topicAndPartition, offsetAndMetadata) =>
  if (offsetAndMetadata == OffsetMetadataAndError.NoOffset) {
    val topicDirs = new ZKGroupTopicDirs(group, topicAndPartition.topic)
    // this group may not have migrated off zookeeper for offsets storage (we don't expose the dual-commit option in this tool
    // (meaning the lag may be off until all the consumers in the group have the same setting for offsets storage)
    try {
      val offset = ZkUtils.readData(zkClient, topicDirs.consumerOffsetDir + "/%d".format(topicAndPartition.partition))._1.toLong
      offsetMap.put(topicAndPartition, offset)
    } catch {
      case z: ZkNoNodeException =>
        if (ZkUtils.pathExists(zkClient, topicDirs.consumerOffsetDir))
          offsetMap.put(topicAndPartition, -1)
        else
          throw z
    }
  } else if (offsetAndMetadata.error == ErrorMapping.NoError)
    offsetMap.put(topicAndPartition, offsetAndMetadata.offset)
  else {
    println("Could not fetch offset for %s due to %s.".format(topicAndPartition, ErrorMapping.exceptionFor(offsetAndMetadata.error)))
  }
}

On Thu, Feb 12, 2015 at 10:03 PM, Joel Koshy jjkosh...@gmail.com wrote: There are mbeans named KafkaCommitsPerSec and ZooKeeperCommitsPerSec - can you look those up and see what they report? On Thu, Feb 12, 2015 at 07:32:39PM +0800, tao xiao wrote: Hi team, I was trying to migrate my consumer offsets from kafka to zookeeper. Here are the original settings of my consumer
Re: createMessageStreams vs createMessageStreamsByFilter
Thank you Guozhang for your detailed explanation. In your example createMessageStreamsByFilter(*C = 3), since threads are shared among topics there may be a situation where all 3 threads get stuck on topic AC, e.g. the topic is empty, which will hold the consuming threads (with consumer.timeout.ms=-1), so there is no thread left to serve topic BC. Do you think this situation can happen? On Wed, Feb 11, 2015 at 2:15 AM, Guozhang Wang wangg...@gmail.com wrote: I was not clear before .. for createMessageStreamsByFilter each matched topic will have num-threads, but shared: i.e. there will be totally num-threads created, but each thread will be responsible for fetching all matched topics. A more concrete example: say you have topic AC: 3 partitions, topic BC: 6 partitions. With createMessageStreams(AC = 3, BC = 2) a total of 5 threads will be created, consuming AC-1, AC-2, AC-3, BC-1/2/3, BC-4/5/6 respectively; with createMessageStreamsByFilter(*C = 3) a total of 3 threads will be created, consuming AC-1/BC-1/BC-2, AC-2/BC-3/BC-4, AC-3/BC-5/BC-6 respectively. Guozhang On Tue, Feb 10, 2015 at 8:37 AM, tao xiao xiaotao...@gmail.com wrote: Guozhang, Do you mean that each regex-matched topic owns the number of threads that gets passed in to createMessageStreamsByFilter? For example, in the code below, if I have 3 matched topics each of which has 2 partitions then I should have 3 * 2 = 6 threads in total, with each topic owning 2 threads.

TopicFilter filter = new Whitelist(".*");
int threadTotal = 2;
List<KafkaStream<byte[], byte[]>> streams = connector.createMessageStreamsByFilter(filter, threadTotal);

But what I observed from the log is different:

2015-02-11 00:24:04 INFO kafka.utils.Logging$class:68 - Consumer test1234dd5_localhost-1423585444070-82f23758 rebalancing the following partitions: ArrayBuffer(0, 1) for topic kafkatopic-5 with consumers: List(test1234dd5_localhost-1423585444070-82f23758-0, test1234dd5_localhost-1423585444070-82f23758-1)
2015-02-11 00:24:04 INFO kafka.utils.Logging$class:68 - test1234dd5_localhost-1423585444070-82f23758-1 attempting to claim partition 1
2015-02-11 00:24:04 INFO kafka.utils.Logging$class:68 - test1234dd5_localhost-1423585444070-82f23758-0 attempting to claim partition 0
2015-02-11 00:24:04 INFO kafka.utils.Logging$class:68 - Consumer test1234dd5_localhost-1423585444070-82f23758 rebalancing the following partitions: ArrayBuffer(0) for topic kafkatopic-1 with consumers: List(test1234dd5_localhost-1423585444070-82f23758-0, test1234dd5_localhost-1423585444070-82f23758-1)
2015-02-11 00:24:04 WARN kafka.utils.Logging$class:83 - No broker partitions consumed by consumer thread test1234dd5_localhost-1423585444070-82f23758-1 for topic kafkatopic-1
2015-02-11 00:24:04 INFO kafka.utils.Logging$class:68 - test1234dd5_localhost-1423585444070-82f23758-0 attempting to claim partition 0
2015-02-11 00:24:04 INFO kafka.utils.Logging$class:68 - Consumer test1234dd5_localhost-1423585444070-82f23758 rebalancing the following partitions: ArrayBuffer(0, 1) for topic kafkatopic-4 with consumers: List(test1234dd5_localhost-1423585444070-82f23758-0, test1234dd5_localhost-1423585444070-82f23758-1)
2015-02-11 00:24:04 INFO kafka.utils.Logging$class:68 - test1234dd5_localhost-1423585444070-82f23758-1 attempting to claim partition 1
2015-02-11 00:24:04 INFO kafka.utils.Logging$class:68 - test1234dd5_localhost-1423585444070-82f23758-0 attempting to claim partition 0

As you can see from the log there are only 2 threads created and shared among 3 topics. 
With this setting I think the parallelism is degraded and a slow topic may impact other topics' consumption performance. Any thoughts? On Wed, Feb 11, 2015 at 12:00 AM, Guozhang Wang wangg...@gmail.com wrote: createMessageStreams is used for consuming from specific topic(s), where you can put a map of [topic-name, num-threads] as its input parameters; createMessageStreamsByFilter is used for consuming from wildcard topics, where you can put a (regex, num-threads) as its input parameters, and for each regex matched topic num-threads will be created. The difference between these two are not really for throughput / latency, but rather consumption semantics. Guozhang On Tue, Feb 10, 2015 at 3:02 AM, tao xiao xiaotao...@gmail.com wrote: Hi team, I am comparing the differences between ConsumerConnector.createMessageStreams and ConsumerConnector.createMessageStreamsByFilter. My understanding is that createMessageStreams creates x number of threads (x is the number of threads passed in to the method) dedicated to the specified topic while createMessageStreamsByFilter creates x number of threads shared by topics specified by TopicFilter. Is it correct? If this is the case I assume
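Returning to the starvation concern above: one partial mitigation while staying on the high-level consumer is to set consumer.timeout.ms so a stream's iterator throws instead of parking forever on an idle topic. A sketch under that assumption (the timeout value and processing hook are illustrative):

import kafka.consumer.ConsumerIterator;
import kafka.consumer.ConsumerTimeoutException;
import kafka.consumer.KafkaStream;

public class DrainWithTimeout {
    // Requires props.put("consumer.timeout.ms", "5000") before building the ConsumerConfig.
    static void drain(KafkaStream<byte[], byte[]> stream) {
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        try {
            while (it.hasNext()) {
                byte[] value = it.next().message();
                // process(value) goes here
            }
        } catch (ConsumerTimeoutException e) {
            // No message arrived within 5s; return instead of blocking this thread forever.
        }
    }
}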
Re: createMessageStreams vs createMessageStreamsByFilter
Do you know when the new consumer API will be publicly available? On Wed, Feb 11, 2015 at 10:43 AM, Guozhang Wang wangg...@gmail.com wrote: Yes, it can get stuck. For example, AC and BC are processed by two different processes and the AC processor gets stuck; AC messages will then fill up the consumer's buffer and eventually prevent the fetcher thread from putting more data into it. The fetcher thread will be blocked on that and not be able to fetch BC. This issue has been addressed in the new consumer client, which is single-threaded with non-blocking APIs. Guozhang On Tue, Feb 10, 2015 at 6:24 PM, tao xiao xiaotao...@gmail.com wrote: Thank you Guozhang for your detailed explanation. In your example createMessageStreamsByFilter(*C = 3), since threads are shared among topics there may be a situation where all 3 threads get stuck on topic AC, e.g. the topic is empty, which will hold the consuming threads (with consumer.timeout.ms=-1), so there is no thread left to serve topic BC. Do you think this situation can happen? On Wed, Feb 11, 2015 at 2:15 AM, Guozhang Wang wangg...@gmail.com wrote: I was not clear before .. for createMessageStreamsByFilter each matched topic will have num-threads, but shared: i.e. there will be totally num-threads created, but each thread will be responsible for fetching all matched topics. A more concrete example: say you have topic AC: 3 partitions, topic BC: 6 partitions. With createMessageStreams(AC = 3, BC = 2) a total of 5 threads will be created, consuming AC-1, AC-2, AC-3, BC-1/2/3, BC-4/5/6 respectively; with createMessageStreamsByFilter(*C = 3) a total of 3 threads will be created, consuming AC-1/BC-1/BC-2, AC-2/BC-3/BC-4, AC-3/BC-5/BC-6 respectively. Guozhang On Tue, Feb 10, 2015 at 8:37 AM, tao xiao xiaotao...@gmail.com wrote: Guozhang, Do you mean that each regex-matched topic owns the number of threads that gets passed in to createMessageStreamsByFilter? For example, in the code below, if I have 3 matched topics each of which has 2 partitions then I should have 3 * 2 = 6 threads in total, with each topic owning 2 threads.

TopicFilter filter = new Whitelist(".*");
int threadTotal = 2;
List<KafkaStream<byte[], byte[]>> streams = connector.createMessageStreamsByFilter(filter, threadTotal);

But what I observed from the log is different:

2015-02-11 00:24:04 INFO kafka.utils.Logging$class:68 - Consumer test1234dd5_localhost-1423585444070-82f23758 rebalancing the following partitions: ArrayBuffer(0, 1) for topic kafkatopic-5 with consumers: List(test1234dd5_localhost-1423585444070-82f23758-0, test1234dd5_localhost-1423585444070-82f23758-1)
2015-02-11 00:24:04 INFO kafka.utils.Logging$class:68 - test1234dd5_localhost-1423585444070-82f23758-1 attempting to claim partition 1
2015-02-11 00:24:04 INFO kafka.utils.Logging$class:68 - test1234dd5_localhost-1423585444070-82f23758-0 attempting to claim partition 0
2015-02-11 00:24:04 INFO kafka.utils.Logging$class:68 - Consumer test1234dd5_localhost-1423585444070-82f23758 rebalancing the following partitions: ArrayBuffer(0) for topic kafkatopic-1 with consumers: List(test1234dd5_localhost-1423585444070-82f23758-0, test1234dd5_localhost-1423585444070-82f23758-1)
2015-02-11 00:24:04 WARN kafka.utils.Logging$class:83 - No broker partitions consumed by consumer thread test1234dd5_localhost-1423585444070-82f23758-1 for topic kafkatopic-1
2015-02-11 00:24:04 INFO kafka.utils.Logging$class:68 - test1234dd5_localhost-1423585444070-82f23758-0 attempting to claim partition 0
2015-02-11 00:24:04 INFO kafka.utils.Logging$class:68 - Consumer test1234dd5_localhost-1423585444070-82f23758 rebalancing the following partitions: ArrayBuffer(0, 1) for topic kafkatopic-4 with consumers: List(test1234dd5_localhost-1423585444070-82f23758-0, test1234dd5_localhost-1423585444070-82f23758-1)
2015-02-11 00:24:04 INFO kafka.utils.Logging$class:68 - test1234dd5_localhost-1423585444070-82f23758-1 attempting to claim partition 1
2015-02-11 00:24:04 INFO kafka.utils.Logging$class:68 - test1234dd5_localhost-1423585444070-82f23758-0 attempting to claim partition 0

As you can see from the log there are only 2 threads created and shared among 3 topics. With this setting I think the parallelism is degraded and a slow topic may impact other topics' consumption performance. Any thoughts? On Wed, Feb 11, 2015 at 12:00 AM, Guozhang Wang wangg...@gmail.com wrote: createMessageStreams is used for consuming from specific topic(s), where you can put a map of [topic-name, num-threads] as its input parameters; createMessageStreamsByFilter is used for consuming from wildcard
createMessageStreams vs createMessageStreamsByFilter
Hi team, I am comparing the differences between ConsumerConnector.createMessageStreams and ConsumerConnector.createMessageStreamsByFilter. My understanding is that createMessageStreams creates x threads (x being the number passed in to the method) dedicated to the specified topic, while createMessageStreamsByFilter creates x threads shared by the topics matched by the TopicFilter. Is that correct? If so, I assume createMessageStreams is the preferred way to create streams for each topic when I have high throughput and low latency demands. Is my assumption correct? -- Regards, Tao
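For reference, the two call shapes on the Java API side, with Guozhang's AC/BC example plugged in (topic names and counts come from his example; the rest of the setup is assumed):

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.consumer.Whitelist;
import kafka.javaapi.consumer.ConsumerConnector;

public class StreamsDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181"); // illustrative
        props.put("group.id", "streams-demo");
        ConsumerConnector connector = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        // Per-topic thread counts: 3 streams dedicated to AC, 2 dedicated to BC.
        Map<String, Integer> topicCounts = new HashMap<String, Integer>();
        topicCounts.put("AC", 3);
        topicCounts.put("BC", 2);
        Map<String, List<KafkaStream<byte[], byte[]>>> perTopic =
            connector.createMessageStreams(topicCounts);

        // Wildcard: 3 streams in total, shared by every topic matching the regex.
        List<KafkaStream<byte[], byte[]>> shared =
            connector.createMessageStreamsByFilter(new Whitelist(".*C"), 3);
    }
}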
Re: Got ClosedByInterruptException when closing ConsumerConnector
It happens every time I shutdown the connector. It doesn't block the shutdown process though On Tue, Feb 10, 2015 at 1:09 AM, Guozhang Wang wangg...@gmail.com wrote: Is this exception transient or consistent and blocking the shutdown process? On Mon, Feb 9, 2015 at 3:07 AM, tao xiao xiaotao...@gmail.com wrote: Hi team, I got java.nio.channels.ClosedByInterruptException when closing ConsumerConnector using kafka 0.8.2 Here is the exception 2015-02-09 19:04:19 INFO kafka.utils.Logging$class:68 - [test12345_localhost], ZKConsumerConnector shutting down 2015-02-09 19:04:19 INFO kafka.utils.Logging$class:68 - [ConsumerFetcherManager-1423479848796] Stopping leader finder thread 2015-02-09 19:04:19 INFO kafka.utils.Logging$class:68 - [test12345_localhost], Shutting down 2015-02-09 19:04:19 INFO kafka.utils.Logging$class:68 - [test12345_localhost], Shutdown completed 2015-02-09 19:04:19 INFO kafka.utils.Logging$class:68 - [ConsumerFetcherManager-1423479848796] Stopping all fetchers 2015-02-09 19:04:19 INFO kafka.utils.Logging$class:68 - [test12345_localhost], Stopped 2015-02-09 19:04:19 INFO kafka.utils.Logging$class:68 - [ConsumerFetcherThread-test12345_localhost], Shutting down 2015-02-09 19:04:19 INFO kafka.utils.Logging$class:68 - Reconnect due to socket error: java.nio.channels.ClosedByInterruptException 2015-02-09 19:04:19 INFO kafka.utils.Logging$class:68 - [ConsumerFetcherThread-test12345_localhost], Stopped 2015-02-09 19:04:19 INFO kafka.utils.Logging$class:68 - [ConsumerFetcherThread-test12345_localhost], Shutdown completed 2015-02-09 19:04:19 INFO kafka.utils.Logging$class:68 - [ConsumerFetcherManager-1423479848796] All connections stopped 2015-02-09 19:04:19 INFO org.I0Itec.zkclient.ZkEventThread:82 - Terminate ZkClient event thread. 2015-02-09 19:04:19 INFO org.apache.zookeeper.ZooKeeper:684 - Session: 0x14b6dd8fcf80011 closed 2015-02-09 19:04:19 INFO org.apache.zookeeper.ClientCnxn$EventThread:512 - EventThread shut down 2015-02-09 19:04:19 INFO kafka.utils.Logging$class:68 - [test12345_localhost], ZKConsumerConnector shutdown completed in 86 ms -- Regards, Tao -- -- Guozhang -- Regards, Tao
Got ClosedByInterruptException when closing ConsumerConnector
Hi team, I got java.nio.channels.ClosedByInterruptException when closing ConsumerConnector using kafka 0.8.2 Here is the exception 2015-02-09 19:04:19 INFO kafka.utils.Logging$class:68 - [test12345_localhost], ZKConsumerConnector shutting down 2015-02-09 19:04:19 INFO kafka.utils.Logging$class:68 - [ConsumerFetcherManager-1423479848796] Stopping leader finder thread 2015-02-09 19:04:19 INFO kafka.utils.Logging$class:68 - [test12345_localhost], Shutting down 2015-02-09 19:04:19 INFO kafka.utils.Logging$class:68 - [test12345_localhost], Shutdown completed 2015-02-09 19:04:19 INFO kafka.utils.Logging$class:68 - [ConsumerFetcherManager-1423479848796] Stopping all fetchers 2015-02-09 19:04:19 INFO kafka.utils.Logging$class:68 - [test12345_localhost], Stopped 2015-02-09 19:04:19 INFO kafka.utils.Logging$class:68 - [ConsumerFetcherThread-test12345_localhost], Shutting down 2015-02-09 19:04:19 INFO kafka.utils.Logging$class:68 - Reconnect due to socket error: java.nio.channels.ClosedByInterruptException 2015-02-09 19:04:19 INFO kafka.utils.Logging$class:68 - [ConsumerFetcherThread-test12345_localhost], Stopped 2015-02-09 19:04:19 INFO kafka.utils.Logging$class:68 - [ConsumerFetcherThread-test12345_localhost], Shutdown completed 2015-02-09 19:04:19 INFO kafka.utils.Logging$class:68 - [ConsumerFetcherManager-1423479848796] All connections stopped 2015-02-09 19:04:19 INFO org.I0Itec.zkclient.ZkEventThread:82 - Terminate ZkClient event thread. 2015-02-09 19:04:19 INFO org.apache.zookeeper.ZooKeeper:684 - Session: 0x14b6dd8fcf80011 closed 2015-02-09 19:04:19 INFO org.apache.zookeeper.ClientCnxn$EventThread:512 - EventThread shut down 2015-02-09 19:04:19 INFO kafka.utils.Logging$class:68 - [test12345_localhost], ZKConsumerConnector shutdown completed in 86 ms -- Regards, Tao
Is auto.commit.enable still applicable when setting offsets.storage to kafka
Hi team, If I set offsets.storage=kafka, can I still use auto.commit.enable to turn off auto commit and auto.commit.interval.ms to control the commit interval? The documentation mentions these two properties only in the context of committing offsets to zookeeper. -- Regards, Tao
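My reading of the 0.8.2 consumer config semantics, expressed as a sketch (worth double-checking against the docs): the auto-commit switches govern the commit loop itself, while offsets.storage only decides where a commit lands, so they still apply together:

offsets.storage=kafka           # commits go to the broker-side offsets topic
auto.commit.enable=false        # disables the periodic commit regardless of the storage backend
auto.commit.interval.ms=60000   # only meaningful when auto commit is enabled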
Re: Console Producer Throwing LeaderNotAvailableException Despite Existing Leader for Partition
Alex, I got a similar error before, caused by an incorrect network binding of my laptop's wireless interface. You can try setting advertised.host.name to the Kafka server's hostname in server.properties and running it again.

On Sun, Feb 8, 2015 at 8:38 AM, Alex Melville amelvi...@g.hmc.edu wrote: Howdy all, I recently upgraded to Kafka 0.8.2.0 and am trying to verify that everything still works as expected. I spin up two brokers and one zk instance, and then create a topic using

  kafka-topics.sh --create --zookeeper ad-0104:2181 --topic deleteme --partitions 2 --replication-factor 1

Then I run --describe to check whether the partitions have leaders:

  kafka-topics.sh --describe --zookeeper ad-0104:2181 --topic deleteme
  Topic:deleteme  PartitionCount:2  ReplicationFactor:1  Configs:
    Topic: deleteme  Partition: 0  Leader: 0  Replicas: 0  Isr: 0
    Topic: deleteme  Partition: 1  Leader: 1  Replicas: 1  Isr: 1

Finally, I run the console producer:

  kafka-console-producer.sh --broker-list ad-0102:9092 --topic deleteme

I get the following warning

  [2015-02-08 00:36:24,244] WARN Property topic is not valid (kafka.utils.VerifiableProperties)

and then it waits for console input. When I try to send a message I get the following errors:

  [2015-02-08 00:37:04,735] WARN Error while fetching metadata [{TopicMetadata for topic deleteme - No partition metadata for topic deleteme due to kafka.common.LeaderNotAvailableException}] for topic [deleteme]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartitionInfo)
  [2015-02-08 00:37:04,751] WARN Error while fetching metadata ... (same message)
  [2015-02-08 00:37:04,752] ERROR Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: deleteme (kafka.producer.async.DefaultEventHandler)
  ... the same WARN/WARN/ERROR cycle repeats three more times ...
  [2015-02-08 00:37:05,189] WARN Error while fetching metadata ... (same message)
  [2015-02-08 00:37:05,191] ERROR Failed to send requests for topics deleteme with correlation ids in [0,8] (kafka.producer.async.DefaultEventHandler)
  [2015-02-08 00:37:05,192] ERROR Error in handling batch of 1 events (kafka.producer.async.ProducerSendThread)
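For reference, a minimal sketch of the suggested fix in server.properties; the values are placeholders (reusing the broker name from the command above), not something confirmed in this thread:

  # server.properties on each broker
  # The address the broker registers in ZooKeeper and returns to clients
  # in metadata responses; it must be resolvable from the producer host.
  advertised.host.name=ad-0102
  advertised.port=9092

The console producer only uses --broker-list to bootstrap; after that it connects to whatever leader address the metadata response contains, so an unresolvable advertised address can surface as LeaderNotAvailableException even though --describe shows a live leader.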
Got IOException when writing metrics to CSV file
Hi team, I was running the mirror maker off the trunk code and got an IOException when configuring the mirror maker to use KafkaCSVMetricsReporter as the metrics reporter. Here is the exception I got:

  java.io.IOException: Unable to create /tmp/csv1/BytesPerSec.csv
      at com.yammer.metrics.reporting.CsvReporter.createStreamForMetric(CsvReporter.java:141)
      at com.yammer.metrics.reporting.CsvReporter.getPrintStream(CsvReporter.java:257)
      at com.yammer.metrics.reporting.CsvReporter.access$000(CsvReporter.java:22)
      at com.yammer.metrics.reporting.CsvReporter$1.getStream(CsvReporter.java:156)
      at com.yammer.metrics.reporting.CsvReporter.processTimer(CsvReporter.java:212)
      at com.yammer.metrics.reporting.CsvReporter.processTimer(CsvReporter.java:22)
      at com.yammer.metrics.core.Timer.processWith(Timer.java:214)
      at com.yammer.metrics.reporting.CsvReporter.run(CsvReporter.java:163)
      at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
      at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
      at java.lang.Thread.run(Thread.java:745)

Here is the configuration I put in consumer.properties:

  zookeeper.connect=127.0.0.1:2181
  # timeout in ms for connecting to zookeeper
  zookeeper.connection.timeout.ms=100
  # consumer group id
  group.id=kafka-topic
  auto.offset.reset=smallest
  # consumer timeout
  #consumer.timeout.ms=5000
  # Metrics
  kafka.metrics.polling.interval.secs=5
  kafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter
  kafka.csv.metrics.dir=/tmp/csv1
  kafka.csv.metrics.reporter.enabled=true

I debugged the code and discovered that the issue is caused by different metrics sharing the same metric name. Diffing trunk against 0.8.1 on ConsumerTopicStats.scala:

  // 0.8.1
  val byteRate = newMeter(metricId + "BytesPerSec", "bytes", TimeUnit.SECONDS)
  // trunk
  val byteRate = newMeter("BytesPerSec", "bytes", TimeUnit.SECONDS, tags)

Trunk removes the metricId from the name, which results in the same metric name BytesPerSec being used by multiple metrics. Is this a bug or intentional? -- Regards, Tao
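The failure mode can be reproduced outside of Kafka. Below is a minimal sketch against the yammer metrics-core 2.x API that Kafka 0.8.x bundles; the group/type strings, object name, and output directory are invented for illustration:

  import java.io.File
  import java.util.concurrent.TimeUnit

  import com.yammer.metrics.Metrics
  import com.yammer.metrics.core.MetricName
  import com.yammer.metrics.reporting.CsvReporter

  // Two meters that differ only in group share the bare name "BytesPerSec".
  // CsvReporter derives each output file from MetricName.getName() alone,
  // so the second metric's file creation fails with "Unable to create ...".
  object CsvNameCollision extends App {
    val registry = Metrics.defaultRegistry()
    registry.newMeter(new MetricName("groupA", "ConsumerTopicMetrics", "BytesPerSec"),
                      "bytes", TimeUnit.SECONDS)
    registry.newMeter(new MetricName("groupB", "ConsumerTopicMetrics", "BytesPerSec"),
                      "bytes", TimeUnit.SECONDS)

    val outputDir = new File("/tmp/csv-collision")
    outputDir.mkdirs()
    new CsvReporter(registry, outputDir).start(1, TimeUnit.SECONDS)
    Thread.sleep(3000) // give the reporter a few polls to hit the collision
  }

The first poll creates BytesPerSec.csv; when the reporter reaches the second metric with the same name, file creation fails and the "Unable to create" IOException is thrown, matching the stack trace above.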
Re: Kafka producer perf script throws java.io.IOException
Hi, In order to get it to work, you can turn off the CSV reporter.

On Thu, Feb 5, 2015 at 1:06 PM, Xinyi Su xiny...@gmail.com wrote: Hi, Today I updated the Kafka cluster from 0.8.2-beta to 0.8.2.0 and ran the Kafka producer performance test. The test cannot continue because of exceptions that did not occur on 0.8.2-beta. My perf library is kafka-perf_2.9.2-0.8.0.jar, which is the latest version in the Maven repository.

  -bash-4.1$ bin/kafka-producer-perf-test.sh --broker-list <broker list> --topics PerfTopic22 --sync --initial-message-id 1 --messages 20 --csv-reporter-enabled --metrics-dir /tmp/PerfTopic22_1 --message-send-gap-ms 20 --request-num-acks -1 --batch-size 1

  java.io.IOException: Unable to create /tmp/PerfTopic22_1/ProducerRequestSize.csv
      at com.yammer.metrics.reporting.CsvReporter.createStreamForMetric(CsvReporter.java:141)
      at com.yammer.metrics.reporting.CsvReporter.getPrintStream(CsvReporter.java:257)
      at com.yammer.metrics.reporting.CsvReporter.access$000(CsvReporter.java:22)
      at com.yammer.metrics.reporting.CsvReporter$1.getStream(CsvReporter.java:156)
      at com.yammer.metrics.reporting.CsvReporter.processHistogram(CsvReporter.java:194)
      at com.yammer.metrics.reporting.CsvReporter.processHistogram(CsvReporter.java:22)
      at com.yammer.metrics.core.Histogram.processWith(Histogram.java:231)
      at com.yammer.metrics.reporting.CsvReporter.run(CsvReporter.java:163)
      at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
      at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
      at java.lang.Thread.run(Thread.java:745)

-- Regards, Tao
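If you need the perf numbers before the metric-name collision is fixed, the suggested workaround amounts to dropping the CSV reporter flags from the same command (broker list remains a placeholder):

  bin/kafka-producer-perf-test.sh --broker-list <broker list> --topics PerfTopic22 \
      --sync --initial-message-id 1 --messages 20 \
      --message-send-gap-ms 20 --request-num-acks -1 --batch-size 1

Without --csv-reporter-enabled (and its companion --metrics-dir), the tool never starts a CsvReporter, so no .csv files are created and the IOException cannot occur; the throughput summary is still printed to stdout.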
Got NPE when running the latest mirror maker that is in trunk
Hi team, I got an NPE when running the latest mirror maker that is in trunk:

  [2015-01-23 18:55:20,229] INFO [kafkatopic-1_LM-SHC-00950667-1422010513674-cb0bb562], exception during rebalance (kafka.consumer.ZookeeperConsumerConnector)
  java.lang.NullPointerException
      at kafka.tools.MirrorMaker$InternalRebalanceListener.beforeReleasingPartitions(MirrorMaker.scala:631)
      at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:674)
      at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1$$anonfun$apply$mcV$sp$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:625)
      at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
      at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcV$sp(ZookeeperConsumerConnector.scala:616)
      at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply(ZookeeperConsumerConnector.scala:616)
      at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply(ZookeeperConsumerConnector.scala:616)
      at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
      at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:615)
      at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:931)
      at kafka.consumer.ZookeeperConsumerConnector$WildcardStreamsHandler.init(ZookeeperConsumerConnector.scala:965)
      at kafka.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:163)
      at kafka.tools.MirrorMaker$.main(MirrorMaker.scala:273)
      at kafka.tools.MirrorMaker.main(MirrorMaker.scala)

Here is the command I ran:

  bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config ~/Downloads/kafka/kafka_2.9.2-0.8.1.1/config/consumer.properties --producer.config ~/Downloads/kafka/kafka_2.9.2-0.8.1.1/config/producer.properties --num.streams 2 --num.producers 2 --no.data.loss --whitelist .*

-- Regards, Tao
New mirror maker consumer.config question
Hi, I discovered that the new mirror maker implementation in trunk now accepts only one consumer.config property instead of a list of them, which means we can only supply one source per mirror maker process. Is there a reason for this? If I have multiple source Kafka clusters, do I need to set up multiple mirror maker processes, one per source, as sketched below? -- Regards, Tao
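A minimal sketch of that one-process-per-source layout, reusing the invocation style from the NPE thread above; the sourceA/sourceB config filenames are placeholders:

  # one mirror maker process per source cluster, all producing to the same target
  bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config sourceA-consumer.properties \
      --producer.config target-producer.properties --num.streams 2 --whitelist '.*' &
  bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config sourceB-consumer.properties \
      --producer.config target-producer.properties --num.streams 2 --whitelist '.*' &

Each process then keeps its own consumer group state against its own source cluster, set via zookeeper.connect in the corresponding consumer config.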
Offset sync between mirror maker processes
Hi all, I have two mirror maker processes running on two different machines, fetching messages from the same topic in one data center and producing them to another data center. The two processes are assigned to the same consumer group; the setup looks roughly like the sketch below. If I want no data loss and no data duplication even when one of the mirror maker processes dies, I need a way to inform the other process of the offset of the last message successfully sent by the dead process. But I know the offset stored in ZooKeeper is the offset of the last message consumed by the mirror maker, not the last message successfully sent. Is there a way to configure the mirror maker to achieve exactly-once semantics? -- Regards, Tao
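For concreteness, a minimal sketch of the shared consumer config described above; the ZooKeeper host and group name are placeholders, and the keys are the standard 0.8 high-level consumer properties:

  # consumer.properties used by both mirror maker processes
  zookeeper.connect=dc1-zookeeper:2181
  # identical group.id => source partitions are split between the two
  # processes, and the survivor takes over the dead process's partitions
  group.id=mm-cross-dc
  auto.offset.reset=smallest

The failover itself is automatic within a shared group. The gap the question points at is that the committed offset tracks consumption: on takeover, the survivor may skip messages the dead process consumed but had not yet produced, or, depending on commit timing, re-send already-produced ones. The --no.data.loss option visible in the trunk mirror maker command earlier in this digest exists to address the loss side by tying offset commits to successful production.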