RE: kafka 0.8.1.1 delay in replica data
I set the conf num.replica.fetchers=6, and that solved it.

Thanks, Lax

Date: Wed, 31 Dec 2014 15:51:57 -0800
Subject: Re: kafka 0.8.1.1 delay in replica data
From: j...@confluent.io
To: users@kafka.apache.org

Is the system time in sync between the two hosts?

Thanks, Jun

On Tue, Dec 23, 2014 at 5:23 AM, chenlax lax...@hotmail.com wrote:

topic: Topic_test_data_1-10

leader broker data info:
-rw-rw-r-- 1 mqq mqq    237728 Dec 23 19:51 000211076304.index
-rw-rw-r-- 1 mqq mqq 536875106 Dec 23 19:51 000211076304.log
-rw-rw-r-- 1 mqq mqq    237632 Dec 23 19:45 000203797807.index
-rw-rw-r-- 1 mqq mqq 536879703 Dec 23 19:45 000203797807.log
-rw-rw-r-- 1 mqq mqq    237576 Dec 23 19:38 000196519197.index
-rw-rw-r-- 1 mqq mqq 536885834 Dec 23 19:38 000196519197.log
-rw-rw-r-- 1 mqq mqq    237848 Dec 23 19:32 000189241352.index
-rw-rw-r-- 1 mqq mqq 536882522 Dec 23 19:32 000189241352.log
-rw-rw-r-- 1 mqq mqq    238048 Dec 23 19:26 000181963182.index
-rw-rw-r-- 1 mqq mqq 536874188 Dec 23 19:26 000181963182.log
-rw-rw-r-- 1 mqq mqq    237472 Dec 23 19:19 000174684585.index
-rw-rw-r-- 1 mqq mqq 536883850 Dec 23 19:19 000174684585.log
-rw-rw-r-- 1 mqq mqq    237848 Dec 23 19:13 000167406454.index
-rw-rw-r-- 1 mqq mqq 536877465 Dec 23 19:13 000167406454.log

replica data info:
-rw-rw-r-- 1 mqq mqq 173539804 Dec 23 20:37 000196519197.log
-rw-rw-r-- 1 mqq mqq  10485760 Dec 23 20:37 000196519197.index
-rw-rw-r-- 1 mqq mqq      4128 Dec 23 20:36 000189241352.index
-rw-rw-r-- 1 mqq mqq 536882522 Dec 23 20:36 000189241352.log
-rw-rw-r-- 1 mqq mqq      4128 Dec 23 20:30 000181963182.index
-rw-rw-r-- 1 mqq mqq 536874188 Dec 23 20:30 000181963182.log
-rw-rw-r-- 1 mqq mqq      4128 Dec 23 20:25 000174684585.index
-rw-rw-r-- 1 mqq mqq 536883850 Dec 23 20:25 000174684585.log
-rw-rw-r-- 1 mqq mqq      4128 Dec 23 20:20 000167406454.index
-rw-rw-r-- 1 mqq mqq 536877465 Dec 23 20:20 000167406454.log

For offset 167406454, the timestamp on the leader broker is 19:13, but on the replica broker it is 20:20.
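For reference, the fix reported above is a broker-side setting in server.properties; a minimal sketch (the value 6 is simply what was reported in this thread, not a general recommendation):

```properties
# server.properties on each broker: number of fetcher threads used to
# replicate messages from each source broker. More threads mean more
# parallelism in follower fetching, which resolved the replica lag here.
num.replica.fetchers=6
```

The setting requires a broker restart to take effect on 0.8.x.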
Thanks, Lax

From: lax...@hotmail.com
To: users@kafka.apache.org
Subject: kafka 0.8.1.1 delay in replica data
Date: Tue, 23 Dec 2014 20:44:31 +0800

I set a topic's retention.ms to 36, and then I found that the replica data is delayed. What could cause this? See the following screenshots: master broker / replica broker [screenshots not included in the archive].

Thanks, Lax
The message would be committed if message un-committed
Per the Kafka docs, "Only committed messages are ever given out to the consumer." If a follower has not yet copied a message in time, but the followers are in the ISR, can consumers consume that message from the leader broker?

Thanks, Lax
The message would be consumed if message un-committed
Per the Kafka docs, "Only committed messages are ever given out to the consumer." If a follower has not yet copied a message in time, but the followers are in the ISR, can consumers consume that message from the leader broker?

Thanks, Lax
kafka deleted old logs but not released
Hi,

We use kafka_2.10-0.8.1.1 on our server. Today we got a disk space alert. We found that many Kafka data files have been deleted but are still held open by Kafka, such as:

_yellowpageV2-0/68170670.log (deleted)
java 8446 root 724u REG 253,2 536937911 26087362 /home/work/data/soft/kafka-0.8/data/_oakbay_v2_search_topic_ypgsearch_yellowpageV2-0/68818668.log (deleted)
java 8446 root 725u REG 253,2 536910838 26087364 /home/work/data/soft/kafka-0.8/data/_oakbay_v2_search_topic_ypgsearch_yellowpageV2-0/69457098.log (deleted)
java 8446 root 726u REG 253,2 536917902 26087368 /home/work/data/soft/kafka-0.8/data/_oakbay_v2_search_topic_ypgsearch_yellowpageV2-0/70104914.log (deleted)

Is anything wrong or misconfigured?
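This is standard POSIX behavior rather than a Kafka bug: the kernel only reclaims a deleted file's blocks once the last open file descriptor on it is closed, and the broker can hold old segment files open for a while. A minimal shell sketch of the effect (not Kafka-specific):

```shell
#!/bin/sh
# A deleted file keeps consuming disk space while any process holds it open.
tmp=$(mktemp)
exec 3<"$tmp"            # hold an open descriptor, as a broker does for segments
rm "$tmp"                # unlink the name; the blocks are NOT reclaimed yet
readlink /proc/$$/fd/3   # on Linux this shows the old path suffixed "(deleted)"
exec 3<&-                # closing the last descriptor finally frees the space
```

`lsof | grep deleted` (as in the output above) lists such files, and the space is released once the broker closes the handles or is restarted.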
Offset management in multi-threaded high-level consumer
Hello,

I would like to write a multi-threaded consumer for the high-level consumer in Kafka 0.8.1. I have found two ways that seem feasible while keeping the guarantee that messages in a partition are processed in order. I would appreciate any feedback this list has.

Option 1
- Create multiple threads, so each thread has its own ConsumerConnector.
- Manually commit offsets in each thread after every N messages.
- This was discussed a bit on this list previously. See [1].

### Questions
- Is there a problem with making multiple ConsumerConnectors per machine?
- What does it take for ZooKeeper to handle this much load? We have a 3-node ZooKeeper cluster with relatively small machines. (I expect the topic will have about 40 messages per second. There will be 3 consumer groups. That would be 120 commits per second at most, but I can reduce the frequency of commits to make this lower.)

### Extra info
- Kafka 0.9 will have an entirely different commit API, which will allow one connection to commit offsets per partition, but I can’t wait that long. See [2].

Option 2
- Create one ConsumerConnector, but ask for multiple streams in that connection. Give each thread one stream.
- Since there is no way to commit offsets per stream right now, we need to do autoCommit.
- This sacrifices the at-least-once processing guarantee, which would be nice to have. See KAFKA-1612 [3].

### Extra info
- There was some discussion in KAFKA-996 about a markForCommit() method so that autoCommit would preserve the at-least-once guarantee, but it seems more likely that the consumer API will just be redesigned to allow commits per partition instead. See [4].

So basically I'm wondering if option 1 is feasible. If not, I'll just do option 2. Of course, let me know if I was mistaken about anything or if there is another design which is better. Thanks in advance.
Rafi

[1] http://mail-archives.apache.org/mod_mbox/kafka-users/201310.mbox/%3cff142f6b499ae34caed4d263f6ca32901d35a...@extxmb19.nam.nsroot.net%3E
[2] https://cwiki.apache.org/confluence/display/KAFKA/Client+Rewrite#ClientRewrite-ConsumerAPI
[3] https://issues.apache.org/jira/browse/KAFKA-1612
[4] https://issues.apache.org/jira/browse/KAFKA-966
no space left error
Hi, All

I am doing a performance test on our new Kafka production server. After sending some messages (even fake messages using bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance), connection errors appear and the brokers shut down. After that, I see errors such as:

-su: cannot create temp file for here-document: No space left on device

How can I fix it? I am concerned this will happen when we start to publish real messages to Kafka. Should I create a cron job to regularly clean certain directories?

thanks

-- Alec Li
Re: no space left error
Hi,

Your disk is full. You should probably have something that checks/monitors disk space and alerts you when it's full. Maybe you can point Kafka to a different, larger disk or partition.

Otis
--
Monitoring * Alerting * Anomaly Detection * Centralized Log Management
Solr & Elasticsearch Support * http://sematext.com/

On Tue, Jan 6, 2015 at 3:02 PM, Sa Li sal...@gmail.com wrote:
[quoted text elided]
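The monitoring suggested above can be as simple as a cron-driven check; a hypothetical sketch (the mount point and threshold are assumptions, not from the thread):

```shell
#!/bin/sh
# Warn when the volume holding Kafka's log.dirs passes a usage threshold.
DATA_MOUNT=/      # point this at the Kafka data volume, e.g. /srv
THRESHOLD=85      # percent used that should trigger an alert
usage=$(df -P "$DATA_MOUNT" | awk 'NR==2 { sub(/%/, "", $5); print $5 }')
if [ "$usage" -ge "$THRESHOLD" ]; then
    # replace echo with your paging/alerting hook
    echo "WARNING: ${DATA_MOUNT} is ${usage}% full" >&2
fi
```

Run it from cron every few minutes; catching the disk at 85% leaves room to lower retention before the broker hits ENOSPC.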
Re: no space left error
Continuing this issue: when I restart the server with

bin/kafka-server-start.sh config/server.properties

it fails to start:

[2015-01-06 20:00:55,441] FATAL Fatal error during KafkaServerStable startup. Prepare to shutdown (kafka.server.KafkaServerStartable)
java.lang.InternalError: a fault occurred in a recent unsafe memory access operation in compiled Java code
        at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
        at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
        at kafka.log.FileMessageSet$$anon$1.makeNext(FileMessageSet.scala:188)
        at kafka.log.FileMessageSet$$anon$1.makeNext(FileMessageSet.scala:165)
        at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
        at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
        at kafka.log.LogSegment.recover(LogSegment.scala:165)
        at kafka.log.Log.recoverLog(Log.scala:179)
        at kafka.log.Log.loadSegments(Log.scala:155)
        at kafka.log.Log.<init>(Log.scala:64)
        at kafka.log.LogManager$$anonfun$loadLogs$1$$anonfun$apply$4.apply(LogManager.scala:118)
        at kafka.log.LogManager$$anonfun$loadLogs$1$$anonfun$apply$4.apply(LogManager.scala:113)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:105)
        at kafka.log.LogManager$$anonfun$loadLogs$1.apply(LogManager.scala:113)
        at kafka.log.LogManager$$anonfun$loadLogs$1.apply(LogManager.scala:105)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
        at kafka.log.LogManager.loadLogs(LogManager.scala:105)
        at kafka.log.LogManager.<init>(LogManager.scala:57)
        at kafka.server.KafkaServer.createLogManager(KafkaServer.scala:275)
        at kafka.server.KafkaServer.startup(KafkaServer.scala:72)
        at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:34)
        at kafka.Kafka$.main(Kafka.scala:46)
        at kafka.Kafka.main(Kafka.scala)
[2015-01-06 20:00:55,443] INFO [Kafka Server 100], shutting down (kafka.server.KafkaServer)
[2015-01-06 20:00:55,444] INFO Terminate ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
[2015-01-06 20:00:55,446] INFO Session: 0x684a5ed9da3a1a0f closed (org.apache.zookeeper.ZooKeeper)
[2015-01-06 20:00:55,446] INFO EventThread shut down (org.apache.zookeeper.ClientCnxn)
[2015-01-06 20:00:55,447] INFO [Kafka Server 100], shut down completed (kafka.server.KafkaServer)
[2015-01-06 20:00:55,447] INFO [Kafka Server 100], shutting down (kafka.server.KafkaServer)

Any ideas?

On Tue, Jan 6, 2015 at 12:00 PM, Sa Li sal...@gmail.com wrote:

the complete error message:

-su: cannot create temp file for here-document: No space left on device
OpenJDK 64-Bit Server VM warning: Insufficient space for shared memory file: /tmp/hsperfdata_root/19721
Try using the -Djava.io.tmpdir= option to select an alternate temp location.
[2015-01-06 19:50:49,244] FATAL (kafka.Kafka$)
java.io.FileNotFoundException: conf (No such file or directory)
        at java.io.FileInputStream.open(Native Method)
        at java.io.FileInputStream.<init>(FileInputStream.java:146)
        at java.io.FileInputStream.<init>(FileInputStream.java:101)
        at kafka.utils.Utils$.loadProps(Utils.scala:144)
        at kafka.Kafka$.main(Kafka.scala:34)
        at kafka.Kafka.main(Kafka.scala)

On Tue, Jan 6, 2015 at 11:58 AM, Sa Li sal...@gmail.com wrote:
[quoted text elided]

-- Alec Li
Re: no space left error
I'm keen to hear how to work one's way out of a filled partition, since I've run into this many times after having tuned retention bytes or retention time incorrectly. The proper path to resolving this isn't obvious based on my many harried searches through the documentation. I often end up stopping the particular broker, picking an unlucky topic/partition and deleting it, modifying any topics that consumed too much space by lowering their retention bytes, and restarting.

On Tue, Jan 6, 2015 at 12:02 PM, Sa Li sal...@gmail.com wrote:
[quoted text elided]
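A less destructive alternative to hand-deleting segments is to lower retention per topic online and let the broker prune whole segments on its next cleanup pass. A sketch, assuming a 0.8.x kafka-topics.sh with placeholder ZooKeeper address and topic name (flag spellings vary slightly across 0.8.x releases):

```shell
# Temporarily cap the topic at ~1 GB; deletion is segment-granular, so
# space comes back in segment-sized steps on the next retention check.
bin/kafka-topics.sh --zookeeper zk1:2181 --alter --topic my_topic \
  --config retention.bytes=1073741824

# Once the disk has headroom again, remove the override so the
# broker-level default applies once more:
bin/kafka-topics.sh --zookeeper zk1:2181 --alter --topic my_topic \
  --deleteConfig retention.bytes
```

Reclamation happens on the log manager's next retention check interval, so it is not instantaneous.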
Re: no space left error
There are two parts to this:

1) How to prevent Kafka from filling up disks, which https://issues.apache.org/jira/browse/KAFKA-1489 is trying to deal with. (I set the ticket to unassigned just now since I don't think anyone is working on it and it was assigned by default; I could be wrong though, so assign it back if I am.) I don't know the solution off the top of my head, but I think it is something we should strive for in 0.8.3 (worst case 0.9.0), as it happens frequently enough that we need a solution.

2) Until then, what to do when it does happen.

For #2 I have been pulled into the situation a number of times, and honestly the solution has been a bit different each time; I'm not sure any one tool or guideline is going to work. It is not always systematic, TBH. But we could collect some more guidelines and experiences that different folks have had. If someone already has this written up, that would be great; otherwise I can carve out some time in the near future and do it (though honestly I would rather see effort go to #1, but it is a balance for sure).

For Sa's problem (where this thread started):

OpenJDK 64-Bit Server VM warning: Insufficient space for shared memory file: /tmp/hsperfdata_root/19721

I don't think this is Kafka related, so what we are discussing about partitions and retention is not applicable, even though important.

/***
Joe Stein
Founder, Principal Consultant
Big Data Open Source Security LLC
http://www.stealth.ly
Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop
***/

On Tue, Jan 6, 2015 at 3:10 PM, David Birdsong david.birds...@gmail.com wrote:
[quoted text elided]
Re: Version numbers
The 0.8.2 branch is our release branch. We won't update any existing released version; instead, we will use a new version for every release. So 0.8.2-beta will be changed to 0.8.2-beta-2 and eventually just 0.8.2.

Thanks, Jun

On Sun, Jan 4, 2015 at 5:12 PM, Shannon Lloyd shanl...@gmail.com wrote:

Is there a reason why the 0.8.2 branch (which I assume is essentially a release branch for the pending 0.8.2 release) has the version number statically set to 0.8.2-beta rather than a -SNAPSHOT version (e.g. 0.8.2-SNAPSHOT or similar)? 0.8.2-beta should technically refer to the released beta, yes? Or are you using -beta here to effectively mean -SNAPSHOT and reserve the right to update the 0.8.2-beta tgz on the website? (Do you in fact update the version on the website?) The main problems with this are that tools like Maven/Gradle consider versions like 0.8.2-beta to be immutable, so it's a PITA to get them to update in your IDE etc. And it just feels wrong having non-SNAPSHOT versions getting clobbered in my local m2 and Gradle caches.

Cheers, Shannon
Re: no space left error
Thanks for the reply; the disk is not full:

root@exemplary-birds:~# df -h
Filesystem      Size  Used Avail Use% Mounted on
/dev/sda2       133G  3.4G  123G   3% /
none            4.0K     0  4.0K   0% /sys/fs/cgroup
udev             32G  4.0K   32G   1% /dev
tmpfs           6.3G  764K  6.3G   1% /run
none            5.0M     0  5.0M   0% /run/lock
none             32G     0   32G   0% /run/shm
none            100M     0  100M   0% /run/user
/dev/sdb1        14T   15G   14T   1% /srv

Nor is the memory:

root@exemplary-birds:~# free
             total     used     free   shared  buffers   cached
Mem:      65963372  9698380 56264992      776   170668  7863812
-/+ buffers/cache:  1663900 64299472
Swap:       997372        0   997372

thanks

On Tue, Jan 6, 2015 at 12:10 PM, David Birdsong david.birds...@gmail.com wrote:
[quoted text elided]
some connection errors happen in performance test
Hi, All

I am running a performance test on Kafka with the command:

bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance test-rep-three 500 100 -1 acks=1 bootstrap.servers=10.100.10.101:9092 buffer.memory=67108864 batch.size=8196

We send 50 billion messages to the brokers. It was mostly OK, but errors like this pop out periodically:

[2015-01-06 19:38:32,127] WARN Error in I/O with exemplary-birds.master/127.0.1.1 (org.apache.kafka.common.network.Selector)
java.net.ConnectException: Connection refused
        at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
        at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
        at org.apache.kafka.common.network.Selector.poll(Selector.java:232)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:191)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:184)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
        at java.lang.Thread.run(Thread.java:745)
1950 records sent, 224.4 records/sec (0.02 MB/sec), 611.4 ms avg latency, 9259.0 max latency.
2899650 records sent, 579930.0 records/sec (55.31 MB/sec), 2399.5 ms avg latency, 9505.0 max latency.
3170219 records sent, 634043.8 records/sec (60.47 MB/sec), 568.7 ms avg latency, 1201.0 max latency.

And the errors seem to happen more and more often. Our Kafka cluster is a three-node cluster.

thanks

-- Alec Li
config for consumer and producer
Hi, All

I am testing and making changes to server.properties, and I wonder whether I need to specifically change the values in the consumer and producer properties as well. Here is the consumer.properties:

zookeeper.connect=10.100.98.100:2181,10.100.98.101:2181,10.100.98.102:2181
# timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=100
# consumer group id
group.id=test-consumer-group
# consumer timeout
#consumer.timeout.ms=5000

I use the defaults for most parameters. group.id is defined as "A string that uniquely identifies the group of consumer processes to which this consumer belongs. By setting the same group id multiple processes indicate that they are all part of the same consumer group." Do I need to define many consumer groups here?

For the producer, we are not using the Java client; it is a C# client sending messages to Kafka, so the producer config won't matter (except when I am doing producer tests locally), right?

producer.type=sync
compression.codec=none

Thanks

-- Alec Li
Is it possible to enforce an unique constraint through Kafka?
Hi,

Having read a lot about Kafka and its use at LinkedIn, I'm still unsure whether Kafka can be used, with some mindset change for sure, as a general-purpose data store. For example, would someone use Kafka to enforce a unique constraint? A simple use case, in the case of LinkedIn, is the uniqueness of users' logins. What would be your recommended implementation for such a need?

Thanks in advance
Best, Joseph
MirrorMaker among different version Kafkas
Hi, guys, I want to confirm whether MirrorMaker supports different versions of Kafka. For example, if DC1 uses Kafka 0.8 and DC2 uses Kafka 0.7, can DC1 mirror DC2's messages? It looks like this does not work in my local test. If it does not, are there any candidate ways to replicate messages among data centers running different Kafka versions? thanks, Wei
java.io.IOException: Connection reset by peer
Hi, All I am running a C# producer to send messages to kafka (3 nodes cluster), but have such errors: [2015-01-06 16:09:51,143] ERROR Closing socket for /10.100.70.128 because of error (kafka.network.Processor) java.io.IOException: Connection reset by peer at sun.nio.ch.FileDispatcherImpl.read0(Native Method) at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) at sun.nio.ch.IOUtil.read(IOUtil.java:197) at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379) at kafka.utils.Utils$.read(Utils.scala:380) at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54) at kafka.network.Processor.read(SocketServer.scala:444) at kafka.network.Processor.run(SocketServer.scala:340) at java.lang.Thread.run(Thread.java:745) [2015-01-06 16:09:51,144] ERROR Closing socket for /10.100.70.128 because of error (kafka.network.Processor) java.io.IOException: Connection reset by peer at sun.nio.ch.FileDispatcherImpl.read0(Native Method) at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) at sun.nio.ch.IOUtil.read(IOUtil.java:197) at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379) at kafka.utils.Utils$.read(Utils.scala:380) at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54) at kafka.network.Processor.read(SocketServer.scala:444) at kafka.network.Processor.run(SocketServer.scala:340) at java.lang.Thread.run(Thread.java:745) [2015-01-06 16:09:56,138] INFO Closing socket connection to /10.100.70.28. (kafka.network.Processor) [2015-01-06 16:10:07,685] INFO Closing socket connection to /10.100.70.28. (kafka.network.Processor) [2015-01-06 16:10:31,423] INFO Closing socket connection to /10.100.70.28. (kafka.network.Processor) [2015-01-06 16:11:08,077] INFO Closing socket connection to /10.100.70.28. 
(kafka.network.Processor) [2015-01-06 16:11:43,990] INFO Closing socket connection to /10.100.70.28. (kafka.network.Processor) [2015-01-06 16:12:24,168] INFO Closing socket connection to /10.100.70.128. (kafka.network.Processor) But I do see messages in brokers. Any ideas? thanks -- Alec Li
Re: messages lost
Hi, experts Again, we are still having issues with losing data: we send 5000 records but find only 4500 records on the brokers. We did set required.acks=-1 to make sure all brokers ack, but that only causes long latency and does not cure the data loss. thanks On Mon, Jan 5, 2015 at 9:55 AM, Xiaoyu Wang xw...@rocketfuel.com wrote: @Sa, the required.acks is producer side configuration. Set to -1 means requiring ack from all brokers. On Fri, Jan 2, 2015 at 1:51 PM, Sa Li sal...@gmail.com wrote: Thanks a lot, Tim, this is the config of brokers -- broker.id=1 port=9092 host.name=10.100.70.128 num.network.threads=4 num.io.threads=8 socket.send.buffer.bytes=1048576 socket.receive.buffer.bytes=1048576 socket.request.max.bytes=104857600 auto.leader.rebalance.enable=true auto.create.topics.enable=true default.replication.factor=3 log.dirs=/tmp/kafka-logs-1 num.partitions=8 log.flush.interval.messages=1 log.flush.interval.ms=1000 log.retention.hours=168 log.segment.bytes=536870912 log.cleanup.interval.mins=1 zookeeper.connect=10.100.70.128:2181,10.100.70.28:2181,10.100.70.29:2181 zookeeper.connection.timeout.ms=100 --- We actually played around with request.required.acks in the producer config: -1 causes long latency, and 1 is the setting under which messages are lost. But I am not sure if this is the reason we lose records. thanks AL On Fri, Jan 2, 2015 at 9:59 AM, Timothy Chen tnac...@gmail.com wrote: What's your configured required.acks? And also are you waiting for all your messages to be acknowledged as well? The new producer returns futures back, but you still need to wait for the futures to complete. Tim On Fri, Jan 2, 2015 at 9:54 AM, Sa Li sal...@gmail.com wrote: Hi, all We are sending messages from a producer; we send 10 records, but we see only 99573 records for that topic. We confirm this by consuming the topic and checking the log size in the Kafka web console. Any ideas on the message loss? What could cause this? thanks -- Alec Li -- Alec Li -- Alec Li
Re: messages lost
You should never be storing your log files in /tmp please change that. Ack = -1 is what you should be using if you want to guarantee messages are saved. You should not be seeing high latencies (unless a few milliseconds is high for you). Are you using sync or async producer? What version of Kafka? How are you counting the data from the topic? How are you counting you sent each message and that it successfully acked? How are you counting from the topic and have you verified the counts summed from each partition? Can you share some sample code that reproduces this issue? You can try counting the message from each partition using https://github.com/edenhill/kafkacat and pipe to wc -l it makes for a nice simple sanity check to where the problem might be. /*** Joe Stein Founder, Principal Consultant Big Data Open Source Security LLC http://www.stealth.ly Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop / On Tue, Jan 6, 2015 at 7:21 PM, Sa Li sal...@gmail.com wrote: Hi, experts Again, we still having the issues of losing data, see we see data 5000 records, but only find 4500 records on brokers, we did set required.acks -1 to make sure all brokers ack, but that only cause the long latency, but not cure the data lost. thanks On Mon, Jan 5, 2015 at 9:55 AM, Xiaoyu Wang xw...@rocketfuel.com wrote: @Sa, the required.acks is producer side configuration. Set to -1 means requiring ack from all brokers. 
On Fri, Jan 2, 2015 at 1:51 PM, Sa Li sal...@gmail.com wrote: Thanks a lot, Tim, this is the config of brokers -- broker.id=1 port=9092 host.name=10.100.70.128 num.network.threads=4 num.io.threads=8 socket.send.buffer.bytes=1048576 socket.receive.buffer.bytes=1048576 socket.request.max.bytes=104857600 auto.leader.rebalance.enable=true auto.create.topics.enable=true default.replication.factor=3 log.dirs=/tmp/kafka-logs-1 num.partitions=8 log.flush.interval.messages=1 log.flush.interval.ms=1000 log.retention.hours=168 log.segment.bytes=536870912 log.cleanup.interval.mins=1 zookeeper.connect=10.100.70.128:2181,10.100.70.28:2181, 10.100.70.29:2181 zookeeper.connection.timeout.ms=100 --- We actually play around request.required.acks in producer config, -1 cause long latency, 1 is the parameter to cause messages lost. But I am not sure, if this is the reason to lose the records. thanks AL On Fri, Jan 2, 2015 at 9:59 AM, Timothy Chen tnac...@gmail.com wrote: What's your configured required.acks? And also are you waiting for all your messages to be acknowledged as well? The new producer returns futures back, but you still need to wait for the futures to complete. Tim On Fri, Jan 2, 2015 at 9:54 AM, Sa Li sal...@gmail.com wrote: Hi, all We are sending the message from a producer, we send 10 records, but we see only 99573 records for that topics, we confirm this by consume this topic and check the log size in kafka web console. Any ideas for the message lost, what is the reason to cause this? thanks -- Alec Li -- Alec Li -- Alec Li
Re: no space left error
BTW, I found that /kafka/logs is also getting bigger and bigger, with files like controller.log and state-change.log. Should I launch a cron job to clean them up regularly, or is there a way to delete them regularly? thanks AL On Tue, Jan 6, 2015 at 2:01 PM, Sa Li sal...@gmail.com wrote: Hi, All We fixed the problem; I would like to share what it was in case someone comes across a similar issue. We added a data drive (/dev/sdb1) to each node but specified the wrong path in server.properties, which means the data was written to the wrong drive (/dev/sda2) and quickly ate up all the space on sda2; we have now changed the path. sdb1 has 15 TB, which allows us to store data for a while, and data will be deleted after 1-2 weeks as configured. But I am kinda curious about David's comment: "... after having tuned retention bytes or retention (time?) incorrectly ..." How do you guys set log.retention.bytes? I set log.retention.hours=336 (2 weeks); should I leave log.retention.bytes at the default of -1 or set some other amount? thanks AL On Tue, Jan 6, 2015 at 12:43 PM, Sa Li sal...@gmail.com wrote: Thanks for the reply; the disk is not full: root@exemplary-birds:~# df -h Filesystem Size Used Avail Use% Mounted on /dev/sda2 133G 3.4G 123G 3% / none 4.0K 0 4.0K 0% /sys/fs/cgroup udev 32G 4.0K 32G 1% /dev tmpfs 6.3G 764K 6.3G 1% /run none 5.0M 0 5.0M 0% /run/lock none 32G 0 32G 0% /run/shm none 100M 0 100M 0% /run/user /dev/sdb1 14T 15G 14T 1% /srv Neither is the memory: root@exemplary-birds:~# free total used free shared buffers cached Mem: 65963372 9698380 56264992 776 170668 7863812 -/+ buffers/cache: 1663900 64299472 Swap: 997372 0 997372 thanks On Tue, Jan 6, 2015 at 12:10 PM, David Birdsong david.birds...@gmail.com wrote: I'm keen to hear about how to work one's way out of a filled partition since I've run into this many times after having tuned retention bytes or retention (time?) incorrectly. The proper path to resolving this isn't obvious based on my many harried searches through documentation.
I often end up stopping the particular broker, picking an unlucky topic/partition, deleting, modifying the any topics that consumed too much space by lowering their retention bytes, and restarting. On Tue, Jan 6, 2015 at 12:02 PM, Sa Li sal...@gmail.com wrote: Continue this issue, when I restart the server, like bin/kafka-server-start.sh config/server.properties it will fails to start the server, like [2015-01-06 20:00:55,441] FATAL Fatal error during KafkaServerStable startup. Prepare to shutdown (kafka.server.KafkaServerStartable) java.lang.InternalError: a fault occurred in a recent unsafe memory access operation in compiled Java code at java.nio.HeapByteBuffer.init(HeapByteBuffer.java:57) at java.nio.ByteBuffer.allocate(ByteBuffer.java:331) at kafka.log.FileMessageSet$$anon$1.makeNext(FileMessageSet.scala:188) at kafka.log.FileMessageSet$$anon$1.makeNext(FileMessageSet.scala:165) at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66) at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58) at kafka.log.LogSegment.recover(LogSegment.scala:165) at kafka.log.Log.recoverLog(Log.scala:179) at kafka.log.Log.loadSegments(Log.scala:155) at kafka.log.Log.init(Log.scala:64) at kafka.log.LogManager$$anonfun$loadLogs$1$$anonfun$apply$4.apply(LogManager.scala:118) at kafka.log.LogManager$$anonfun$loadLogs$1$$anonfun$apply$4.apply(LogManager.scala:113) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:105) at kafka.log.LogManager$$anonfun$loadLogs$1.apply(LogManager.scala:113) at kafka.log.LogManager$$anonfun$loadLogs$1.apply(LogManager.scala:105) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34) at kafka.log.LogManager.loadLogs(LogManager.scala:105) at kafka.log.LogManager.init(LogManager.scala:57) at 
kafka.server.KafkaServer.createLogManager(KafkaServer.scala:275) at kafka.server.KafkaServer.startup(KafkaServer.scala:72) at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:34) at kafka.Kafka$.main(Kafka.scala:46) at kafka.Kafka.main(Kafka.scala) [2015-01-06 20:00:55,443] INFO [Kafka Server 100], shutting down (kafka.server.KafkaServer) [2015-01-06 20:00:55,444] INFO Terminate ZkClient event thread.
Re: no space left error
Hi, All We fixed the problem; I would like to share what it was in case someone comes across a similar issue. We added a data drive (/dev/sdb1) to each node but specified the wrong path in server.properties, which means the data was written to the wrong drive (/dev/sda2) and quickly ate up all the space on sda2; we have now changed the path. sdb1 has 15 TB, which allows us to store data for a while, and data will be deleted after 1-2 weeks as configured. But I am kinda curious about David's comment: "... after having tuned retention bytes or retention (time?) incorrectly ..." How do you guys set log.retention.bytes? I set log.retention.hours=336 (2 weeks); should I leave log.retention.bytes at the default of -1 or set some other amount? thanks AL On Tue, Jan 6, 2015 at 12:43 PM, Sa Li sal...@gmail.com wrote: Thanks for the reply; the disk is not full: root@exemplary-birds:~# df -h Filesystem Size Used Avail Use% Mounted on /dev/sda2 133G 3.4G 123G 3% / none 4.0K 0 4.0K 0% /sys/fs/cgroup udev 32G 4.0K 32G 1% /dev tmpfs 6.3G 764K 6.3G 1% /run none 5.0M 0 5.0M 0% /run/lock none 32G 0 32G 0% /run/shm none 100M 0 100M 0% /run/user /dev/sdb1 14T 15G 14T 1% /srv Neither is the memory: root@exemplary-birds:~# free total used free shared buffers cached Mem: 65963372 9698380 56264992 776 170668 7863812 -/+ buffers/cache: 1663900 64299472 Swap: 997372 0 997372 thanks On Tue, Jan 6, 2015 at 12:10 PM, David Birdsong david.birds...@gmail.com wrote: I'm keen to hear about how to work one's way out of a filled partition since I've run into this many times after having tuned retention bytes or retention (time?) incorrectly. The proper path to resolving this isn't obvious based on my many harried searches through documentation. I often end up stopping the particular broker, picking an unlucky topic/partition, deleting, modifying any topics that consumed too much space by lowering their retention bytes, and restarting.
On Tue, Jan 6, 2015 at 12:02 PM, Sa Li sal...@gmail.com wrote: Continue this issue, when I restart the server, like bin/kafka-server-start.sh config/server.properties it will fails to start the server, like [2015-01-06 20:00:55,441] FATAL Fatal error during KafkaServerStable startup. Prepare to shutdown (kafka.server.KafkaServerStartable) java.lang.InternalError: a fault occurred in a recent unsafe memory access operation in compiled Java code at java.nio.HeapByteBuffer.init(HeapByteBuffer.java:57) at java.nio.ByteBuffer.allocate(ByteBuffer.java:331) at kafka.log.FileMessageSet$$anon$1.makeNext(FileMessageSet.scala:188) at kafka.log.FileMessageSet$$anon$1.makeNext(FileMessageSet.scala:165) at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66) at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58) at kafka.log.LogSegment.recover(LogSegment.scala:165) at kafka.log.Log.recoverLog(Log.scala:179) at kafka.log.Log.loadSegments(Log.scala:155) at kafka.log.Log.init(Log.scala:64) at kafka.log.LogManager$$anonfun$loadLogs$1$$anonfun$apply$4.apply(LogManager.scala:118) at kafka.log.LogManager$$anonfun$loadLogs$1$$anonfun$apply$4.apply(LogManager.scala:113) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:105) at kafka.log.LogManager$$anonfun$loadLogs$1.apply(LogManager.scala:113) at kafka.log.LogManager$$anonfun$loadLogs$1.apply(LogManager.scala:105) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34) at kafka.log.LogManager.loadLogs(LogManager.scala:105) at kafka.log.LogManager.init(LogManager.scala:57) at kafka.server.KafkaServer.createLogManager(KafkaServer.scala:275) at kafka.server.KafkaServer.startup(KafkaServer.scala:72) at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:34) at kafka.Kafka$.main(Kafka.scala:46) at 
kafka.Kafka.main(Kafka.scala) [2015-01-06 20:00:55,443] INFO [Kafka Server 100], shutting down (kafka.server.KafkaServer) [2015-01-06 20:00:55,444] INFO Terminate ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread) [2015-01-06 20:00:55,446] INFO Session: 0x684a5ed9da3a1a0f closed (org.apache.zookeeper.ZooKeeper) [2015-01-06 20:00:55,446] INFO EventThread shut down (org.apache.zookeeper.ClientCnxn) [2015-01-06 20:00:55,447] INFO [Kafka Server 100], shut down completed
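On the log.retention.bytes question raised above: time-based and size-based retention can be combined, and whichever limit is hit first triggers segment deletion; leaving log.retention.bytes at its default of -1 disables the size limit. An illustrative server.properties fragment (the values are examples, not recommendations, and note that, if I recall the semantics correctly, the byte limit applies per partition, not per topic):

```properties
# server.properties (illustrative)
log.retention.hours=336           # delete segments older than two weeks...
log.retention.bytes=107374182400  # ...or once a partition's log exceeds ~100 GB
# log.retention.bytes=-1          # the default: no size-based limit at all
```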
Re: no space left error
the complete error message: -su: cannot create temp file for here-document: No space left on device OpenJDK 64-Bit Server VM warning: Insufficient space for shared memory file: /tmp/hsperfdata_root/19721 Try using the -Djava.io.tmpdir= option to select an alternate temp location. [2015-01-06 19:50:49,244] FATAL (kafka.Kafka$) java.io.FileNotFoundException: conf (No such file or directory) at java.io.FileInputStream.open(Native Method) at java.io.FileInputStream.init(FileInputStream.java:146) at java.io.FileInputStream.init(FileInputStream.java:101) at kafka.utils.Utils$.loadProps(Utils.scala:144) at kafka.Kafka$.main(Kafka.scala:34) at kafka.Kafka.main(Kafka.scala) On Tue, Jan 6, 2015 at 11:58 AM, Sa Li sal...@gmail.com wrote: Hi, All I am doing a performance test on our new Kafka production server, but after sending some messages (even fake messages using bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance), it produces a connection error and shuts down the brokers. After that, I see errors such as: conf-su: cannot create temp file for here-document: No space left on device How can I fix it? I am concerned this will happen when we start to publish real messages to Kafka. Should I create a cron job to regularly clean certain directories? thanks -- Alec Li -- Alec Li
Topic in bad state after controller to broker messaging fails
Hi, I'm hitting a strange problem using 0.8.2-beta and just a single Kafka broker on CentOS 6.5. A percentage of my topic create attempts randomly fail and leave the new topic in a state in which it cannot be used, due to "partition doesn't exist" errors as seen in server.log below. In controller.log, it looks like the controller fails to send either the become-leader LeaderAndIsr request or the UpdateMetadata request to the broker (which in fact is the same Kafka instance), due to a socket read failing (for an unknown reason). My questions: (1) Is the bad topic state a result of the message not making it from the controller to the broker? (2) Any idea why the socket read might randomly fail? It can't be a network issue since we're running a single instance. (3) Shouldn't the controller try to resend the message? controller.log [2015-01-06 21:31:10,304] INFO [Controller 0]: New topic creation callback for [09b1ebac-7036-49fc-aa61-7852808ca241,0] (kafka.controller.KafkaController) [2015-01-06 21:31:10,304] INFO [Controller 0]: New partition creation callback for [09b1ebac-7036-49fc-aa61-7852808ca241,0] (kafka.controller.KafkaController) [2015-01-06 21:31:10,304] INFO [Partition state machine on Controller 0]: Invoking state change to NewPartition for partitions [09b1ebac-7036-49fc-aa61-7852808ca241,0] (kafka.controller.PartitionStateMachine) [2015-01-06 21:31:10,308] INFO [Replica state machine on controller 0]: Invoking state change to NewReplica for replicas [Topic=09b1ebac-7036-49fc-aa61-7852808ca241,Partition=0,Replica=0] (kafka.controller.ReplicaStateMachine) [2015-01-06 21:31:10,308] INFO [Partition state machine on Controller 0]: Invoking state change to OnlinePartition for partitions [09b1ebac-7036-49fc-aa61-7852808ca241,0] (kafka.controller.PartitionStateMachine) [2015-01-06 21:31:10,308] DEBUG [Partition state machine on Controller 0]: Live assigned replicas for partition [09b1ebac-7036-49fc-aa61-7852808ca241,0] are: [List(0)]
(kafka.controller.PartitionStateMachine) [2015-01-06 21:31:10,309] DEBUG [Partition state machine on Controller 0]: Initializing leader and isr for partition [09b1ebac-7036-49fc-aa61-7852808ca241,0] to (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:2) (kafka.controller.PartitionStateMachine) [2015-01-06 21:31:10,501] INFO [Replica state machine on controller 0]: Invoking state change to OnlineReplica for replicas [Topic=09b1ebac-7036-49fc-aa61-7852808ca241,Partition=0,Replica=0] (kafka.controller.ReplicaStateMachine) [2015-01-06 21:31:10,502] WARN [Controller-0-to-broker-0-send-thread], Controller 0 fails to send a request to broker id:0,host:dev.unifina,port:9092 (kafka.controller.RequestSendThread) java.io.EOFException: Received -1 when reading from channel, socket has likely been closed. at kafka.utils.Utils$.read(Utils.scala:381) at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54) at kafka.network.Receive$class.readCompletely(Transmission.scala:56) at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29) at kafka.network.BlockingChannel.receive(BlockingChannel.scala:108) at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:146) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60) [2015-01-06 21:31:10,505] ERROR [Controller-0-to-broker-0-send-thread], Controller 0 epoch 2 failed to send request Name:UpdateMetadataRequest;Version:0;Controller:0;ControllerEpoch:2;CorrelationId:16;ClientId:id_0-host_dev.unifina-port_9092;AliveBrokers:id:0,host:dev.unifina,port:9092;PartitionState:[09b1ebac-7036-49fc-aa61-7852808ca241,0] - (LeaderAndIsrInfo:(Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:2),ReplicationFactor:1),AllReplicas:0) to broker id:0,host:dev.unifina,port:9092. Reconnecting to broker. 
(kafka.controller.RequestSendThread) java.nio.channels.ClosedChannelException at kafka.network.BlockingChannel.send(BlockingChannel.scala:97) at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132) at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60) state-change.log [2015-01-06 21:31:10,306] TRACE Controller 0 epoch 2 changed partition [09b1ebac-7036-49fc-aa61-7852808ca241,0] state from NonExistentPartition to NewPartition with assigned replicas 0 (state.change.logger) [2015-01-06 21:31:10,308] TRACE Controller 0 epoch 2 changed state of replica 0 for partition [09b1ebac-7036-49fc-aa61-7852808ca241,0] from NonExistentReplica to NewReplica (state.change.logger) [2015-01-06 21:31:10,501] TRACE Controller 0 epoch 2 changed partition [09b1ebac-7036-49fc-aa61-7852808ca241,0] from NewPartition to OnlinePartition with leader 0 (state.change.logger) [2015-01-06 21:31:10,501] TRACE Controller 0 epoch
RE: Is it possible to enforce an unique constraint through Kafka?
Log compaction, though, allows Kafka to work as a data store quite well for some use cases. It's exactly why I started looking hard at Kafka lately. "The general idea is quite simple. Rather than maintaining only recent log entries in the log and throwing away old log segments, we maintain the most recent entry for each unique key. This ensures that the log contains a complete dataset and can be used for reloading key-based state." https://cwiki.apache.org/confluence/display/KAFKA/Log+Compaction Date: Tue, 6 Jan 2015 16:34:06 -0800 Subject: Re: Is it possible to enforce an unique constraint through Kafka? From: wiz...@gmail.com To: users@kafka.apache.org Kafka is more of a message queue than a data store. You can use it to store the history of the queue (certainly a powerful use case for disaster recovery), but it's still not really a data store. From the Kafka website (kafka.apache.org): "Apache Kafka is a publish-subscribe messaging [queue] rethought as a distributed commit log." -Mark On Tue, Jan 6, 2015 at 3:14 PM, Joseph Pachod joseph.pac...@gmail.com wrote: Hi Having read a lot about kafka and its use at linkedin, I'm still unsure whether Kafka can be used, with some mindset change for sure, as a general purpose data store. For example, would someone use Kafka to enforce an unique constraint? A simple use case is, in the case of linkedin, unicity of users' login. What would be you recommended implementation for such a need? Thanks in advance Best, Joseph
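To make the compaction idea concrete, here is a minimal, Kafka-free Python sketch (all names and data are illustrative): compacting a keyed log down to the latest value per key yields exactly the kind of lookup table you would need for key-based state such as a login registry.

```python
# Minimal model of a log-compacted topic: the log is a sequence of
# (key, value) records; compaction keeps only the latest record per key.

def compact(log):
    latest = {}
    for key, value in log:       # later records overwrite earlier ones
        latest[key] = value
    return latest

# A toy "user login" stream; the last write per key wins.
log = [
    ("user-1", "alice"),
    ("user-2", "bob"),
    ("user-1", "alice.smith"),   # user-1 changed their login
]

state = compact(log)
print(state)   # {'user-1': 'alice.smith', 'user-2': 'bob'}
```

Note that compaction alone only keeps the latest value per key; actually enforcing uniqueness at write time would additionally require checking the current state before producing (for example, via a single writer per key partition).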
[Kafka-users] Differences MessageIn between two kafka clusters replicated by mirror maker
Hi, We are using MirrorMaker to replicate data between two Kafka clusters in different datacenters, connected by fiber, 1G bandwidth, ~35ms latency. We found that the MessagesIn JMX metric on the brokers in the destination cluster is greater than on the brokers in the source cluster. Our configuration is as follows: the destination and source clusters both run Kafka 0.7.0; the source cluster has 6 brokers with 32 partitions each, and the target cluster has 12 brokers with 32 partitions each and 6 MirrorMakers doing replication, replicating only one topic. MirrorMaker configuration: --num.streams 128 --num.producers 16 = consumer.properties: shallowiterator.enable=true socket.buffersize=31457280 fetch.size=1048576 = producer.properties: queue.enqueueTimeout.ms=-1 # the number of messages batched at the producer batch.size=4000 #default 5000 connect.timeout.ms=1 #default 3 socket.timeout.ms=30 == test result: MessagesIn: source cluster = 252,560,169, target cluster = 253,128,669 Another problem: sometimes one or more brokers in the source cluster are not consumed at all while the MirrorMakers replicate the other brokers well. Does anyone have an idea how to resolve these?
Re: latency - how to reduce?
Have you tried using the built-in stress test scripts? bin/kafka-producer-perf-test.sh bin/kafka-consumer-perf-test.sh Here's how I stress tested them: nohup ${KAFKA_HOME}/bin/kafka-producer-perf-test.sh --broker-list ${KAFKA_SERVERS} --topic ${TOPIC_NAME} --new-producer --threads 16 --messages 1 1>kafka-producer-perf-test.sh.log 2>&1 nohup ${KAFKA_HOME}/bin/kafka-consumer-perf-test.sh --zookeeper ${ZOOKEEPER_QUORUM} --topic ${TOPIC_NAME} --threads 16 1>kafka-consumer-perf-test.sh.log 2>&1 And I used screen scraping of the JMX UI screens to push metrics into TSDB to get the following. The rate below is per second - so I could push the Kafka cluster to 140k+ messages/sec on a 4-node cluster with very little utilization (30% utilization). From: Shlomi Hazan shl...@viber.com To: users@kafka.apache.org Sent: Tuesday, January 6, 2015 1:06 AM Subject: Re: latency - how to reduce? Will do. What did you have in mind? Just write a big file to disk and measure the time it took to write? Maybe also read it back? Using specific APIs? Apart from the local Windows machine case, are you aware of any issues with Amazon EC2 instances that may be causing this same latency in production? Thanks, Shlomi On Tue, Jan 6, 2015 at 4:04 AM, Jun Rao j...@confluent.io wrote: Not setting log.flush.interval.messages is good since the default gives the best latency. Could you do some basic I/O testing on the local FS on your Windows machine to make sure the I/O latency is ok? Thanks, Jun On Thu, Jan 1, 2015 at 1:40 AM, Shlomi Hazan shl...@viber.com wrote: Happy new year! I did not set log.flush.interval.messages. I also could not find a default value in the docs. Could you explain about that? Thanks, Shlomi On Thu, Jan 1, 2015 at 2:20 AM, Jun Rao j...@confluent.io wrote: What's your setting of log.flush.interval.messages on the broker?
Thanks, Jun On Mon, Dec 29, 2014 at 3:26 AM, Shlomi Hazan shl...@viber.com wrote: Hi, I am using 0.8.1.1, and I have hundreds of msec latency at best and even seconds at worst. I have this latency both on production, (with peak load of 30K msg/sec, replication = 2 across 5 brokers, acks = 1), and on the local windows machine using just one process for each of producer, zookeeper, kafka, consumer. Also tried batch.num.messages=1 and producer.type=sync on the local machine but saw no improvement. How can I push latency down to several millis, at least when running local? Thanks, Shlomi
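For context on Jun's remark about log.flush.interval.messages: when it is left unset, the broker delegates flushing to the OS page cache, which is what gives the best latency; forcing frequent fsyncs trades a large latency cost for durability. A hedged sketch of the relevant server.properties lines (the example values are illustrative):

```properties
# Default: leave both unset and let the OS flush the page cache (best latency).
#log.flush.interval.messages=10000  # if set, fsync after this many messages
#log.flush.interval.ms=1000         # or after this many ms, whichever is first
# Setting log.flush.interval.messages=1 would fsync every message:
# maximum durability, but a large per-message latency cost.
```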
New 0.8.2 client compatibility with 0.8.1.1 during failure cases
Hello, I have some test cases of my own very similar to the ones in ProducerFailureHandlingTest. I moved to the new producer against 0.8.1.1 and all of my test cases around disconnects fail. Moving to 0.8.2-beta on the server side, things succeed. Here is an example test: - Healthy producer/server setup - Stop the server - Send a message - Call get on the future and it never returns. It doesn't matter whether the server is started again or remains stopped. So, I believe that while the 0.8.2 producer is compatible with 0.8.1.1 in the happy case, it fails to return futures in cases where disconnects have occurred. Is it possible to run the test cases in ProducerFailureHandlingTest.scala against 0.8.1.1? Thanks, Paul
Re: messages lost
Try doing .get() on the future returned by the new producer. It should guarantee that message has made to kafka. Thanks, Mayuresh On Tue, Jan 6, 2015 at 4:21 PM, Sa Li sal...@gmail.com wrote: Hi, experts Again, we still having the issues of losing data, see we see data 5000 records, but only find 4500 records on brokers, we did set required.acks -1 to make sure all brokers ack, but that only cause the long latency, but not cure the data lost. thanks On Mon, Jan 5, 2015 at 9:55 AM, Xiaoyu Wang xw...@rocketfuel.com wrote: @Sa, the required.acks is producer side configuration. Set to -1 means requiring ack from all brokers. On Fri, Jan 2, 2015 at 1:51 PM, Sa Li sal...@gmail.com wrote: Thanks a lot, Tim, this is the config of brokers -- broker.id=1 port=9092 host.name=10.100.70.128 num.network.threads=4 num.io.threads=8 socket.send.buffer.bytes=1048576 socket.receive.buffer.bytes=1048576 socket.request.max.bytes=104857600 auto.leader.rebalance.enable=true auto.create.topics.enable=true default.replication.factor=3 log.dirs=/tmp/kafka-logs-1 num.partitions=8 log.flush.interval.messages=1 log.flush.interval.ms=1000 log.retention.hours=168 log.segment.bytes=536870912 log.cleanup.interval.mins=1 zookeeper.connect=10.100.70.128:2181,10.100.70.28:2181, 10.100.70.29:2181 zookeeper.connection.timeout.ms=100 --- We actually play around request.required.acks in producer config, -1 cause long latency, 1 is the parameter to cause messages lost. But I am not sure, if this is the reason to lose the records. thanks AL On Fri, Jan 2, 2015 at 9:59 AM, Timothy Chen tnac...@gmail.com wrote: What's your configured required.acks? And also are you waiting for all your messages to be acknowledged as well? The new producer returns futures back, but you still need to wait for the futures to complete. 
Tim On Fri, Jan 2, 2015 at 9:54 AM, Sa Li sal...@gmail.com wrote: Hi, all We are sending the message from a producer, we send 10 records, but we see only 99573 records for that topics, we confirm this by consume this topic and check the log size in kafka web console. Any ideas for the message lost, what is the reason to cause this? thanks -- Alec Li -- Alec Li -- Alec Li -- -Regards, Mayuresh R. Gharat (862) 250-7125
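The point Tim and Mayuresh make about waiting on futures can be illustrated without Kafka at all. In this plain-Python sketch, flaky_send is a made-up stand-in for the Java producer's send(): if the caller never waits on the returned future (the analogue of calling .get()), failed sends disappear silently, which looks exactly like "lost" messages.

```python
import concurrent.futures

def flaky_send(i):
    # Stand-in for a producer send: every 10th record fails.
    if i % 10 == 9:
        raise IOError("broker unavailable")
    return i

pool = concurrent.futures.ThreadPoolExecutor(max_workers=4)
futures = [pool.submit(flaky_send, i) for i in range(100)]

# Fire-and-forget would notice nothing; only by waiting on each future
# do the failures become visible.
acked, failed = 0, 0
for f in futures:
    try:
        f.result()      # the analogue of .get() on the Kafka producer future
        acked += 1
    except IOError:
        failed += 1     # without this check, the loss would be silent

print(acked, failed)    # 90 10
pool.shutdown()
```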
Re: mirrormaker tool in 0.82beta
Did you set offsets.storage to kafka in the consumer of mirror maker? Thanks, Jun On Mon, Jan 5, 2015 at 3:49 PM, svante karlsson s...@csi.se wrote: I'm using 0.8.2-beta and I'm trying to push data with the MirrorMaker tool from several remote sites to two datacenters. I'm testing this from a node containing zk, a broker and MirrorMaker, and the data is pushed to a normal cluster: 3 zk and 4 brokers with replication. While the configuration seems straightforward, things are a bit shaky. First, if something goes wrong on either the consumer or producer side, it stops transmitting data without dying, which makes it harder to run under a supervisor (upstart in my case). I start pushing data to site 02 and MirrorMaker pushes it to my datacenter - so far all good... But, when I run (on my remote site): kafka-consumer-offset-checker.sh --group mirrormaker.from.site_02.to.site_ktv --zookeeper 192.168.0.106 Could not fetch offset for [saka.test.ext_datastream,7] due to kafka.common.NotCoordinatorForConsumerException. Could not fetch offset for [saka.test.ext_datastream,4] due to kafka.common.NotCoordinatorForConsumerException. Could not fetch offset for [saka.test.ext_datastream,2] due to kafka.common.NotCoordinatorForConsumerException. Could not fetch offset for [saka.test.ext_datastream,5] due to kafka.common.NotCoordinatorForConsumerException. Could not fetch offset for [saka.test.ext_datastream,6] due to kafka.common.NotCoordinatorForConsumerException. Could not fetch offset for [saka.test.ext_datastream,3] due to kafka.common.NotCoordinatorForConsumerException. Could not fetch offset for [saka.test.ext_datastream,1] due to kafka.common.NotCoordinatorForConsumerException. Could not fetch offset for [saka.test.ext_datastream,0] due to kafka.common.NotCoordinatorForConsumerException.
Group  Topic  Pid  Offset  logSize  Lag  Owner
mirrormaker.from.site_02.to.site_ktv  saka.test.ext_datastream  0  unknown  9310560  unknown  mirrormaker.from.site_02.to.site_ktv_kafka-4-1420497015616-725cd1b5-0
mirrormaker.from.site_02.to.site_ktv  saka.test.ext_datastream  1  unknown  9313497  unknown  mirrormaker.from.site_02.to.site_ktv_kafka-4-1420497015616-725cd1b5-0
mirrormaker.from.site_02.to.site_ktv  saka.test.ext_datastream  2  unknown  9323623  unknown  mirrormaker.from.site_02.to.site_ktv_kafka-4-1420497015616-725cd1b5-0
mirrormaker.from.site_02.to.site_ktv  saka.test.ext_datastream  3  unknown  9334005  unknown  mirrormaker.from.site_02.to.site_ktv_kafka-4-1420497015616-725cd1b5-0
mirrormaker.from.site_02.to.site_ktv  saka.test.ext_datastream  4  unknown  9324176  unknown  mirrormaker.from.site_02.to.site_ktv_kafka-4-1420497015616-725cd1b5-0
mirrormaker.from.site_02.to.site_ktv  saka.test.ext_datastream  5  unknown  9336504  unknown  mirrormaker.from.site_02.to.site_ktv_kafka-4-1420497015616-725cd1b5-0
mirrormaker.from.site_02.to.site_ktv  saka.test.ext_datastream  6  unknown  9316139  unknown  mirrormaker.from.site_02.to.site_ktv_kafka-4-1420497015616-725cd1b5-0
mirrormaker.from.site_02.to.site_ktv  saka.test.ext_datastream  7  unknown  9318770  unknown  mirrormaker.from.site_02.to.site_ktv_kafka-4-1420497015616-725cd1b5-0

Since I can push data between my sites, something is working, but what can be the cause of NotCoordinatorForConsumerException on a single-node cluster? Finally, the __consumer_offsets topic should probably be masked out from mirroring without using whitelists or blacklists. thanks /svante
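Jun's question suggests checking whether offsets.storage is set to kafka in the mirror maker's consumer config. A minimal sketch of such a config, built in code so the values are explicit; the ZooKeeper address and group id are taken from the post above, while the dual.commit.enabled line is an illustrative assumption, not something the thread confirms:

```java
import java.util.Properties;

// Sketch of a mirror maker consumer config with Kafka-based offset storage
// (0.8.2-era property names). Values below marked as placeholders are
// assumptions for illustration, not taken from the thread.
public class MirrorConsumerConfig {
    public static Properties build() {
        Properties props = new Properties();
        props.setProperty("zookeeper.connect", "192.168.0.106:2181"); // from the post
        props.setProperty("group.id", "mirrormaker.from.site_02.to.site_ktv");
        props.setProperty("offsets.storage", "kafka");     // commit to __consumer_offsets
        props.setProperty("dual.commit.enabled", "false"); // assumption: skip ZK double-writes
        return props;
    }
}
```

With offsets.storage left at its ZooKeeper default, a coordinator-based offset lookup has nothing to ask the coordinator broker for, which may be why the checker reports NotCoordinatorForConsumerException here.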
Re: The message would be committed if message un-committed
The consumer always fetches data from the leader broker. Thanks, Jun

On Tue, Jan 6, 2015 at 2:50 AM, chenlax lax...@hotmail.com wrote: Per the Kafka docs, "Only committed messages are ever given out to the consumer." If the followers have not yet copied a message but are still in the ISR, can consumers consume that message from the leader broker? Thanks, Lax
Re: kafka deleted old logs but not released
Do you mean that the Kafka broker still holds a file handle on a deleted file? Do you see those files being deleted in the Kafka log4j log? Thanks, Jun

On Tue, Jan 6, 2015 at 4:46 AM, Yonghui Zhao zhaoyong...@gmail.com wrote: Hi, We use kafka_2.10-0.8.1.1 on our server. Today we got a disk space alert. We found that many Kafka data files have been deleted but are still held open by Kafka, such as:

_yellowpageV2-0/68170670.log (deleted)
java 8446 root 724u REG 253,2 536937911 26087362 /home/work/data/soft/kafka-0.8/data/_oakbay_v2_search_topic_ypgsearch_yellowpageV2-0/68818668.log (deleted)
java 8446 root 725u REG 253,2 536910838 26087364 /home/work/data/soft/kafka-0.8/data/_oakbay_v2_search_topic_ypgsearch_yellowpageV2-0/69457098.log (deleted)
java 8446 root 726u REG 253,2 536917902 26087368 /home/work/data/soft/kafka-0.8/data/_oakbay_v2_search_topic_ypgsearch_yellowpageV2-0/70104914.log (deleted)

Is something wrong, or is something misconfigured?
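The "(deleted)" entries in the lsof output above are standard POSIX behavior rather than a Kafka bug: unlinking a file does not free its disk space while any process still holds an open handle to it. A stdlib-only sketch of that mechanism (nothing here is from the thread; it works on Linux/POSIX filesystems, where deleting an open file is allowed):

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;

// Demonstrates the POSIX behavior behind the report above: a file unlinked
// while a handle is open stays readable (and keeps its disk space) until the
// last handle closes; lsof lists such files as "(deleted)".
public class DeletedButOpen {
    public static String readAfterDelete() throws IOException {
        Path p = Files.createTempFile("segment", ".log");
        Files.write(p, "kafka segment data".getBytes());
        // The inode and its space are released only when the last open
        // handle closes, not at unlink time.
        try (InputStream in = Files.newInputStream(p)) {
            Files.delete(p);                      // unlink while handle is open
            return new String(in.readAllBytes()); // still readable afterwards
        }
    }
}
```

For the broker, the space is reclaimed once whatever holds the stale handles (the broker process itself, per the lsof output) closes them or is restarted.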
Re: [Kafka-users] Differences MessageIn between two kafka clusters replicated by mirror maker
Not sure exactly what's happening there. In any case, 0.7 is really old. You probably want to upgrade to the latest 0.8 release. Thanks, Jun

On Tue, Jan 6, 2015 at 12:16 AM, tao li tust.05102...@gmail.com wrote: Hi, We are using mirror maker to replicate data between two Kafka clusters in different datacenters, connected by fiber with 1 Gb bandwidth and ~35 ms latency. We found that the MessagesIn JMX metric on the brokers in the destination cluster is greater than on the brokers in the source cluster. Our configuration is as follows: both the source and destination clusters run Kafka 0.7.0. The source cluster has 6 brokers with 32 partitions each; the target cluster has 12 brokers with 32 partitions each and 6 mirror makers doing the replication, replicating only one topic. The mirror maker configuration:

--num.streams 128 --num.producers 16

consumer.properties:
shallowiterator.enable=true
socket.buffersize=31457280
fetch.size=1048576

producer.properties:
queue.enqueueTimeout.ms=-1
# the number of messages batched at the producer
batch.size=4000 #default 5000
connect.timeout.ms=1 #default 3
socket.timeout.ms=30

Test result: MessagesIn: source cluster = 252,560,169, target cluster = 253,128,669. Another problem: sometimes one or more brokers in the source cluster are not consumed at all, while the mirror makers replicate the other brokers fine. Does anyone have an idea how to resolve these?
Re: Zookeeper Connection When Using High Level Sample Consumer Code from Wiki
Ok, I could make the example work. The problem was that on the wiki page there was no value for zk.connectiontimeout.ms. I inserted a value for that property (value = 10) and the example worked. Also, I realized my mail client messed up the earlier message. Here's the URL for the wiki page I was referring to: https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example

From: Jayesh Thakrar j_thak...@yahoo.com.INVALID To: users@kafka.apache.org Sent: Tuesday, January 6, 2015 11:09 AM Subject: Zookeeper Connection When Using High Level Sample Consumer Code from Wiki

When I try running the Java consumer example at https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example I get the following ZooKeeper connection error. I have verified ZooKeeper connectivity using a variety of means (using the ZooKeeper built-in client, sending 4-letter commands to the zk port, etc.), so I know the ZooKeeper info is correct. Here's the error I get. Any pointers will be greatly appreciated.

Exception in thread main org.I0Itec.zkclient.exception.ZkTimeoutException: Unable to connect to zookeeper server within timeout: 400
at org.I0Itec.zkclient.ZkClient.connect(ZkClient.java:880)
at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:98)
at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:84)
at kafka.consumer.ZookeeperConsumerConnector.connectZk(ZookeeperConsumerConnector.scala:170)
at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:126)
at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:66)
at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:69)
at kafka.consumer.Consumer$.createJavaConsumerConnector(ConsumerConnector.scala:100)
at kafka.consumer.Consumer.createJavaConsumerConnector(ConsumerConnector.scala)
at kafka_sample.Consumer.<init>(Consumer.java:21)
at kafka_sample.Consumer.main(Consumer.java:71)
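The property that fixed it goes by different names across client versions (zk.connectiontimeout.ms in older configs, zookeeper.connection.timeout.ms in the 0.8 high-level consumer). A sketch of the wiki example's consumer properties with the connection timeout set explicitly; the host and all timeout values below are illustrative placeholders, not taken from the post:

```java
import java.util.Properties;

// Sketch of the high-level consumer properties from the wiki example, with
// an explicit ZooKeeper connection timeout. Host and timeout values are
// placeholder assumptions for illustration.
public class WikiConsumerConfig {
    public static Properties build() {
        Properties props = new Properties();
        props.setProperty("zookeeper.connect", "zkhost:2181");          // placeholder
        props.setProperty("group.id", "example-group");                 // placeholder
        props.setProperty("zookeeper.session.timeout.ms", "6000");
        props.setProperty("zookeeper.connection.timeout.ms", "6000");   // the fix above
        props.setProperty("zookeeper.sync.time.ms", "2000");
        props.setProperty("auto.commit.interval.ms", "1000");
        return props;
    }
}
```

These Properties would then be passed to a ConsumerConfig when creating the connector, as in the wiki example.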
Re: The message would be consumed if message un-committed
Only committed messages are made available to the consumer, and the consumer always reads from the leader. A message is considered committed (and the high watermark advanced) only when all the replicas in the ISR have received it. Thanks, Mayuresh

On Tue, Jan 6, 2015 at 3:41 AM, chenlax lax...@hotmail.com wrote: Per the Kafka docs, "Only committed messages are ever given out to the consumer." If the followers have not yet copied a message but are still in the ISR, can consumers consume that message from the leader broker? Thanks, Lax -- -Regards, Mayuresh R. Gharat (862) 250-7125
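The commit semantics described above also have a producer-side knob: in the 0.8-era producer, request.required.acks controls how many replicas must have a message before the broker acknowledges the send, and -1 means the full ISR, i.e. the message is committed in the sense Mayuresh describes before the producer gets its ack. A minimal sketch (the broker address is a placeholder):

```java
import java.util.Properties;

// Sketch of 0.8-era producer settings that pair with the commit semantics
// discussed above. The broker address is a placeholder assumption.
public class SafeProducerConfig {
    public static Properties build() {
        Properties props = new Properties();
        props.setProperty("metadata.broker.list", "broker1:9092"); // placeholder
        // -1: broker acks only after all ISR replicas have the message,
        // i.e. after the message is committed.
        props.setProperty("request.required.acks", "-1");
        return props;
    }
}
```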
Re: Release Date for Kafka 0.8.2
You can track the blockers here https://issues.apache.org/jira/browse/KAFKA-1841?jql=project%20%3D%20KAFKA%20AND%20fixVersion%20%3D%200.8.2%20AND%20resolution%20%3D%20Unresolved%20AND%20priority%20%3D%20Blocker%20ORDER%20BY%20key%20DESC. We are waiting on follow-up patches for 2 JIRAs which are under review. We should be able to release 0.8.2 in a few days. On Mon, Jan 5, 2015 at 10:26 PM, Otis Gospodnetic otis.gospodne...@gmail.com wrote: Hi Srividhya, See http://search-hadoop.com/m/4TaT4B9tys1/subj=Re+Kafka+0+8+2+release+before+Santa+Claus Otis -- Monitoring * Alerting * Anomaly Detection * Centralized Log Management Solr Elasticsearch Support * http://sematext.com/ On Mon, Jan 5, 2015 at 11:55 AM, Srividhya Shanmugam srividhyashanmu...@fico.com wrote: Kafka Team, We are currently using the 0.8.2 beta version with a patch for KAFKA-1738. Do you have any updates on when the 0.8.2 final version will be released? Thanks, Srividhya -- Thanks, Neha
Re: New 0.8.2 client compatibility with 0.8.1.1 during failure cases
Paul, That behavior is currently expected; see https://issues.apache.org/jira/browse/KAFKA-1788. There are currently no client-side timeouts in the new producer, so the message just sits there forever waiting for the server to come back so it can try to send it. If you already have tests for a variety of failure modes it might be helpful to contribute them to the WIP patch in that JIRA. Currently the patch only adds one unit test for RecordAccumulator; a few tests using the full server would be an improvement. -Ewen

On Tue, Jan 6, 2015 at 7:54 AM, Paul Pearcy ppea...@gmail.com wrote: Hello, I have some test cases of my own, very similar to the ones in ProducerFailureHandlingTest. I moved to the new producer against 0.8.1.1 and all of my test cases around disconnects fail. Moving to 0.8.2-beta on the server side, things succeed. Here is an example test:

- Healthy producer/server setup
- Stop the server
- Send a message
- Call get on the future and it never returns, whether the server is started again or remains stopped

So I believe the 0.8.2 producer is compatible with 0.8.1.1 in the happy case, but it fails to return futures in cases where disconnects have occurred. Is it possible to run the test cases in ProducerFailureHandlingTest.scala against 0.8.1.1? Thanks, Paul -- Thanks, Ewen
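Until KAFKA-1788 adds client-side timeouts, one application-level workaround is to bound the wait yourself with the timed variant of Future.get. A stdlib-only illustration of that pattern (a CompletableFuture that never completes stands in for the send future; in a real application this would be called on the Future returned by the producer's send):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Illustrates bounding a future that may never complete, the situation
// described above when the broker stays down. Pure stdlib sketch; the
// never-completing future is a stand-in, not the producer API.
public class BoundedGet {
    public static String getWithTimeout(Future<String> f, long timeoutMs) {
        try {
            return f.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return "timed out"; // application decides: retry, log, or drop
        } catch (InterruptedException | ExecutionException e) {
            return "failed: " + e;
        }
    }

    public static void main(String[] args) {
        Future<String> neverDone = new CompletableFuture<>(); // broker never comes back
        System.out.println(getWithTimeout(neverDone, 100));   // prints "timed out"
    }
}
```

This only bounds the caller's wait; the record may still sit in the producer's accumulator, which is exactly what the JIRA above is about.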