CSharp library and producer: Closing socket because of error (kafka.network.Processor), java.nio.BufferUnderflowException
Hi

I have a Kafka broker running (kafka_2.9.1-0.8.1.1) and all is working. One project requires a producer written in CSharp. I am not a .NET programmer, but I managed to write simple producer code using https://github.com/kafka-dev/kafka/blob/master/clients/csharp/README.md

the code ...

using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using Kafka.Client;

namespace DemoProducer
{
    class Program
    {
        static void Main(string[] args)
        {
            string payload1 = "kafka 1.";
            byte[] payloadData1 = Encoding.UTF8.GetBytes(payload1);
            Message msg1 = new Message(payloadData1);

            string payload2 = "kafka 2.";
            byte[] payloadData2 = Encoding.UTF8.GetBytes(payload2);
            Message msg2 = new Message(payloadData2);

            Producer producer = new Producer("broker", 9092);
            producer.Send("kafkademo3", 0, msg1);
        }
    }
}
...

On the broker side I get the following error when I execute the code above:

[2014-05-12 19:15:58,984] ERROR Closing socket for /84.50.21.39 because of error (kafka.network.Processor)
java.nio.BufferUnderflowException
        at java.nio.HeapByteBuffer.get(HeapByteBuffer.java:145)
        at java.nio.ByteBuffer.get(ByteBuffer.java:694)
        at kafka.api.ApiUtils$.readShortString(ApiUtils.scala:38)
        at kafka.api.ProducerRequest$.readFrom(ProducerRequest.scala:33)
        at kafka.api.RequestKeys$$anonfun$1.apply(RequestKeys.scala:36)
        at kafka.api.RequestKeys$$anonfun$1.apply(RequestKeys.scala:36)
        at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:53)
        at kafka.network.Processor.read(SocketServer.scala:353)
        at kafka.network.Processor.run(SocketServer.scala:245)
        at java.lang.Thread.run(Thread.java:744)
[2014-05-12 19:16:11,836] ERROR Closing socket for /90.190.106.56 because of error (kafka.network.Processor)
java.io.IOException: Connection reset by peer
        at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
        at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
        at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
        at sun.nio.ch.IOUtil.read(IOUtil.java:197)
        at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
        at kafka.utils.Utils$.read(Utils.scala:375)
        at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
        at kafka.network.Processor.read(SocketServer.scala:347)
        at kafka.network.Processor.run(SocketServer.scala:245)
        at java.lang.Thread.run(Thread.java:744)

I suspected that the problem was the broker version (kafka_2.9.1-0.8.1.1), so I downloaded kafka-0.7.1-incubating. With it I was able to send messages using the CSharp code. So is there a workaround that lets me use the latest Kafka version with CSharp? Or what is the latest Kafka version that supports this CSharp producer?

And one more question: in the CSharp lib, how can I give the producer a broker list to get fault tolerance in case one broker is down?

--
Best regards, Margus (Margusja) Roo
+372 51 48 780
http://margus.roo.ee
http://ee.linkedin.com/in/margusroo
skype: margusja
ldapsearch -x -h ldap.sk.ee -b c=EE (serialNumber=37303140314)
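A note on the stack trace above: the BufferUnderflowException at kafka.api.ApiUtils$.readShortString is most likely the 0.8 broker failing to parse the old 0.7-era request framing sent by this C# client. The 0.8 ProducerRequest contains length-prefixed fields the 0.7 frame does not have, so the broker reads a bogus length and runs off the end of the buffer. A rough stdlib-Python illustration of that kind of parse failure (the field layout here is simplified, not the exact Kafka protocol):

```python
import struct

def read_short_string(buf, offset):
    """Mimic kafka.api.ApiUtils.readShortString: a 2-byte big-endian
    length prefix followed by that many bytes of string data."""
    if offset + 2 > len(buf):
        raise ValueError("buffer underflow reading string length")
    (length,) = struct.unpack_from(">h", buf, offset)
    offset += 2
    if offset + length > len(buf):
        raise ValueError("buffer underflow reading string body")
    return buf[offset:offset + length].decode("utf-8"), offset + length

# A well-formed 0.8-style field: length prefix, then the bytes.
ok = struct.pack(">h", 9) + b"kafkademo"
print(read_short_string(ok, 0)[0])  # -> kafkademo

# An old-format frame has no such prefix where 0.8 expects one, so the
# parser reads a bogus "length" and then runs off the end of the buffer.
bad = b"\x00\x40ka"  # claims 64 bytes of string data, only 2 present
try:
    read_short_string(bad, 0)
except ValueError as e:
    print(e)  # analogous to the broker's java.nio.BufferUnderflowException
```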
Re: Kafka producer in CSharp
Thank you for the response. I think HTTP is ok. I have two more questions for the HTTP case:

1. Can I have fault tolerance in case I have two or more brokers?
2. Can I get an ack that a message is in the queue?

Best regards, Margus (Margusja) Roo
+372 51 48 780
http://margus.roo.ee
http://ee.linkedin.com/in/margusroo
skype: margusja
ldapsearch -x -h ldap.sk.ee -b c=EE (serialNumber=37303140314)

On 12/05/14 23:28, Joe Stein wrote:
The wire protocol has changed drastically since then. I don't know of any C# clients (there are none on the client library page, nor have I heard of any being used in production, but maybe there are some). For clients that use .NET I often suggest that they use some HTTP producer/consumer: https://cwiki.apache.org/confluence/display/KAFKA/Clients#Clients-HTTPREST

Joe Stein
Founder, Principal Consultant
Big Data Open Source Security LLC
http://www.stealth.ly
Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop

On Mon, May 12, 2014 at 12:34 PM, Margusja mar...@roo.ee wrote:
[quoted original message trimmed]
Re: Kafka producer in CSharp
Ok, I found some info myself.

I can have fault tolerance - I can start kafka-http-endpoint with a broker list.
I can have acks - start it using --sync.

But what is best practice in case kafka-http-endpoint itself goes down? Start multiple kafka-http-endpoints and, on the client side, just check whether a kafka-http-endpoint is up, and if not, use another one?

Best regards, Margus (Margusja) Roo
+372 51 48 780
http://margus.roo.ee
http://ee.linkedin.com/in/margusroo
skype: margusja
ldapsearch -x -h ldap.sk.ee -b c=EE (serialNumber=37303140314)

On 13/05/14 10:49, Margusja wrote:
[quoted earlier messages trimmed]
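The client-side failover described above can be sketched generically. This is a hypothetical helper, not part of kafka-http-endpoint: the endpoint URLs and the send callable are placeholders, and the idea is simply to try each HTTP endpoint in order and fall through to the next on failure.

```python
def send_with_failover(endpoints, send_fn, message):
    """Try each HTTP endpoint in order; return the endpoint that accepted
    the message. send_fn(endpoint, message) should raise on failure."""
    last_error = None
    for endpoint in endpoints:
        try:
            send_fn(endpoint, message)
            return endpoint  # delivered; remember which endpoint worked
        except Exception as e:
            last_error = e   # endpoint down or rejecting; try the next one
    raise RuntimeError("all endpoints failed") from last_error

# Example with a stub sender standing in for a real HTTP POST:
def flaky(endpoint, message):
    if endpoint == "http://ep1:8080":
        raise ConnectionError("ep1 is down")

print(send_with_failover(["http://ep1:8080", "http://ep2:8080"], flaky, b"hi"))
# -> http://ep2:8080
```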
TCP_TOO_MANY_ESTAB_CONNS
Dear all,

We started using Kafka as a queue server recently. Our production setup has 3 Kafka servers and 3 ZooKeepers. We have 30 topics with 10 partitions each. We have a producer pool (10 producers per app server), so 30 producers in total. With our monitoring tool we frequently see the TCP_TOO_MANY_ESTAB_CONNS warning. Why does this warning come up so often? Are we missing something?

PS: Our monitoring tool's threshold is 600 connections.

thanks in advance

Ranjith Venkatesan
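A back-of-the-envelope count of the producer-side connections described above (assumptions, not measurements: roughly one TCP connection per producer per broker it sends to; consumer fetchers, replica fetchers, and ZooKeeper sessions all add more on top):

```python
app_servers = 3
producers_per_app = 10
brokers = 3

producers = app_servers * producers_per_app  # 30 producers in total
producer_conns = producers * brokers         # worst case: every producer
                                             # holds a connection to every broker
print(producer_conns)  # -> 90

# Spread over 3 brokers that is ~30 producer connections per broker,
# well under a 600-connection threshold -- so steadily tripping the alarm
# suggests connection churn or a leak rather than steady-state fan-out.
per_broker = producer_conns // brokers
print(per_broker)  # -> 30
```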
What happens to Kafka when ZK lost its quorum?
Hi all,

Can Kafka producers, brokers, and consumers still process messages and function normally if ZooKeeper loses its quorum?

Thanks,
Connie
Re: Kafka producer in CSharp
The wire protocol has changed drastically since then. I don't know of any C# clients (there are none on the client library page, nor have I heard of any being used in production, but maybe there are some). For clients that use .NET I often suggest that they use some HTTP producer/consumer: https://cwiki.apache.org/confluence/display/KAFKA/Clients#Clients-HTTPREST

Joe Stein
Founder, Principal Consultant
Big Data Open Source Security LLC
http://www.stealth.ly
Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop

On Mon, May 12, 2014 at 12:34 PM, Margusja mar...@roo.ee wrote:
[quoted original message trimmed]
Re: What happens to Kafka when ZK lost its quorum or becomes unstable?
The brokers are designed with the expectation that the ZK service is always available. If ZK is not available, the clients may not be able to send/fetch data properly.

Thanks,
Jun

On Tue, May 13, 2014 at 6:48 AM, Connie Yang cybercon...@gmail.com wrote:
Hi, Can the producers, brokers and consumers still be processing messages when their ZK cluster lost its quorum or becomes unstable? I know this is a rather general question as it may depend on what configuration these use. So, please enumerate all of those combinations. Thanks, Connie
Re: Loss of Leader in Kafka
3. Deleted the topic. Checked only the zookeeper to see if the /brokers/topics DOES NOT have the topic

You are seeing this problem because delete topic is not supported in Kafka. Any attempt to delete a topic may leave your cluster in an unstable state. We plan to release 0.8.2 with delete topic support, or you could try the latest trunk.

On Mon, May 12, 2014 at 9:39 AM, Kashyap Mhaisekar kashya...@gmail.com wrote:
Hi, I am hitting a strange exception while creating a topic in Kafka. Steps to reproduce:
1. Created a topic multipartition_test with 2 partitions and 2 replicas
2. Added some data to this topic and verified data is coming up for both partitions
3. Deleted the topic. Checked only ZooKeeper to see that /brokers/topics DOES NOT have the topic
4. Recreated the topic in exactly the same way as in point 1.

After this, when I list topics using ./kafka-list-topic.sh, I see leader: none and an empty isr: for this topic. The state change logs give the following exception:

kafka.common.StateChangeFailedException: encountered error while electing leader for partition [multipartition_test,1] due to: LeaderAndIsr information doesn't exist for partition [multipartition_test,1] in OnlinePartition state.
        at kafka.controller.PartitionStateMachine.electLeaderForPartition(PartitionStateMachine.scala:327)
        at kafka.controller.PartitionStateMachine.kafka$controller$PartitionStateMachine$$handleStateChange(PartitionStateMachine.scala:154)
        at kafka.controller.PartitionStateMachine$$anonfun$handleStateChanges$2.apply(PartitionStateMachine.scala:110)
        at kafka.controller.PartitionStateMachine$$anonfun$handleStateChanges$2.apply(PartitionStateMachine.scala:109)
        at scala.collection.immutable.Set$Set2.foreach(Set.scala:101)
        at kafka.controller.PartitionStateMachine.handleStateChanges(PartitionStateMachine.scala:109)
        at kafka.controller.KafkaController.onNewPartitionCreation(KafkaController.scala:325)
        at kafka.controller.KafkaController.onNewTopicCreation(KafkaController.scala:312)
        at kafka.controller.PartitionStateMachine$TopicChangeListener.liftedTree1$1(PartitionStateMachine.scala:376)
        at kafka.controller.PartitionStateMachine$TopicChangeListener.handleChildChange(PartitionStateMachine.scala:361)
        at org.I0Itec.zkclient.ZkClient$7.run(ZkClient.java:568)
        at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
Caused by: kafka.common.StateChangeFailedException: LeaderAndIsr information doesn't exist for partition [multipartition_test,1] in OnlinePartition state
        at kafka.controller.PartitionStateMachine.getLeaderIsrAndEpochOrThrowException(PartitionStateMachine.scala:347)
        at kafka.controller.PartitionStateMachine.electLeaderForPartition(PartitionStateMachine.scala:291)
        ... 11 more

Can you please help - what am I doing wrong?

Regards,
kashyap
What happens to Kafka when ZK lost its quorum or becomes unstable?
Hi,

Can the producers, brokers, and consumers still process messages when their ZK cluster loses its quorum or becomes unstable? I know this is a rather general question, as the answer may depend on what configuration these use, so please enumerate the relevant combinations.

Thanks,
Connie
100 MB messages
Hello everyone,

Can Kafka be used for binary large objects of 100 MB? Or should I use a different solution to store these files, like MongoDB, and maybe send the location of these files in MongoDB over Kafka?

Thanks in advance,
Wouter
Re: Kafka 0.8's ConsumerConnector.close() hangs if ZK is unavailable and autocommit is enabled
In 0.8, ZK is critical for managing the brokers. So, we do expect the ZK service to be always available.

Thanks,
Jun

On Mon, May 12, 2014 at 9:59 AM, Yury Ruchin yuri.ruc...@gmail.com wrote:
Hi all, I'm using Kafka 0.8 and I've run into an issue with ConsumerConnector. Steps to reproduce:
1. Start a single-broker Kafka cluster with auto.create.topics.enable set to true.
2. Start a ConsumerConnector on a topic (which does not yet exist) with auto.offset.reset set to smallest.
3. Produce some data to the topic.
4. Bring ZooKeeper down.
5. Call ConsumerConnector.close().

Observation - the call blocks forever with the following stack trace:

java.lang.Thread.State: TIMED_WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for 0xc6bf0570 (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.parkUntil(LockSupport.java:237)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitUntil(AbstractQueuedSynchronizer.java:2072)
        at org.I0Itec.zkclient.ZkClient.waitForKeeperState(ZkClient.java:636)
        at org.I0Itec.zkclient.ZkClient.waitUntilConnected(ZkClient.java:619)
        at org.I0Itec.zkclient.ZkClient.waitUntilConnected(ZkClient.java:615)
        at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:679)
        at org.I0Itec.zkclient.ZkClient.writeDataReturnStat(ZkClient.java:813)
        at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:808)
        at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:777)
        at kafka.utils.ZkUtils$.updatePersistentPath(Unknown Source)
        at kafka.consumer.ZookeeperConsumerConnector$$anonfun$commitOffsets$3$$anonfun$apply$5.apply(Unknown Source)
        at kafka.consumer.ZookeeperConsumerConnector$$anonfun$commitOffsets$3$$anonfun$apply$5.apply(Unknown Source)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at kafka.consumer.ZookeeperConsumerConnector$$anonfun$commitOffsets$3.apply(Unknown Source)
        at kafka.consumer.ZookeeperConsumerConnector$$anonfun$commitOffsets$3.apply(Unknown Source)
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at kafka.utils.Pool$$anon$1.foreach(Unknown Source)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at kafka.utils.Pool.foreach(Unknown Source)
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at kafka.consumer.ZookeeperConsumerConnector.commitOffsets(Unknown Source)
        at kafka.consumer.ZookeeperConsumerConnector.liftedTree1$1(Unknown Source)
        at kafka.consumer.ZookeeperConsumerConnector.shutdown(Unknown Source)
        - locked 0xc6be0c60 (a java.lang.Object)
        at kafka.javaapi.consumer.ZookeeperConsumerConnector.shutdown(Unknown Source)
        ...

Once I set auto.commit.enable to false, the problem is gone. This is identical to what's described in https://issues.apache.org/jira/browse/KAFKA-601, with the only difference that it applies to Kafka 0.8 rather than Kafka 0.7.2. Is there any way to solve this issue other than disabling auto-commit?

Thanks,
Yury
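One generic mitigation for a close that can block forever (my suggestion, not something the Kafka consumer API provides) is to bound the blocking call with a watchdog thread, so the application can proceed even if the final offset commit is stuck waiting for ZK. A stdlib-Python sketch of the pattern, with stand-ins for the blocking shutdown call:

```python
import threading

def close_with_timeout(close_fn, timeout_s):
    """Run a potentially-blocking close_fn in a daemon thread and wait
    at most timeout_s seconds. Returns True if it finished in time."""
    done = threading.Event()

    def worker():
        try:
            close_fn()          # e.g. the ConsumerConnector shutdown call
        finally:
            done.set()

    threading.Thread(target=worker, daemon=True).start()
    return done.wait(timeout_s)  # False -> close is still hung (ZK down?)

# Stand-ins: a fast close, and one that hangs like the ZK offset commit.
print(close_with_timeout(lambda: None, 1.0))                      # -> True
print(close_with_timeout(lambda: threading.Event().wait(), 0.2))  # -> False
```

Trade-off: when the wait returns False the hung thread is abandoned as a daemon, which mirrors the cost of giving up on an auto-commit stuck against an unavailable ZooKeeper.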
Re: TCP_TOO_MANY_ESTAB_CONNS
Hi Ranjith,

Which version of Kafka are you using?

On Tue, May 13, 2014 at 3:53 AM, Ranjith Venkatesan ranjit...@zohocorp.com wrote:
[quoted original message trimmed]

--
Guozhang
Re: 100 MB messages
Or use HDFS, and use Kafka for the event about the file, yup. Processing on the files can be done without the MapReduce overhead in Hadoop now using Apache Tez (or something that uses Tez, like Pig).

Joe Stein
Founder, Principal Consultant
Big Data Open Source Security LLC
http://www.stealth.ly
Twitter: @allthingshadoop

On May 13, 2014, at 4:40 AM, Wouter Bosland wbosland.pr...@gmail.com wrote:
[quoted original message trimmed]
Killing last replica for partition doesn't change ISR/Leadership if replica is running controller
Hi,

Kafka version is 0.8.1.1. We have three machines: A, B, C. Say there is a topic with replication factor 2, and one of its partitions - partition 1 - is placed on brokers A and B. If broker A is already down, then for partition 1 we have: Leader: B, ISR: [B].

If the current controller is node C, then killing broker B will turn partition 1 into the state: Leader: -1, ISR: []. But if the current controller is node B, then killing it won't update leadership/ISR for partition 1, even after the controller is restarted on node C, so partition 1 will forever think its leader is node B, which is dead.

It looks like KafkaController.onBrokerFailure handles the situation where the broker that went down is the partition leader - it sets the new leader value to -1. By contrast, KafkaController.onControllerFailover never removes the leader from a partition with all replicas offline - allegedly because the partition gets into the ReplicaDeletionIneligible state. Is this intended behavior?

This behavior affects DefaultEventHandler.getPartition in the null-key case - it can't determine that partition 1 has no leader, and this results in event send failures. What we are trying to achieve is the ability to write data even if some partitions have lost all replicas, which is a rare yet still possible scenario. Using a null key looked suitable with minor DefaultEventHandler modifications (like getting rid of DefaultEventHandler.sendPartitionPerTopicCache to avoid caching and uneven event distribution), as we neither use log compaction nor rely on partitioning of the data. We had such behavior with Kafka 0.7 - if a node is down, simply produce to a different one.

Thanks,
Alex
Re: Loss of Leader in Kafka
Hi Kashyap,

This may be a real issue with the delete-topic feature. Could you also reproduce this with trunk HEAD and 1 partition / 1 replica?

On Mon, May 12, 2014 at 9:39 AM, Kashyap Mhaisekar kashya...@gmail.com wrote:
[quoted original message trimmed]

--
Guozhang
Re: Kafka 0.7.X maven repo
Maven wasn't introduced until 0.8.0.

Joe Stein
Founder, Principal Consultant
Big Data Open Source Security LLC
http://www.stealth.ly
Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop

On Tue, May 13, 2014 at 9:52 AM, Vijay Ramachandran vramachand...@apple.com wrote:
We are currently using kafka 0.8.X but want to try something with 0.7.x and compare. Can anyone point me to the kafka 0.7.X maven repo? Thanks in advance. Vijay
zookeeper down
Can Kafka survive when ZooKeeper is down and not connectable? Will the consumer or producer still work in that case?

Weide
0.7 maven repo
We are currently using Kafka 0.8.x but want to try something with 0.7.x and compare. Can anyone point me to the Kafka 0.7.x maven repo?

Thanks in advance,
Vijay
Re: 100 MB messages
It can, but it will not perform very well. Kafka fully instantiates messages in memory (as a byte[], basically), so if you send a 100 MB message the server will do a 100 MB allocation to hold that data prior to writing it to disk. I think MongoDB does have blob support, so passing a pointer via Kafka as you describe may be a better solution.

-Jay

On Tue, May 13, 2014 at 1:40 AM, Wouter Bosland wbosland.pr...@gmail.com wrote:
[quoted original message trimmed]
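The "pointer via Kafka" pattern suggested above can be sketched as a small reference record sent in place of the blob (the field names here are illustrative, not a standard schema): the 100 MB object goes to a blob store, and only a compact locator travels through Kafka.

```python
import hashlib
import json

def make_pointer_message(store, blob_id, payload):
    """Store the large payload out-of-band, then build the small Kafka
    message that merely references it."""
    store[blob_id] = payload                      # stand-in for a MongoDB/HDFS put
    pointer = {
        "store": "mongodb",                       # illustrative locator fields
        "blob_id": blob_id,
        "size": len(payload),
        "sha256": hashlib.sha256(payload).hexdigest(),  # lets consumers verify the fetch
    }
    return json.dumps(pointer).encode("utf-8")    # a few hundred bytes, not 100 MB

blob_store = {}
msg = make_pointer_message(blob_store, "file-0001", b"x" * (1024 * 1024))
print(len(msg) < 1024)  # -> True: the Kafka message stays tiny
```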
kafka performance question
Dear all,

We want to use Kafka to collect and dispatch data files, but the performance is lower than we want. In our cluster there is one producer and one broker. We use a single thread to read a file from the producer machine's local disk and send it to the broker. The average throughput is only 3-4 MB/s. But if we just use the Java NIO API to send the file, the throughput can exceed 200 MB/s. Why is the Kafka performance so bad in our test - are we missing something?

Our server:
CPU: Intel(R) Xeon(R) CPU E5-4650 0 @ 2.70GHz * 4
Mem: 300G
Disk: 600G 15K RPM SAS * 8

Producer configuration:

props.put("serializer.class", "kafka.serializer.NullEncoder");
props.put("metadata.broker.list", "169.10.35.57:9092");
props.put("request.required.acks", "0");
props.put("producer.type", "async"); // asynchronous
props.put("queue.buffering.max.ms", "500");
props.put("queue.buffering.max.messages", "10");
props.put("batch.num.messages", "1200");
props.put("queue.enqueue.timeout.ms", "-1");
props.put("send.buffer.bytes", "10240");

Broker configuration:

# (Apache license header trimmed)
# see kafka.server.KafkaConfig for additional details and defaults

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0

# Socket Server Settings #

# The port the socket server listens on
port=9092

# Hostname the broker will bind to. If not set, the server will bind to all interfaces
#host.name=localhost

# Hostname the broker will advertise to producers and consumers. If not set, it uses the
# value for "host.name" if configured. Otherwise, it will use the value returned from
# java.net.InetAddress.getCanonicalHostName().
#advertised.host.name=<hostname routable by clients>

# The port to publish to ZooKeeper for clients to use. If this is not set,
# it will publish the same port that the broker binds to.
#advertised.port=<port accessible by clients>

# The number of threads handling network requests
#num.network.threads=2

# The number of threads doing disk I/O
#num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
#socket.send.buffer.bytes=1048576

# The receive buffer (SO_RCVBUF) used by the socket server
#socket.receive.buffer.bytes=1048576

# The maximum size of a request that the socket server will accept (protection against OOM)
#socket.request.max.bytes=104857600

# Log Basics #

# A comma separated list of directories under which to store log files
log.dirs=/data/kafka-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
#num.partitions=2

# Log Flush Policy #

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.

# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=1

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

# Log Retention Policy #

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a
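One detail worth checking in the provider configuration above: queue.buffering.max.messages=10 caps the async queue at 10 messages, so batches can never approach the batch.num.messages=1200 a batch is allowed to contain, and send.buffer.bytes=10240 is a very small socket buffer for bulk transfer. A hedged starting point for a throughput-oriented 0.8 producer config (values are illustrative, not tuned for this hardware):

```properties
# Illustrative 0.8 producer settings; tune against your own measurements.
serializer.class=kafka.serializer.DefaultEncoder
metadata.broker.list=169.10.35.57:9092
request.required.acks=0
producer.type=async
# Allow the async queue to hold enough messages for full batches to form.
queue.buffering.max.messages=100000
queue.buffering.max.ms=500
batch.num.messages=1200
queue.enqueue.timeout.ms=-1
# 10 KB is tiny for bulk transfer; try a larger socket buffer.
send.buffer.bytes=1048576
# Optionally trade CPU for network throughput.
compression.codec=gzip
```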
Re: Loss of Leader in Kafka
After topic deletion, I deleted the physical folders in the data logs and then restarted Kafka. That resolved it. Thanks! Will try out 0.8.1 and let the group know. Regards, Kashyap

On Monday, May 12, 2014, Jun Rao jun...@gmail.com wrote: Delete topic doesn't quite work in 0.8.1. We recently fixed it in trunk. Could you give it a try and see if you see the same issue? Thanks, Jun

On Mon, May 12, 2014 at 9:39 AM, Kashyap Mhaisekar kashya...@gmail.com wrote: Hi, I am hitting a strange exception while creating a topic in Kafka. Steps to reproduce this:
1. Created a topic multipartition_test with 2 partitions and 2 replicas
2. Added some data to this topic and verified data is coming up for both partitions
3. Deleted the topic. Checked ZooKeeper to confirm that /brokers/topics DOES NOT have the topic
4. Recreated the topic in exactly the same way as in step 1.
After this, when I list topics using ./kafka-list-topic.sh, I see *leader:* none and an empty *isr:* for this topic. State change logs give the following exception:
kafka.common.StateChangeFailedException: encountered error while electing leader for partition [multipartition_test,1] due to: LeaderAndIsr information doesn't exist for partition [multipartition_test,1] in OnlinePartition state.
at kafka.controller.PartitionStateMachine.electLeaderForPartition(PartitionStateMachine.scala:327)
at kafka.controller.PartitionStateMachine.kafka$controller$PartitionStateMachine$$handleStateChange(PartitionStateMachine.scala:154)
at kafka.controller.PartitionStateMachine$$anonfun$handleStateChanges$2.apply(PartitionStateMachine.scala:110)
at kafka.controller.PartitionStateMachine$$anonfun$handleStateChanges$2.apply(PartitionStateMachine.scala:109)
at scala.collection.immutable.Set$Set2.foreach(Set.scala:101)
at kafka.controller.PartitionStateMachine.handleStateChanges(PartitionStateMachine.scala:109)
at kafka.controller.KafkaController.onNewPartitionCreation(KafkaController.scala:325)
at kafka.controller.KafkaController.onNewTopicCreation(KafkaController.scala:312)
at kafka.controller.PartitionStateMachine$TopicChangeListener.liftedTree1$1(PartitionStateMachine.scala:376)
at kafka.controller.PartitionStateMachine$TopicChangeListener.handleChildChange(PartitionStateMachine.scala:361)
at org.I0Itec.zkclient.ZkClient$7.run(ZkClient.java:568)
at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
Caused by: kafka.common.StateChangeFailedException: LeaderAndIsr information doesn't exist for partition [multipartition_test,1] in OnlinePartition state
at kafka.controller.PartitionStateMachine.getLeaderIsrAndEpochOrThrowException(PartitionStateMachine.scala:347)
at kafka.controller.PartitionStateMachine.electLeaderForPartition(PartitionStateMachine.scala:291)
... 11 more

Can you please help? What am I doing wrong? Regards, Kashyap
Re: keyed-messages de-duplication
Hi, The compaction is done to clean up space. It isn't done immediately, only periodically. I suspect the reason you see no compaction is that we never compact the active segment of the log (the most recent file), as that is still being written to. The compaction would not happen until a new segment file was rolled. If you want to see this happen, I recommend changing the file segment size configuration to something small (5 MB) and producing enough messages to roll a new segment file. You should then see logging about compaction in logs/log-cleaner.log. -Jay

On Tue, May 13, 2014 at 11:52 AM, C 4.5 cfourf...@gmail.com wrote: I understand Kafka supports keyed messages (I am using 0.8.1.1) and that it is possible to de-duplicate messages based on the message key. (The log compaction section of the online documentation describes how that works.) I am using a code example that comes with Kafka (namely KafkaConsumerProducerDemo) and running it against Kafka in local mode. I write a set of messages with the same String key and then have a consumer that consumes the data. The consumer consumes messages *only* after the producer has produced all its messages. I would expect the consumer to retrieve only the latest message (as all messages have the same key), but it retrieves all messages the producer has emitted. I have also turned on these properties in the Kafka server: log.cleaner.enable=true log.cleanup.policy=dedupe
- Is de-duplication of messages guaranteed to take effect only after compaction?
- I have tried to force compaction by setting log.cleaner.backoff.ms and log.cleaner.min.cleanable.ratio to very low values, but I still observe the same behavior.
Any ideas or pointers? Thanks.
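The end state Jay describes - after compaction, only the latest value per key survives - can be modeled in a few lines. This is a sketch of the semantics only, not of Kafka's cleaner implementation:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Models what a fully compacted log retains: only the latest value per key.
// This is a sketch of the *semantics* of log compaction, not Kafka's cleaner.
public class CompactionModel {
    static Map<String, String> compact(List<String[]> log) {
        Map<String, String> latest = new LinkedHashMap<>();
        for (String[] kv : log) {
            latest.put(kv[0], kv[1]); // later writes overwrite earlier ones
        }
        return latest;
    }

    public static void main(String[] args) {
        List<String[]> log = List.of(
            new String[]{"k", "v1"},
            new String[]{"k", "v2"},
            new String[]{"k", "v3"});
        // Only k=v3 survives; before compaction runs, a consumer reading from
        // the beginning still sees all three messages, as observed above.
        System.out.println(compact(log)); // prints {k=v3}
    }
}
```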
Re: Kafka 0.8's ConsumerConnector.close() hangs if ZK is unavailable and autocommit is enabled
Hi Yury, When auto-commit is turned on, before the consumer shuts down completely it needs to commit its last offset to ZK to avoid duplicates being consumed when it starts over. If ZK is not up and running, the shutdown would rather get stuck than proceed and incur duplicates. This behavior is by design: for consistent offset maintenance, ZK should be up all the time. Guozhang

On Mon, May 12, 2014 at 9:59 AM, Yury Ruchin yuri.ruc...@gmail.com wrote: Hi all, I'm using Kafka 0.8 and I've run into an issue with ConsumerConnector. Steps to reproduce:
1. Start a single-broker Kafka cluster with auto.create.topics.enable set to true
2. Start a ConsumerConnector on a topic (which does not yet exist) with auto.offset.reset set to smallest
3. Produce some data to the topic
4. Bring ZooKeeper down
5. Call ConsumerConnector.close()
Observation: the call blocks forever with the following stack trace:
java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for 0xc6bf0570 (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.parkUntil(LockSupport.java:237)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitUntil(AbstractQueuedSynchronizer.java:2072)
at org.I0Itec.zkclient.ZkClient.waitForKeeperState(ZkClient.java:636)
at org.I0Itec.zkclient.ZkClient.waitUntilConnected(ZkClient.java:619)
at org.I0Itec.zkclient.ZkClient.waitUntilConnected(ZkClient.java:615)
at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:679)
at org.I0Itec.zkclient.ZkClient.writeDataReturnStat(ZkClient.java:813)
at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:808)
at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:777)
at kafka.utils.ZkUtils$.updatePersistentPath(Unknown Source)
at kafka.consumer.ZookeeperConsumerConnector$$anonfun$commitOffsets$3$$anonfun$apply$5.apply(Unknown Source)
at kafka.consumer.ZookeeperConsumerConnector$$anonfun$commitOffsets$3$$anonfun$apply$5.apply(Unknown Source)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at kafka.consumer.ZookeeperConsumerConnector$$anonfun$commitOffsets$3.apply(Unknown Source)
at kafka.consumer.ZookeeperConsumerConnector$$anonfun$commitOffsets$3.apply(Unknown Source)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at kafka.utils.Pool$$anon$1.foreach(Unknown Source)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at kafka.utils.Pool.foreach(Unknown Source)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at kafka.consumer.ZookeeperConsumerConnector.commitOffsets(Unknown Source)
at kafka.consumer.ZookeeperConsumerConnector.liftedTree1$1(Unknown Source)
at kafka.consumer.ZookeeperConsumerConnector.shutdown(Unknown Source)
- locked 0xc6be0c60 (a java.lang.Object)
at kafka.javaapi.consumer.ZookeeperConsumerConnector.shutdown(Unknown Source)
...
Once I set auto.commit.enable to false, the problem is gone. This is identical to what's described in https://issues.apache.org/jira/browse/KAFKA-601, with the only difference that it applies to Kafka 0.8 rather than Kafka 0.7.2. Any way to solve this issue other than disabling auto-commit? Thanks, Yury

-- Guozhang
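Besides setting auto.commit.enable=false, a generic way to keep a blocking shutdown from hanging the whole process is to bound it with a timeout. The sketch below uses plain java.util.concurrent; the Runnable stands in for something like consumerConnector.shutdown(), and note that cancelling this way skips the final offset commit, so duplicates on restart are possible:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Bound a potentially blocking shutdown call with a timeout so a hung
// ZK offset commit cannot block the whole process forever. `shutdownCall`
// stands in for something like consumerConnector.shutdown().
public class BoundedShutdown {
    static boolean shutdownWithTimeout(Runnable shutdownCall, long timeoutMs) {
        ExecutorService ex = Executors.newSingleThreadExecutor();
        Future<?> f = ex.submit(shutdownCall);
        try {
            f.get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;                    // shut down cleanly in time
        } catch (TimeoutException e) {
            f.cancel(true);                 // interrupt the stuck call
            return false;                   // caller decides what to do next
        } catch (Exception e) {
            return false;
        } finally {
            ex.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // Simulate a shutdown that hangs (like the ZK wait in the stack trace).
        boolean ok = shutdownWithTimeout(() -> {
            try { Thread.sleep(60_000); } catch (InterruptedException ignored) { }
        }, 200);
        System.out.println("clean shutdown: " + ok); // prints "clean shutdown: false"
    }
}
```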
Re: Killing last replica for partition doesn't change ISR/Leadership if replica is running controller
Yes, that seems like a real issue. Could you file a jira? Thanks, Jun

On Tue, May 13, 2014 at 11:58 AM, Alex Demidko alexan...@metamarkets.com wrote: Hi, The Kafka version is 0.8.1.1. We have three machines: A, B, C. Let's say there is a topic with replication 2, and one of its partitions - partition 1 - is placed on brokers A and B. If broker A is already down, then for partition 1 we have: Leader: B, ISR: [B]. If the current controller is node C, then killing broker B will turn partition 1 into state: Leader: -1, ISR: []. But if the current controller is node B, then killing it won't update leadership/ISR for partition 1 even when the controller is restarted on node C, so partition 1 will forever think its leader is node B, which is dead.

It looks like KafkaController.onBrokerFailure handles the situation where the broker that went down is the partition leader - it sets the new leader value to -1. By contrast, KafkaController.onControllerFailover never removes the leader from a partition with all replicas offline - allegedly because the partition gets into the ReplicaDeletionIneligible state. Is this intended behavior?

This behavior affects DefaultEventHandler.getPartition in the null-key case - it can't determine that partition 1 has no leader, and this results in event send failures. What we are trying to achieve is to be able to write data even if some partitions have lost all replicas, which is a rare yet still possible scenario. Using a null key looked suitable with minor DefaultEventHandler modifications (like getting rid of DefaultEventHandler.sendPartitionPerTopicCache to avoid caching and uneven event distribution), as we neither use log compaction nor rely on partitioning of the data. We had such behavior with Kafka 0.7 - if a node is down, simply produce to a different one. Thanks, Alex
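The DefaultEventHandler modification Alex describes - for null-key events, round-robin only over partitions that currently report a live leader - can be sketched as below. All names are illustrative; this is not the actual kafka.producer code:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of the DefaultEventHandler tweak described above: for null-key
// events, round-robin only over partitions that currently report a live
// leader (leader id >= 0), instead of over all partitions. Names here are
// illustrative; this is not the actual kafka.producer code.
public class LiveLeaderPartitioner {
    private final AtomicInteger counter = new AtomicInteger(0);

    // partitionLeaders: partition id -> leader broker id (-1 means no leader)
    int pickPartition(Map<Integer, Integer> partitionLeaders) {
        List<Integer> live = new ArrayList<>();
        for (Map.Entry<Integer, Integer> e : partitionLeaders.entrySet()) {
            if (e.getValue() >= 0) live.add(e.getKey());
        }
        if (live.isEmpty()) {
            throw new IllegalStateException("no partition with a live leader");
        }
        // Round-robin over the live subset only; no per-topic caching, so a
        // partition that loses its last replica simply stops receiving sends.
        int i = Math.abs(counter.getAndIncrement()) % live.size();
        return live.get(i);
    }

    public static void main(String[] args) {
        LiveLeaderPartitioner p = new LiveLeaderPartitioner();
        // Partition 1 has leader -1 (all replicas down) and is never chosen.
        Map<Integer, Integer> leaders = Map.of(0, 2, 1, -1, 2, 3);
        System.out.println(p.pickPartition(leaders));
    }
}
```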
Kafka 0.7.X maven repo
We are currently using Kafka 0.8.x but want to try something with 0.7.x and compare. Can anyone point me to the Kafka 0.7.x Maven repo? Thanks in advance, Vijay