Re: Having trouble with the simplest remote kafka config
Sure, no problem. I actually did run the 'localhost' command to generate output for the e-mail, but that was a typo: I always use the right IP. But I ran it again, this time with the IP of the VM. I ran the command from my Mac (the client). Thanks for taking a look! Here it is:

Richards-MacBook-Air:kafka_2.10-0.8.2.0 rick$ bin/kafka-console-producer.sh --broker-list 192.168.241.128:9092 --topic test
[2015-02-18 09:45:49,148] WARN Property topic is not valid (kafka.utils.VerifiableProperties)
and boy are my arms tired
[2015-02-18 09:45:58,533] WARN Failed to send producer request with correlation id 2 to broker 0 with data for partitions [test,0] (kafka.producer.async.DefaultEventHandler)
java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:103)
at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:102)
at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.producer.SyncProducer.send(SyncProducer.scala:101)
at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255)
at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:106)
at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:100)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100)
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
at scala.collection.immutable.Stream.foreach(Stream.scala:547)
at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
[2015-02-18 09:45:58,649] WARN Failed to send producer request with correlation id 5 to broker 0 with data for partitions [test,0] (kafka.producer.async.DefaultEventHandler)
java.nio.channels.ClosedChannelException
(same stack trace as above)
Consuming a snapshot from log compacted topic
We are currently using Kafka 0.8.1.1 with log compaction in order to provide streams of messages to our clients.

As well as constantly consuming the stream, one of our use cases is to provide a snapshot, meaning the user will receive a copy of every message at least once. Each of these messages represents an item of content in our system.

The problem comes when determining whether the client has actually reached the end of the topic. The standard Kafka way of dealing with this seems to be a ConsumerTimeoutException, but we frequently get this exception when the end of the topic has not been reached, or it may take a long time before a timeout naturally occurs.

At first glance it would seem possible to look up the max offset for each partition when you begin consuming, stopping when that position is reached. But log compaction means that if an update to a piece of content arrives with the same message key, it will be written to the end of the log, so the snapshot will be incomplete.

Another thought is to make use of the cleaner point. Currently Kafka writes out a cleaner-offset-checkpoint file in each data directory, which is written after log compaction completes. If the consumer was able to access the cleaner-offset-checkpoint, you would be able to consume up to this point, check the point was still the same (and that compaction had not yet occurred), and therefore determine you had received everything at least once. (Assuming there was no race condition between compaction and writing to the file.)

Has anybody got any thoughts?

Will
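For reference, the per-partition max-offset lookup described above can be done with the SimpleConsumer offset API. A minimal sketch against the 0.8 javaapi; the client id and error handling are illustrative, not prescribed by the thread:

```java
import java.util.HashMap;
import java.util.Map;

import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.consumer.SimpleConsumer;

public class LatestOffset {
    // Returns the log-end offset of one partition, i.e. the offset the next
    // produced message would receive.
    public static long fetch(SimpleConsumer consumer, String topic, int partition) {
        TopicAndPartition tp = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> info =
                new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        // LatestTime() asks for the latest offset; "1" means one offset back.
        info.put(tp, new PartitionOffsetRequestInfo(
                kafka.api.OffsetRequest.LatestTime(), 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
                info, kafka.api.OffsetRequest.CurrentVersion(), "snapshot-client");
        OffsetResponse response = consumer.getOffsetsBefore(request);
        if (response.hasError()) {
            throw new RuntimeException("offset lookup failed, error code "
                    + response.errorCode(topic, partition));
        }
        return response.offsets(topic, partition)[0];
    }
}
```

As the message above notes, this alone is not enough for a compacted topic: a re-keyed update written after the lookup lands beyond the recorded position.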
Re: Having trouble with the simplest remote kafka config
Yes, the topic I am producing to exists. I can produce to it fine when running the kafka-console-producer.sh tool from the Kafka node (the VM) itself (in which case I can use either localhost or the public-facing IP).

Here is where I produce messages (running on the Kafka node):
==
rick@ubuntu:~/kafka_2.10-0.8.2.0$ bin/kafka-console-producer.sh --broker-list 192.168.241.128:9092 --topic test
[2015-02-18 09:29:19,373] WARN Property topic is not valid (kafka.utils.VerifiableProperties)
echo
[2015-02-18 09:29:21,786] INFO Closing socket connection to /192.168.241.128. (kafka.network.Processor)
bobs your uncle
other bobs your uncle
^Z
[3]+ Stopped bin/kafka-console-producer.sh --broker-list 192.168.241.128:9092 --topic test

Here is where I consume them (also on the Kafka node):
==
rick@ubuntu:~/kafka_2.10-0.8.2.0$ bin/kafka-console-consumer.sh --zookeeper 192.168.241.128:2181 --topic test --from-beginning
[2015-02-18 09:30:52,566] INFO Accepted socket connection from /192.168.241.128:40532 (org.apache.zookeeper.server.NIOServerCnxnFactory)
[2015-02-18 09:30:52,577] INFO Client attempting to establish new session at /192.168.241.128:40532 (org.apache.zookeeper.server.ZooKeeperServer)
[2015-02-18 09:30:52,579] INFO Established session 0x14b9afea96f0003 with negotiated timeout 3 for client /192.168.241.128:40532 (org.apache.zookeeper.server.ZooKeeperServer)
[2015-02-18 09:30:53,050] INFO Accepted socket connection from /192.168.241.128:40533 (org.apache.zookeeper.server.NIOServerCnxnFactory)
[2015-02-18 09:30:53,060] INFO Client attempting to establish new session at /192.168.241.128:40533 (org.apache.zookeeper.server.ZooKeeperServer)
[2015-02-18 09:30:53,062] INFO Established session 0x14b9afea96f0004 with negotiated timeout 6000 for client /192.168.241.128:40533 (org.apache.zookeeper.server.ZooKeeperServer)
[2015-02-18 09:30:53,237] INFO Got user-level KeeperException when processing sessionid:0x14b9afea96f0004 type:create cxid:0x2 zxid:0x42 txntype:-1 reqpath:n/a Error Path:/consumers/console-consumer-23321/ids Error:KeeperErrorCode = NoNode for /consumers/console-consumer-23321/ids (org.apache.zookeeper.server.PrepRequestProcessor)
[2015-02-18 09:30:53,246] INFO Got user-level KeeperException when processing sessionid:0x14b9afea96f0004 type:create cxid:0x3 zxid:0x43 txntype:-1 reqpath:n/a Error Path:/consumers/console-consumer-23321 Error:KeeperErrorCode = NoNode for /consumers/console-consumer-23321 (org.apache.zookeeper.server.PrepRequestProcessor)
[2015-02-18 09:30:53,903] INFO Got user-level KeeperException when processing sessionid:0x14b9afea96f0004 type:create cxid:0x19 zxid:0x47 txntype:-1 reqpath:n/a Error Path:/consumers/console-consumer-23321/owners/test Error:KeeperErrorCode = NoNode for /consumers/console-consumer-23321/owners/test (org.apache.zookeeper.server.PrepRequestProcessor)
[2015-02-18 09:30:53,905] INFO Got user-level KeeperException when processing sessionid:0x14b9afea96f0004 type:create cxid:0x1a zxid:0x48 txntype:-1 reqpath:n/a Error Path:/consumers/console-consumer-23321/owners Error:KeeperErrorCode = NoNode for /consumers/console-consumer-23321/owners (org.apache.zookeeper.server.PrepRequestProcessor)
[2015-02-18 09:30:54,101] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)
echo
bobs your uncle
other bobs your uncle

The 'test' topic does exist:
==
rick@ubuntu:~/kafka_2.10-0.8.2.0$ bin/kafka-topics.sh --list --zookeeper localhost:2181
[2015-02-18 09:32:56,991] INFO Accepted socket connection from /127.0.0.1:42143 (org.apache.zookeeper.server.NIOServerCnxnFactory)
[2015-02-18 09:32:56,996] INFO Client attempting to establish new session at /127.0.0.1:42143 (org.apache.zookeeper.server.ZooKeeperServer)
[2015-02-18 09:32:56,997] INFO Established session 0x14b9afea96f0006 with negotiated timeout 3 for client /127.0.0.1:42143 (org.apache.zookeeper.server.ZooKeeperServer)
test
[2015-02-18 09:32:57,053] INFO Processed session termination for sessionid: 0x14b9afea96f0006 (org.apache.zookeeper.server.PrepRequestProcessor)
[2015-02-18 09:32:57,054] INFO Closed socket connection for client /127.0.0.1:42143 which had sessionid 0x14b9afea96f0006 (org.apache.zookeeper.server.NIOServerCnxn)

The version of Kafka I am using is kafka_2.10-0.8.2.0. I am hoping that this is such a simple configuration that debugging would be overkill, and that there is instead some simple setting used by the Quick Start example that I missed which precludes a producer posting to a remote queue :)

On Feb 17, 2015, at 10:19 PM, Gwen Shapira gshap...@cloudera.com wrote:
Time to debug Kafka then :) Does the topic you are producing to exist? (You can check with the kafka-topics tool.) If not, do you have auto-creation enabled? Which version are you on? Is it possible you ran into KAFKA-1738?

On Tue, Feb 17, 2015 at 10:08 PM, Richard Spillane
Re: Having trouble with the simplest remote kafka config
I think your log did show that you are connecting to localhost:9092:

[2015-02-17 20:43:32,622] WARN Fetching topic metadata with correlation id 0 for topics [Set(test)] from broker [id:0,host:localhost,port:9092] failed (kafka.client.ClientUtils$)
java.nio.channels.ClosedChannelException

Can you check again?

-Jiangjie (Becket) Qin

On 2/17/15, 10:19 PM, Gwen Shapira gshap...@cloudera.com wrote:
Time to debug Kafka then :) Does the topic you are producing to exist? (You can check with the kafka-topics tool.) If not, do you have auto-creation enabled? Which version are you on? Is it possible you ran into KAFKA-1738?

On Tue, Feb 17, 2015 at 10:08 PM, Richard Spillane r...@defend7.com wrote:
Telnet seems to be able to connect from the Mac to the VM and from the VM to the VM:

From Mac to VM:
Richards-MacBook-Air:kafka_2.10-0.8.2.0 rick$ telnet 192.168.241.128 9092
Trying 192.168.241.128...
Connected to 192.168.241.128.
Escape character is '^]'.

From VM to VM:
rick@ubuntu:~/kafka_2.10-0.8.2.0$ telnet localhost 9092
Trying ::1...
Connected to localhost.
Escape character is '^]'.

From VM to Mac:
rick@ubuntu:~/kafka_2.10-0.8.2.0$ telnet 192.168.1.27 9092
Trying 192.168.1.27...
telnet: Unable to connect to remote host: Connection refused

From Mac to Mac:
Richards-MacBook-Air:kafka_2.10-0.8.2.0 rick$ telnet localhost 9092
Trying ::1...
telnet: connect to address ::1: Connection refused
Trying 127.0.0.1...
telnet: connect to address 127.0.0.1: Connection refused
telnet: Unable to connect to remote host

On Feb 17, 2015, at 10:03 PM, Gwen Shapira gshap...@cloudera.com wrote:
What happens when you telnet to port 9092? Try it from both your Mac and the Ubuntu VM.

On Tue, Feb 17, 2015 at 9:26 PM, Richard Spillane r...@defend7.com wrote:
I checked iptables and all rules are set to forward, so nothing should be blocked in the VM example. In the container example the port is explicitly EXPOSEd, and other ports in a similar range (e.g., 8080) can be accessed just fine.

On Feb 17, 2015, at 8:56 PM, Gwen Shapira gshap...@cloudera.com wrote:
Is it possible that you have iptables on the Ubuntu machine where you run your broker? Try disabling iptables and see if it fixes the issue.

On Tue, Feb 17, 2015 at 8:47 PM, Richard Spillane r...@defend7.com wrote:
So I would like to have two machines: one running ZooKeeper and a single Kafka node, and another machine running a producer. I want to use the basic commands mentioned in the Quick Start guide to do this. However, I keep getting connection closed exceptions in the producer. This is what I do.

On the kafka/zookeeper machine:
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

...so far so good. Now on the producer machine:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
[2015-02-17 20:43:28,466] WARN Property topic is not valid (kafka.utils.VerifiableProperties)
echo
(press enter to send 'echo' above)

...now it starts spewing the errors in the Producer Errors Appendix. What I don't understand is why. I checked the default configuration and it is binding to all interfaces, as the bind to localhost is commented out. I checked netstat and 9092 is open on the zookeeper/kafka machine. I have tried this with an Ubuntu VM and a container, where the container hosts the zookeeper/kafka server, and I have tried this with my native machine (OS X) and an Ubuntu VM, where the VM is the zookeeper/kafka server.
In both cases the same thing happens. I am just trying to get the simplest possible configuration, where the producer is not on the same machine as the Kafka queue, up and running. How can I make this work? Thanks for any help.

Producer Errors Appendix:
[2015-02-17 20:43:32,622] WARN Fetching topic metadata with correlation id 0 for topics [Set(test)] from broker [id:0,host:localhost,port:9092] failed (kafka.client.ClientUtils$)
java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
at kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67)
at kafka.utils.Utils$.swallow(Utils.scala:172)
at kafka.utils.Logging$class.swallowError(Logging.scala:106)
at
Re: Having trouble with the simplest remote kafka config
I also tried running the producer from the Mac client again, but this time with the TRACE and DEBUG options uncommented in the log4j.properties file on the VM server. It seems that the connection is established (on port 50045) and bytes are being read from the client (192.168.241.1). Then subsequent connections are made (on ports 50046, 50047, 50048, and 50049). I am guessing these are retry attempts made by the producer. So it looks like the connection is made, and then Kafka decides to close it for some reason, and the client continues to retry. Here are the debug-enabled server-side logs:

=
[2015-02-18 09:59:53,819] TRACE Processor id 1 selection time = 300542531 ns (kafka.network.Processor)
[2015-02-18 09:59:53,952] TRACE Processor id 2 selection time = 301409787 ns (kafka.network.Processor)
[2015-02-18 09:59:54,019] TRACE Processor id 0 selection time = 30063 ns (kafka.network.Processor)
[2015-02-18 09:59:54,077] TRACE Processor id 0 selection time = 57586199 ns (kafka.network.Processor)
[2015-02-18 09:59:54,077] DEBUG Processor 0 listening to new connection from /192.168.241.1:50045 (kafka.network.Processor)
[2015-02-18 09:59:54,084] TRACE Processor id 0 selection time = 6156172 ns (kafka.network.Processor)
[2015-02-18 09:59:54,084] TRACE 36 bytes read from /192.168.241.1:50045 (kafka.network.Processor)
[2015-02-18 09:59:54,085] TRACE Processor id 0 selection time = 1154956 ns (kafka.network.Processor)
[2015-02-18 09:59:54,085] TRACE Socket server received response to send, registering for write: Response(0,Request(0,sun.nio.ch.SelectionKeyImpl@420433c6,null,1424282394084,/192.168.241.1:50045),kafka.network.BoundedByteBufferSend@21e32e06,SendAction) (kafka.network.Processor)
[2015-02-18 09:59:54,085] TRACE Processor id 0 selection time = 28607 ns (kafka.network.Processor)
[2015-02-18 09:59:54,086] TRACE 70 bytes written to /192.168.241.1:50045 using key sun.nio.ch.SelectionKeyImpl@420433c6 (kafka.network.Processor)
[2015-02-18 09:59:54,086] TRACE Finished writing, registering for read on connection /192.168.241.1:50045 (kafka.network.Processor)
[2015-02-18 09:59:54,097] TRACE Processor id 0 selection time = 11043038 ns (kafka.network.Processor)
[2015-02-18 09:59:54,098] INFO Closing socket connection to /192.168.241.1. (kafka.network.Processor)
[2015-02-18 09:59:54,098] DEBUG Closing connection from /192.168.241.1:50045 (kafka.network.Processor)
[2015-02-18 09:59:54,121] TRACE Processor id 1 selection time = 301719474 ns (kafka.network.Processor)
[2015-02-18 09:59:54,253] TRACE Processor id 2 selection time = 300837240 ns (kafka.network.Processor)
[2015-02-18 09:59:54,259] TRACE Processor id 1 selection time = 137306479 ns (kafka.network.Processor)
[2015-02-18 09:59:54,259] DEBUG Processor 1 listening to new connection from /192.168.241.1:50046 (kafka.network.Processor)
[2015-02-18 09:59:54,260] TRACE Processor id 1 selection time = 42838 ns (kafka.network.Processor)
[2015-02-18 09:59:54,260] TRACE 36 bytes read from /192.168.241.1:50046 (kafka.network.Processor)
[2015-02-18 09:59:54,262] TRACE Socket server received response to send, registering for write: Response(1,Request(1,sun.nio.ch.SelectionKeyImpl@1c630e29,null,1424282394260,/192.168.241.1:50046),kafka.network.BoundedByteBufferSend@2b36b44e,SendAction) (kafka.network.Processor)
[2015-02-18 09:59:54,262] TRACE Processor id 1 selection time = 48788 ns (kafka.network.Processor)
[2015-02-18 09:59:54,263] TRACE 70 bytes written to /192.168.241.1:50046 using key sun.nio.ch.SelectionKeyImpl@1c630e29 (kafka.network.Processor)
[2015-02-18 09:59:54,263] TRACE Finished writing, registering for read on connection /192.168.241.1:50046 (kafka.network.Processor)
[2015-02-18 09:59:54,263] TRACE Processor id 1 selection time = 40185 ns (kafka.network.Processor)
[2015-02-18 09:59:54,264] INFO Closing socket connection to /192.168.241.1. (kafka.network.Processor)
[2015-02-18 09:59:54,264] DEBUG Closing connection from /192.168.241.1:50046 (kafka.network.Processor)
[2015-02-18 09:59:54,369] TRACE Processor id 2 selection time = 115233690 ns (kafka.network.Processor)
[2015-02-18 09:59:54,369] DEBUG Processor 2 listening to new connection from /192.168.241.1:50047 (kafka.network.Processor)
[2015-02-18 09:59:54,370] TRACE Processor id 2 selection time = 43183 ns (kafka.network.Processor)
[2015-02-18 09:59:54,370] TRACE 36 bytes read from /192.168.241.1:50047 (kafka.network.Processor)
[2015-02-18 09:59:54,372] TRACE Socket server received response to send, registering for write: Response(2,Request(2,sun.nio.ch.SelectionKeyImpl@26ec47e9,null,1424282394370,/192.168.241.1:50047),kafka.network.BoundedByteBufferSend@626525f5,SendAction) (kafka.network.Processor)
[2015-02-18 09:59:54,372] TRACE Processor id 2 selection time = 50442 ns (kafka.network.Processor)
[2015-02-18 09:59:54,372] TRACE 70 bytes written to /192.168.241.1:50047 using key sun.nio.ch.SelectionKeyImpl@26ec47e9 (kafka.network.Processor)
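For anyone wanting to reproduce this kind of tracing, the stock broker config/log4j.properties ships lines like the following commented out. The exact appender name is an assumption from the stock file layout, so verify against your copy:

```properties
# config/log4j.properties on the broker: enable network processor tracing.
# The logger name matches the (kafka.network.Processor) tag in the output
# above; requestAppender is assumed to be defined earlier in the file.
log4j.logger.kafka.network.Processor=TRACE, requestAppender
log4j.additivity.kafka.network.Processor=false
```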
Re: Having trouble with the simplest remote kafka config
OK. Steve Miller helped me solve the problem. I needed to explicitly set advertised.host.name=192.168.241.128. The logs showed the producer could connect to 9092, but when it was told which hosts to connect to in order to queue messages, it got unresolvable hosts. By setting this explicitly, I forced the host returned to be the resolvable IP address of my VM.

On Feb 18, 2015, at 10:07 AM, Richard Spillane r...@defend7.com wrote:
I also tried running the producer from the Mac client again, but this time with the TRACE and DEBUG options uncommented in the log4j.properties file on the VM server. It seems that the connection is established (on port 50045) and bytes are being read from the client (192.168.241.1). Then subsequent connections are made (on ports 50046, 50047, 50048, and 50049). I am guessing these are retry attempts made by the producer. So it looks like the connection is made, and then Kafka decides to close it for some reason, and the client continues to retry.
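For anyone hitting the same symptom, the fix described above amounts to one line in config/server.properties (the IP is of course specific to this setup):

```properties
# config/server.properties
# Hostname/IP the broker hands back to clients in metadata responses.
# It must be resolvable and routable from the producer/consumer machines;
# otherwise clients connect, fetch metadata, and then fail exactly as above.
advertised.host.name=192.168.241.128
```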
Re: Resetting Offsets
Reading offsets looks like it's compatible across 0.8.1 and 0.8.2. However, we cannot use the update logic in ImportZkOffsets, since we want to store offsets in the broker in 0.8.2. It looks like SimpleConsumer.commitOffsets() would work with either version. Is there a better way?

-Suren

On Wednesday, February 18, 2015 11:49 AM, Michal Michalski michal.michal...@boxever.com wrote:
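A rough sketch of the SimpleConsumer.commitOffsets() approach mentioned above, modeled on the 0.8.2 javaapi. The commit-request classes changed between 0.8.1 and 0.8.2, so the constructor shapes here are assumptions to verify against the release you actually run; group, client id, and metadata strings are placeholders:

```java
import java.util.HashMap;
import java.util.Map;

import kafka.common.OffsetAndMetadata;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetCommitRequest;
import kafka.javaapi.OffsetCommitResponse;
import kafka.javaapi.consumer.SimpleConsumer;

public class OffsetReset {
    // Overwrites the stored offset for one partition of a consumer group.
    public static void reset(SimpleConsumer consumer, String group,
                             String topic, int partition, long offset) {
        Map<TopicAndPartition, OffsetAndMetadata> toCommit =
                new HashMap<TopicAndPartition, OffsetAndMetadata>();
        toCommit.put(new TopicAndPartition(topic, partition),
                new OffsetAndMetadata(offset, "manual reset", System.currentTimeMillis()));
        // versionId 1 asks the broker to store the offset in Kafka (0.8.2);
        // versionId 0 stores it in ZooKeeper, as 0.8.1 does.
        OffsetCommitRequest request = new OffsetCommitRequest(
                group, toCommit, 0 /* correlationId */, "offset-reset", (short) 1);
        OffsetCommitResponse response = consumer.commitOffsets(request);
        if (response.hasError()) {
            throw new RuntimeException("commit failed: " + response.errors());
        }
    }
}
```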
Re: Consuming a snapshot from log compacted topic
Do you have to separate the snapshot from the normal update flow? I've used a compacted Kafka topic as the source of truth for a Solr database, and fed the topic both with real-time updates and with snapshots from a Hive job. This worked very well. The nice point is that there is a seamless transition between loading an initial state and then continuing with incremental updates, so there is no need to actually tell whether you are at the end of the stream or not. (Just our normal monitor-the-lag problem...)

2015-02-18 19:18 GMT+01:00 Will Funnell w.f.funn...@gmail.com:
Re: Hold off on 0.8.2 upgrades
We have fixed the issue in KAFKA-1952. We will wait for a few more days to see if any new issue comes up. After that, we will do an 0.8.2.1 release.

Thanks,
Jun

On Fri, Feb 13, 2015 at 3:28 PM, Jay Kreps jay.kr...@gmail.com wrote:
Hey all,

We found an issue in 0.8.2 that can lead to high CPU usage on brokers with lots of partitions. We are working on a fix for this. You can track progress here:
https://issues.apache.org/jira/browse/KAFKA-1952

I would recommend holding off on upgrading to 0.8.2 until we have a fix for this issue. Sorry for the inconvenience.

-Jay
Re: Hold off on 0.8.2 upgrades
Does it make sense to wait? I don't think people will upgrade without the patched version, and I think we should release it to unblock people.

-Jay

On Wed, Feb 18, 2015 at 1:43 PM, Jun Rao j...@confluent.io wrote:
Re: Consuming a snapshot from log compacted topic
If you catch up off a compacted topic and keep consuming, then you will become consistent with the log.

I think what you are saying is that you want to create a snapshot from the Kafka topic but NOT do continual reads after that point. For example, you might be creating a backup of the data to a file.

I agree that this isn't as easy as it could be. As you say, the only solution we have is that timeout, which doesn't differentiate between a GC stall in your process and no more messages being left, so you would need to tune the timeout. This is admittedly kind of a hack.

You are also correct and perceptive to notice that if you check the end of the log, then begin consuming and read up to that point, compaction may have already kicked in (if the reading takes a while), and hence you might have an incomplete snapshot.

I think there are two features we could add that would make this easier:
1. Make the cleaner point configurable on a per-topic basis. This feature would allow you to control how long the full log is retained and when compaction can kick in. This would give a configurable SLA for the reader process to catch up.
2. Make the log end offset available more easily in the consumer.

-Jay

On Wed, Feb 18, 2015 at 10:18 AM, Will Funnell w.f.funn...@gmail.com wrote:
Re: Thread safety of Encoder implementations
Hi Guozhang,

Sorry for the delayed response. The code we use for the producer send call looks like this. We instantiate the producer like this:

Producer<EventType, EventType> producer = new Producer<EventType, EventType>(
    config, context.getEventSerializer(),
    new ProducerHandlerWrapper(config, context.getCallback()), null,
    context.getPartitioner());

The ProducerHandlerWrapper just wraps the DefaultEventHandler with a callback in case of exceptions. context.getEventSerializer() returns an object which is an implementation of the Encoder class. The Kafka config object we pass has these parameters:

p.setProperty("serializer.class", eventSerializer.getClass().getName());
p.setProperty("broker.list", kafkaBrokerList.toString());
// Kafka default is 10K
p.setProperty("queue.size", "5");
p.setProperty("queue.enqueue.timeout.ms", "-1");
p.setProperty("max.message.size", Property.KafkaProducerMaxMessageSizeBytes.value());
p.setProperty("buffer.size", Property.KafkaProducerSocketBufferSizeBytes.value());
p.setProperty("connect.timeout.ms", Property.KafkaProducerConnectTimeoutMillis.value());
p.setProperty("socket.timeout.ms", String.valueOf(TimeValue.parseMillisStrict(Property.KafkaProducerSocketTimeout.value())));

So we actually specify the serializer.class property in the KafkaConfig object as well as pass an Encoder in the constructor of the Producer. When we are actually producing an event, we do it like this:

producer.send(convertToScalaProducerDataList(topic, event));

This is the convertToScalaProducerDataList method:

// Java to Scala conversions
private scala.collection.Seq<kafka.producer.ProducerData<EventType, EventType>> convertToScalaProducerDataList(String topic, EventType e) {
    ArrayList<ProducerData<EventType, EventType>> list = new ArrayList<ProducerData<EventType, EventType>>(1);
    list.add(new ProducerData<EventType, EventType>(topic, e));
    return JavaConversions.asScalaBuffer(list);
}

We are planning on upgrading to Kafka 0.8 sometime this year, so I'm glad to know that that will solve this issue. In the meantime, let me know if you think the calling pattern we are using will cause concurrent access of the Encoder class. Thank you!

Cheers,
Liz B.

On Thu, Jan 15, 2015 at 3:42 PM, Guozhang Wang wangg...@gmail.com wrote:
Liz,

Could you paste your code for the producer send call here? I just realized that in 0.7 there might be some calling-pattern corner cases that cause concurrent access of the serializer. Also, I would recommend you try out the new version of Kafka (0.8.x), in which each producer will have only one background thread for sending data, guaranteeing thread safety.

Guozhang

On Tue, Jan 13, 2015 at 11:40 AM, Elizabeth Bennett ebenn...@loggly.com wrote:
Hi Guozhang,

Thanks for your response. We've only got one producer client (per Kryo instance), but the producer client is configured (via the broker.list config) to produce to two Kafka brokers. When we create the Producer, we pass in an instance of the serializer. What if we used the serializer.class config to specify the class name of the serializer rather than pass in an instance? Would Kafka then create a separate serializer instance for each broker that it produces to? That would solve our problem, assuming that the Producer spawns new threads for each Kafka broker that it produces to, which I'm not sure about.

--Liz

On Mon, Jan 12, 2015 at 10:55 PM, Guozhang Wang wangg...@gmail.com wrote:
Hi Liz,

Do you have multiple producer clients that use the same Kryo serializer objects? Each client will have only one background thread that tries to call serialize(), and hence in that case you will have concurrent access.

Guozhang

On Mon, Jan 12, 2015 at 5:32 PM, Elizabeth Bennett ebenn...@loggly.com wrote:
Hi Kafka Users,

I have written my own implementation of the Kafka Encoder class for serializing objects to Messages. It uses Kryo, which is a non-thread-safe Java serialization library. I'm using Kafka 0.7.2. We recently ran into an issue where we increased the number of Kafka brokers for our producer from 1 to 2. When we did this, we ran into exceptions that seemed related to Kryo being used concurrently by multiple threads. So, my question is, do I need to modify my Encoder class to be thread safe? I dug through the Kafka documentation and couldn't find anything that said one way or the other. Any information would be great. Thank you!

--Liz Bennett

p.s. For what it's worth, here is a stack trace from one of the exceptions we saw:

2015-01-08 07:33:35,938 [ERROR] [ProducerHandlerWrapper.handle] Failed to write 9 batched events to Kafka.
com.esotericsoftware.kryo.KryoException: java.lang.ArrayIndexOutOfBoundsException: 40
Serialization trace:
fieldGroups (com.loggly.core.event.Event)
event (com.loggly.core.event.FailedEvent)
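A common pattern for making a Kryo-based encoder safe under this kind of threading, not taken from this thread but a sketch assuming the 0.7 Encoder interface and the Kryo 2.x API, is to give each calling thread its own Kryo instance via ThreadLocal. "Event" here stands in for the application's own event type:

```java
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Output;

import kafka.message.Message;
import kafka.serializer.Encoder;

// "Event" is a placeholder for the application's event class.
public class ThreadSafeKryoEncoder implements Encoder<Event> {
    // Kryo instances are not safe for concurrent use, so each thread that
    // ends up calling toMessage() gets its own instance.
    private final ThreadLocal<Kryo> kryos = new ThreadLocal<Kryo>() {
        @Override
        protected Kryo initialValue() {
            Kryo kryo = new Kryo();
            kryo.register(Event.class); // register application classes as needed
            return kryo;
        }
    };

    @Override
    public Message toMessage(Event event) {
        Output output = new Output(1024, -1); // 1 KB initial buffer, unbounded growth
        kryos.get().writeObject(output, event);
        output.close();
        return new Message(output.toBytes());
    }
}
```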
Resetting Offsets
We are using the High Level Consumer API to interact with Kafka. However, on restart in the case of failures, we want to be able to manually reset offsets in certain situations. What is the recommended way to do this? Should we use the Simple Consumer API just for this restart case? Ideally, it would be great to use the same approach in 0.8.1 and in 0.8.2.

SUREN HIRAMAN, VP TECHNOLOGY
Velos
Accelerating Machine Learning
54 West 40th Street, 3rd Floor
New York, NY 10018
T: @suren_h
E: suren.hiraman@velos.io
W: www.velos.io
Flasfka, a Python Flask HTTP REST client to Kafka
Hi list,

There were already two HTTP REST clients for Kafka (namely kafka-http [1] and dropwizard-kafka-http [2]). I implemented another one in Python with Flask, named flasfka:
https://github.com/travel-intelligence/flasfka

I tend to trust projects better when they come with tests and continuous integration, so it is hooked up to travis-ci. It builds on top of kafka-python [3].

Cheers,
Christophe-Marie Duquesne

[1]: https://github.com/mailgun/kafka-http
[2]: https://github.com/stealthly/dropwizard-kafka-http
[3]: https://github.com/mumrah/kafka-python
Re: Broker Shutdown / Leader handoff issue
After further investigation, I've figured out that the issue is caused by the follower not processing messages from the controller until its ReplicaFetcherThread has shut down completely (which only happens when the socket times out). If the test waits for the socket to time out, the logs show that the ReplicaFetcherThread shuts down completely and, immediately thereafter, the UpdateMetadata requests get processed. Strangely, this happens even when controlled shutdown is enabled.

This sounds related to [1], which seems to have been fixed in 0.8.0. Are there other edge cases not covered by the fix? Is this a known problem in 0.8.1.1?

Thanks,
Philippe

[1] https://issues.apache.org/jira/browse/KAFKA-612

On Wed, Feb 18, 2015 at 12:21 AM, Philippe Laflamme plafla...@hopper.com wrote:
Hi,

I'm trying to replicate a broker shutdown in unit tests. I've got a simple cluster running with 2 brokers (and one ZK). I'm successfully able to create a topic with a single partition and a replication factor of 2. I'd like to test shutting down the current leader for the partition and make sure my code handles the exceptions thrown, such as NotLeaderForPartitionException.

I can't seem to shut down a broker and have the remaining one report that it is now the leader for the partition. It looks as though the controller successfully changes leadership, but the broker itself is unaware of the change. Here's a gist of the (convoluted) logs [1]. The sequence is as follows:

1- start 1 ZK and 2 brokers
2- create a topic (test-bogus) with 1 partition and a replication factor of 2
3- wait for leadership
4- ask the controller who is the leader
5- ask all brokers who is the leader
6- shutdown leader
7- wait for leadership
8- ask the controller who is the leader
9- ask the remaining broker who is the leader

Steps 4-6 appear here in the logs [2]. Steps 8-9 appear here [3]. As you can see, the controller is aware of the leadership change, but not the broker. I've activated controlled shutdown and this is still happening. Any idea what may be causing this?

I'm using Kafka 0.8.1.1 and ZK 3.4.5-cdh4.6. I'm using a TopicMetadataRequest for asking the brokers, and inspecting ControllerContext.partitionLeadershipInfo to fetch leadership from the Controller.

Thanks
Philippe

[1] https://gist.github.com/plaflamme/60805bfe15ae0106304a
[2] https://gist.github.com/plaflamme/60805bfe15ae0106304a#file-gistfile1-txt-L153-L158
[3] https://gist.github.com/plaflamme/60805bfe15ae0106304a#file-gistfile1-txt-L227-L228
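For reference, asking a broker directly for its view of partition leadership (steps 5 and 9 above) can be done with the javaapi TopicMetadataRequest. A sketch against the 0.8 API; host, port, and client id are placeholders:

```java
import java.util.Collections;

import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.TopicMetadataResponse;
import kafka.javaapi.consumer.SimpleConsumer;

public class LeaderLookup {
    // Prints which broker this one broker believes leads each partition.
    public static void printLeaders(String host, int port, String topic) {
        SimpleConsumer consumer =
                new SimpleConsumer(host, port, 100000, 64 * 1024, "leader-lookup");
        try {
            TopicMetadataRequest request =
                    new TopicMetadataRequest(Collections.singletonList(topic));
            TopicMetadataResponse response = consumer.send(request);
            for (TopicMetadata tm : response.topicsMetadata()) {
                for (PartitionMetadata pm : tm.partitionsMetadata()) {
                    System.out.printf("partition %d leader: %s%n",
                            pm.partitionId(),
                            pm.leader() == null
                                    ? "none"
                                    : pm.leader().host() + ":" + pm.leader().port());
                }
            }
        } finally {
            consumer.close();
        }
    }
}
```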
Re: Resetting Offsets
See https://cwiki.apache.org/confluence/display/KAFKA/System+Tools and check the following:

GetOffsetShell (not very accurate: it will set your offsets to much smaller values than you really need; we log offsets frequently in application logs and get them from there)
ImportZkOffsets

Kind regards,
Michał Michalski, michal.michal...@boxever.com

On 18 February 2015 at 16:16, Surendranauth Hiraman suren.hira...@velos.io wrote:
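For reference, the ImportZkOffsets flow is file-based. Usage is roughly the following; the group name, ZK connect string, and file name are placeholders, and the consumer group should be stopped while the offsets are rewritten:

```sh
# Dump the group's current offsets; the output file doubles as a template:
bin/kafka-run-class.sh kafka.tools.ExportZkOffsets \
    --zkconnect localhost:2181 --group my-group --output-file offsets.txt

# Edit the offset values in offsets.txt, then write them back:
bin/kafka-run-class.sh kafka.tools.ImportZkOffsets \
    --zkconnect localhost:2181 --input-file offsets.txt
```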
Re: Consuming a snapshot from log compacted topic
You are also correct and perceptive to notice that if you check the end of the log, then begin consuming and read up to that point, compaction may have already kicked in (if the reading takes a while), and hence you might have an incomplete snapshot.

Isn't it sufficient to just repeat the check at the end after reading the log, and repeat until you are truly done? At least for the purposes of a snapshot?

On Wed, Feb 18, 2015 at 02:21:49PM -0800, Jay Kreps wrote:
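A minimal sketch of that repeat-until-stable check. The two helper methods are placeholders: one would wrap the per-partition latest-offset lookup shown earlier in this thread, the other the actual consumption loop. Note that under a constant write load the loop may take a long time to terminate:

```java
import java.util.Map;

// Snapshot a compacted topic by re-checking the end offsets until no new
// messages arrived while we were reading.
public abstract class SnapshotLoop {
    // Placeholder: latest offset per partition (e.g. via OffsetRequest).
    protected abstract Map<Integer, Long> getLatestOffsets(String topic);

    // Placeholder: consume every partition up to the given target offsets.
    protected abstract void consumeUpTo(String topic, Map<Integer, Long> targets);

    public void snapshot(String topic) {
        Map<Integer, Long> target = getLatestOffsets(topic);
        while (true) {
            consumeUpTo(topic, target);                 // drain to the recorded targets
            Map<Integer, Long> now = getLatestOffsets(topic);
            if (now.equals(target)) {
                return;                                 // nothing arrived during the read: done
            }
            target = now;                               // writes came in; extend and repeat
        }
    }
}
```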
Broker w/ high memory due to index file sizes
I am running a cluster of 5 brokers with 40G -Xms/-Xmx for each. I found one of the brokers is constantly using above ~90% of memory for jvm.heapUsage. I can see from lsof output that the index files for this broker are very large. Not sure what is going on with this one broker in the cluster. Why would the index file sizes be so hugely different on one broker? Any ideas?

Regards
Zakee
Re: Hold off on 0.8.2 upgrades
Well, I guess what I was thinking is that since we have the long timeout on the vote anyway, there is no reason not to call the vote now; should anything else pop up, we can cancel the vote.

-Jay

On Wed, Feb 18, 2015 at 4:04 PM, Jun Rao j...@confluent.io wrote:
Kafka question about starting kafka with same broker id
Hi,

My name is Deepak and I work for Salesforce. We are using kafka 8.11 and have a question about starting Kafka with the same broker id.

Steps:
Start a Kafka broker with broker id = 1; it starts fine with an external ZK.
Start another Kafka broker with the same broker id = 1; it does not start, which is expected, but I am seeing the following log and it keeps retrying forever.

Is there a way to control how many times a broker tries to start itself with the same broker id?

Thanks
Deepak

[2015-02-18 14:47:20,713] INFO conflict in /controller data: {version:1,brokerid:19471,timestamp:1424299100135} stored data: {version:1,brokerid:19471,timestamp:1424288444314} (kafka.utils.ZkUtils$)
[2015-02-18 14:47:20,716] INFO I wrote this conflicted ephemeral node [{version:1,brokerid:19471,timestamp:1424299100135}] at /controller a while back in a different session, hence I will backoff for this node to be deleted by Zookeeper and retry (kafka.utils.ZkUtils$)
[2015-02-18 14:47:30,719] INFO conflict in /controller data: {version:1,brokerid:19471,timestamp:1424299100135} stored data: {version:1,brokerid:19471,timestamp:1424288444314} (kafka.utils.ZkUtils$)
[2015-02-18 14:47:30,722] INFO I wrote this conflicted ephemeral node [{version:1,brokerid:19471,timestamp:1424299100135}] at /controller a while back in a different session, hence I will backoff for this node to be deleted by Zookeeper and retry (kafka.utils.ZkUtils$)
Re: [VOTE] 0.8.2.1 Candidate 1
+1

On Feb 18, 2015 7:23 PM, Matt Narrell matt.narr...@gmail.com wrote:
+1

On Feb 18, 2015, at 7:56 PM, Jun Rao j...@confluent.io wrote:
Re: Consuming a snapshot from log compacted topic
Do you have to separate the snapshot from the normal update flow?

We are trying to avoid using another datasource if possible, to have one source of truth.

I think what you are saying is that you want to create a snapshot from the Kafka topic but NOT do continual reads after that point. For example, you might be creating a backup of the data to a file.

Yes, this is correct.

I think there are two features we could add that would make this easier: 1. Make the cleaner point configurable on a per-topic basis. This feature would allow you to control how long the full log is retained and when compaction can kick in. This would give a configurable SLA for the reader process to catch up.

That sounds like it could work. I think if you could schedule the time/frequency at which compaction occurred, clients could be scheduled to run in between.

2. Make the log end offset available more easily in the consumer.

I was thinking something would need to be added in LogCleanerManager, in the updateCheckpoints function. Where would be best to publish the information to make it more easily available, or would you just expose the cleaner-offset-checkpoint file as it is? Is it right that you would also need to know which cleaner-offset-checkpoint entry related to each active partition?

Isn't it sufficient to just repeat the check at the end after reading the log, and repeat until you are truly done? At least for the purposes of a snapshot?

Yes, I was looking at this initially, but as we have 100-150 writes per second, it could be a while before there is a pause long enough to check it has caught up. Even with the consumer timeout set to -1, it takes some time to query the max offset values, which is still long enough for more messages to arrive.

On 18 February 2015 at 23:16, Joel Koshy jjkosh...@gmail.com wrote:
[VOTE] 0.8.2.1 Candidate 1
This is the first candidate for release of Apache Kafka 0.8.2.1. This only fixes one critical issue (KAFKA-1952) in 0.8.2.0.

Release Notes for the 0.8.2.1 release:
https://people.apache.org/~junrao/kafka-0.8.2.1-candidate1/RELEASE_NOTES.html

*** Please download, test and vote by Saturday, Feb 21, 7pm PT ***

Kafka's KEYS file containing the PGP keys we use to sign the release:
http://kafka.apache.org/KEYS
in addition to the md5, sha1 and sha2 (SHA256) checksums.

* Release artifacts to be voted upon (source and binary):
https://people.apache.org/~junrao/kafka-0.8.2.1-candidate1/

* Maven artifacts to be voted upon prior to release:
https://repository.apache.org/content/groups/staging/

* scala-doc:
https://people.apache.org/~junrao/kafka-0.8.2.1-candidate1/scaladoc/

* java-doc:
https://people.apache.org/~junrao/kafka-0.8.2.1-candidate1/javadoc/

* The tag to be voted upon (off the 0.8.2 branch) is the 0.8.2.1 tag:
https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=c1b4c58531343dce80232e0122d085fc687633f6

Thanks,
Jun
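Typical verification steps before voting look roughly like this; the exact artifact name is an assumption based on the usual kafka-<version>-src.tgz naming in the release directory:

```sh
# Import the signing keys, then check the source artifact's signature.
wget http://kafka.apache.org/KEYS
gpg --import KEYS
wget https://people.apache.org/~junrao/kafka-0.8.2.1-candidate1/kafka-0.8.2.1-src.tgz
wget https://people.apache.org/~junrao/kafka-0.8.2.1-candidate1/kafka-0.8.2.1-src.tgz.asc
gpg --verify kafka-0.8.2.1-src.tgz.asc kafka-0.8.2.1-src.tgz

# Compare a locally computed checksum against the published one.
md5sum kafka-0.8.2.1-src.tgz
```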
Re: Broker w/ high memory due to index file sizes
40G is really huge; generally you would want more like 4G. Are you sure you need that?

Not sure what you mean by lsof and index files being too large, but the index files are memory mapped, so they should be able to grow arbitrarily large, and their memory usage is not counted in the Java heap (in fact, by having such a large heap you are taking away OS memory from them).

-Jay

On Wed, Feb 18, 2015 at 4:13 PM, Zakee kzak...@netzero.net wrote:

I am running a cluster of 5 brokers with 40G -Xms/-Xmx for each. I found one of the brokers is constantly using above ~90% of memory for jvm.heapUsage. I checked from lsof output that the size of the index files for this broker is too large. Not sure what is going on with this one broker in the cluster? Why would the index file sizes be so hugely different on one broker? Any ideas?

Regards,
Zakee
Re: Hold off on 0.8.2 upgrades
Well, KAFKA-1952 only introduces high CPU overhead if the number of partitions in a fetch request is high, say more than a couple of hundred. So it may not show up in every installation. For example, if you have 1000 leader replicas in a broker, but have a 20 node cluster, each replica fetch request is only going to include about 50 partitions.

Since there is a bit of overhead in running a release, I was hoping to collect some more feedback from people trying the 0.8.2.0 release who may not be affected by this issue. But I agree that we don't need to wait for too long.

Thanks,
Jun

On Wed, Feb 18, 2015 at 2:13 PM, Jay Kreps jay.kr...@gmail.com wrote:

Does it make sense to wait? I don't think people will upgrade without the patched version, and I think we should release it to unblock people.

-Jay

On Wed, Feb 18, 2015 at 1:43 PM, Jun Rao j...@confluent.io wrote:

We have fixed the issue in KAFKA-1952. We will wait for a few more days to see if any new issue comes up. After that, we will do an 0.8.2.1 release.

Thanks,
Jun

On Fri, Feb 13, 2015 at 3:28 PM, Jay Kreps jay.kr...@gmail.com wrote:

Hey all,

We found an issue in 0.8.2 that can lead to high CPU usage on brokers with lots of partitions. We are working on a fix for this. You can track progress here: https://issues.apache.org/jira/browse/KAFKA-1952

I would recommend holding off on upgrading to 0.8.2 until we have a fix for this issue. Sorry for the inconvenience.

-Jay
Re: Kafka question about starting kafka with same broker id
Deepak, You should be getting the following error, and the Kafka server will shut itself down if it sees the same broker id registered in ZooKeeper:

Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer) java.lang.RuntimeException: A broker is already registered on the path /brokers/ids/0. This probably indicates that you either have configured a brokerid that is already in use, or else you have shutdown this broker and restarted it faster than the zookeeper timeout so it appears to be re-registering.

-Harsha

On Wed, Feb 18, 2015, at 03:16 PM, Deepak Dhakal wrote:

Hi, My name is Deepak and I work for Salesforce. We are using Kafka 0.8.1.1 and have a question about starting Kafka with the same broker id.

Steps: Start a Kafka broker with broker id = 1; it starts fine with external ZK. Start another Kafka broker with the same broker id = 1; it does not start the second broker, which is expected, but I am seeing the following log and it keeps retrying forever. Is there a way to control how many times a broker tries to start itself with the same broker id?

Thanks, Deepak

[2015-02-18 14:47:20,713] INFO conflict in /controller data: {version:1,brokerid:19471,timestamp:1424299100135} stored data: {version:1,brokerid:19471,timestamp:1424288444314} (kafka.utils.ZkUtils$)
[2015-02-18 14:47:20,716] INFO I wrote this conflicted ephemeral node [{version:1,brokerid:19471,timestamp:1424299100135}] at /controller a while back in a different session, hence I will backoff for this node to be deleted by Zookeeper and retry (kafka.utils.ZkUtils$)
[2015-02-18 14:47:30,719] INFO conflict in /controller data: {version:1,brokerid:19471,timestamp:1424299100135} stored data: {version:1,brokerid:19471,timestamp:1424288444314} (kafka.utils.ZkUtils$)
[2015-02-18 14:47:30,722] INFO I wrote this conflicted ephemeral node [{version:1,brokerid:19471,timestamp:1424299100135}] at /controller a while back in a different session, hence I will backoff for this node to be deleted by Zookeeper and retry (kafka.utils.ZkUtils$)
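As a sketch of checking for this condition up front, using the I0Itec ZkClient library that Kafka 0.8 itself depends on (the ZooKeeper address and broker id below are placeholders):

import org.I0Itec.zkclient.ZkClient;

public class BrokerIdCheck {
    public static void main(String[] args) {
        // The registration is an ephemeral znode, so it only disappears once the
        // old session expires -- which is why a fast restart can collide too.
        ZkClient zk = new ZkClient("localhost:2181", 10000, 10000); // placeholder address
        try {
            int brokerId = 1; // placeholder id
            String path = "/brokers/ids/" + brokerId;
            if (zk.exists(path)) {
                System.out.println("Broker id " + brokerId + " is already registered at " + path);
            } else {
                System.out.println("Broker id " + brokerId + " is free");
            }
        } finally {
            zk.close();
        }
    }
}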
Re: Consuming a snapshot from log compacted topic
2. Make the log end offset available more easily in the consumer.

I was thinking something would need to be added in LogCleanerManager, in the updateCheckpoints function. Where would it be best to publish the information to make it more easily available, or would you just expose the cleaner-offset-checkpoint file as it is? Is it right that you would also need to know which cleaner-offset-checkpoint entry relates to each active partition?

I'm not sure if I misunderstood Jay's suggestion, but I think it is along the lines of: we expose the log-end-offset (actually the high watermark) of the partition in the fetch response. However, this is not exposed to the consumer (either in the new ConsumerRecord class or the existing MessageAndMetadata class). If we did, then when you consume a record you could check whether its offset has reached the log-end offset. If it has, then you would know for sure that you have consumed everything for that partition.

Yes, I was looking at this initially, but as we have 100-150 writes per second, it could be a while before there is a pause long enough to check it has caught up. Even with the consumer timeout set to -1, it takes some time to query the max offset values, which is still long enough for more messages to arrive.

Got it - thanks for clarifying.

On 18 February 2015 at 23:16, Joel Koshy jjkosh...@gmail.com wrote:

You are also correct and perceptive to notice that if you check the end of the log then begin consuming and read up to that point, compaction may have already kicked in (if the reading takes a while) and hence you might have an incomplete snapshot. Isn't it sufficient to just repeat the check at the end after reading the log and repeat until you are truly done? At least for the purposes of a snapshot?

On Wed, Feb 18, 2015 at 02:21:49PM -0800, Jay Kreps wrote:

If you catch up off a compacted topic and keep consuming then you will become consistent with the log. I think what you are saying is that you want to create a snapshot from the Kafka topic but NOT do continual reads after that point. For example you might be creating a backup of the data to a file.

I agree that this isn't as easy as it could be. As you say, the only solution we have is that timeout, which doesn't differentiate between a GC stall in your process and no more messages left, so you would need to tune the timeout. This is admittedly kind of a hack.

You are also correct and perceptive to notice that if you check the end of the log then begin consuming and read up to that point, compaction may have already kicked in (if the reading takes a while) and hence you might have an incomplete snapshot.

I think there are two features we could add that would make this easier:

1. Make the cleaner point configurable on a per-topic basis. This feature would allow you to control how long the full log is retained and when compaction can kick in. This would give a configurable SLA for the reader process to catch up.

2. Make the log end offset available more easily in the consumer.

-Jay

On Wed, Feb 18, 2015 at 10:18 AM, Will Funnell w.f.funn...@gmail.com wrote:

We are currently using Kafka 0.8.1.1 with log compaction in order to provide streams of messages to our clients. As well as constantly consuming the stream, one of our use cases is to provide a snapshot, meaning the user will receive a copy of every message at least once. Each one of these messages represents an item of content in our system.

The problem comes when determining if the client has actually reached the end of the topic.
The standard Kafka way of dealing with this seems to be by using a ConsumerTimeoutException, but we are frequently getting this error when the end of the topic has not been reached, or it may take a long time before a timeout naturally occurs.

On first glance it would seem possible to do a lookup for the max offset for each partition when you begin consuming, stopping when this position is reached. But log compaction means that if an update to a piece of content arrives with the same message key, then this will be written to the end, so the snapshot will be incomplete.

Another thought is to make use of the cleaner point. Currently Kafka writes out to a cleaner-offset-checkpoint file in each data directory, which is written to after log compaction completes. If the consumer was able to access the cleaner-offset-checkpoint you would be able to consume up to this point, check the point was still the same and compaction had not yet occurred, and therefore determine you had received everything at least once. (Assuming there was no race condition between compaction and writing to the file.)

Has anybody got any thoughts?
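To make the "repeat the check" idea concrete, a rough sketch of fetching the current log-end offset for one partition with the 0.8 SimpleConsumer offset API follows; a snapshot reader would consume up to this offset, fetch it again, and loop until the two agree. The broker host, port, topic and client id are placeholders, and the request must go to the lead broker for the partition.

import java.util.HashMap;
import java.util.Map;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetRequest;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.consumer.SimpleConsumer;

public class LogEndOffsetCheck {
    // Ask the broker for the latest offset of one partition.
    static long logEndOffset(SimpleConsumer consumer, String topic, int partition, String clientId) {
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
            new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        requestInfo.put(new TopicAndPartition(topic, partition),
            new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), 1));
        OffsetRequest request =
            new OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientId);
        OffsetResponse response = consumer.getOffsetsBefore(request);
        return response.offsets(topic, partition)[0];
    }

    public static void main(String[] args) {
        // Placeholders; this must point at the lead broker for the partition.
        SimpleConsumer consumer =
            new SimpleConsumer("localhost", 9092, 100000, 64 * 1024, "snapshot-check");
        try {
            long end = logEndOffset(consumer, "test", 0, "snapshot-check");
            System.out.println("log end offset: " + end);
            // Snapshot loop: consume up to `end`, then call logEndOffset again
            // and repeat until no new messages arrived in between.
        } finally {
            consumer.close();
        }
    }
}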
Re: Kafka question about starting kafka with same broker id
Why would you ever want to do that?

On Feb 18, 2015, at 15:16, Deepak Dhakal ddha...@salesforce.com wrote:

Hi, My name is Deepak and I work for Salesforce. We are using Kafka 0.8.1.1 and have a question about starting Kafka with the same broker id.

Steps: Start a Kafka broker with broker id = 1; it starts fine with external ZK. Start another Kafka broker with the same broker id = 1; it does not start the second broker, which is expected, but I am seeing the following log and it keeps retrying forever. Is there a way to control how many times a broker tries to start itself with the same broker id?

Thanks, Deepak

[2015-02-18 14:47:20,713] INFO conflict in /controller data: {version:1,brokerid:19471,timestamp:1424299100135} stored data: {version:1,brokerid:19471,timestamp:1424288444314} (kafka.utils.ZkUtils$)
[2015-02-18 14:47:20,716] INFO I wrote this conflicted ephemeral node [{version:1,brokerid:19471,timestamp:1424299100135}] at /controller a while back in a different session, hence I will backoff for this node to be deleted by Zookeeper and retry (kafka.utils.ZkUtils$)
[2015-02-18 14:47:30,719] INFO conflict in /controller data: {version:1,brokerid:19471,timestamp:1424299100135} stored data: {version:1,brokerid:19471,timestamp:1424288444314} (kafka.utils.ZkUtils$)
[2015-02-18 14:47:30,722] INFO I wrote this conflicted ephemeral node [{version:1,brokerid:19471,timestamp:1424299100135}] at /controller a while back in a different session, hence I will backoff for this node to be deleted by Zookeeper and retry (kafka.utils.ZkUtils$)
Re: [VOTE] 0.8.2.1 Candidate 1
+1

On Feb 18, 2015, at 7:56 PM, Jun Rao j...@confluent.io wrote:

This is the first candidate for release of Apache Kafka 0.8.2.1. This only fixes one critical issue (KAFKA-1952) in 0.8.2.0.

Release Notes for the 0.8.2.1 release:
https://people.apache.org/~junrao/kafka-0.8.2.1-candidate1/RELEASE_NOTES.html

*** Please download, test and vote by Saturday, Feb 21, 7pm PT ***

Kafka's KEYS file containing PGP keys we use to sign the release (in addition to the md5, sha1 and sha2 (SHA256) checksums):
http://kafka.apache.org/KEYS

* Release artifacts to be voted upon (source and binary):
https://people.apache.org/~junrao/kafka-0.8.2.1-candidate1/

* Maven artifacts to be voted upon prior to release:
https://repository.apache.org/content/groups/staging/

* scala-doc:
https://people.apache.org/~junrao/kafka-0.8.2.1-candidate1/scaladoc/

* java-doc:
https://people.apache.org/~junrao/kafka-0.8.2.1-candidate1/javadoc/

* The tag to be voted upon (off the 0.8.2 branch) is the 0.8.2.1 tag:
https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=c1b4c58531343dce80232e0122d085fc687633f6

Thanks,
Jun
Re: Hold off on 0.8.2 upgrades
Yes, that makes sense. I will try to roll out 0.8.2.1 for a vote later today.

Thanks,
Jun

On Wed, Feb 18, 2015 at 4:15 PM, Jay Kreps jay.kr...@gmail.com wrote:

Well, I guess what I was thinking is that since we have the long timeout on the vote anyway, there is no reason not to call the vote now; should anything else pop up we can cancel the vote.

-Jay

On Wed, Feb 18, 2015 at 4:04 PM, Jun Rao j...@confluent.io wrote:

Well, KAFKA-1952 only introduces high CPU overhead if the number of partitions in a fetch request is high, say more than a couple of hundred. So it may not show up in every installation. For example, if you have 1000 leader replicas in a broker, but have a 20 node cluster, each replica fetch request is only going to include about 50 partitions.

Since there is a bit of overhead in running a release, I was hoping to collect some more feedback from people trying the 0.8.2.0 release who may not be affected by this issue. But I agree that we don't need to wait for too long.

Thanks,
Jun

On Wed, Feb 18, 2015 at 2:13 PM, Jay Kreps jay.kr...@gmail.com wrote:

Does it make sense to wait? I don't think people will upgrade without the patched version, and I think we should release it to unblock people.

-Jay

On Wed, Feb 18, 2015 at 1:43 PM, Jun Rao j...@confluent.io wrote:

We have fixed the issue in KAFKA-1952. We will wait for a few more days to see if any new issue comes up. After that, we will do an 0.8.2.1 release.

Thanks,
Jun

On Fri, Feb 13, 2015 at 3:28 PM, Jay Kreps jay.kr...@gmail.com wrote:

Hey all,

We found an issue in 0.8.2 that can lead to high CPU usage on brokers with lots of partitions. We are working on a fix for this. You can track progress here: https://issues.apache.org/jira/browse/KAFKA-1952

I would recommend holding off on upgrading to 0.8.2 until we have a fix for this issue. Sorry for the inconvenience.

-Jay
Re: Consuming a snapshot from log compacted topic
Yeah, I was thinking either along the lines Joel was suggesting, or else adding a logEndOffset(TopicPartition) method or something like that. As Joel says, the consumer actually has this information internally (we return it with the fetch request) but doesn't expose it.

-Jay

On Wed, Feb 18, 2015 at 4:51 PM, Joel Koshy jjkosh...@gmail.com wrote:

2. Make the log end offset available more easily in the consumer.

I was thinking something would need to be added in LogCleanerManager, in the updateCheckpoints function. Where would it be best to publish the information to make it more easily available, or would you just expose the cleaner-offset-checkpoint file as it is? Is it right that you would also need to know which cleaner-offset-checkpoint entry relates to each active partition?

I'm not sure if I misunderstood Jay's suggestion, but I think it is along the lines of: we expose the log-end-offset (actually the high watermark) of the partition in the fetch response. However, this is not exposed to the consumer (either in the new ConsumerRecord class or the existing MessageAndMetadata class). If we did, then when you consume a record you could check whether its offset has reached the log-end offset. If it has, then you would know for sure that you have consumed everything for that partition.

Yes, I was looking at this initially, but as we have 100-150 writes per second, it could be a while before there is a pause long enough to check it has caught up. Even with the consumer timeout set to -1, it takes some time to query the max offset values, which is still long enough for more messages to arrive.

Got it - thanks for clarifying.

On 18 February 2015 at 23:16, Joel Koshy jjkosh...@gmail.com wrote:

You are also correct and perceptive to notice that if you check the end of the log then begin consuming and read up to that point, compaction may have already kicked in (if the reading takes a while) and hence you might have an incomplete snapshot. Isn't it sufficient to just repeat the check at the end after reading the log and repeat until you are truly done? At least for the purposes of a snapshot?

On Wed, Feb 18, 2015 at 02:21:49PM -0800, Jay Kreps wrote:

If you catch up off a compacted topic and keep consuming then you will become consistent with the log. I think what you are saying is that you want to create a snapshot from the Kafka topic but NOT do continual reads after that point. For example you might be creating a backup of the data to a file.

I agree that this isn't as easy as it could be. As you say, the only solution we have is that timeout, which doesn't differentiate between a GC stall in your process and no more messages left, so you would need to tune the timeout. This is admittedly kind of a hack.

You are also correct and perceptive to notice that if you check the end of the log then begin consuming and read up to that point, compaction may have already kicked in (if the reading takes a while) and hence you might have an incomplete snapshot.

I think there are two features we could add that would make this easier:

1. Make the cleaner point configurable on a per-topic basis. This feature would allow you to control how long the full log is retained and when compaction can kick in. This would give a configurable SLA for the reader process to catch up.

2. Make the log end offset available more easily in the consumer.

-Jay

On Wed, Feb 18, 2015 at 10:18 AM, Will Funnell w.f.funn...@gmail.com wrote:

We are currently using Kafka 0.8.1.1 with log compaction in order to provide streams of messages to our clients.
As well as constantly consuming the stream, one of our use cases is to provide a snapshot, meaning the user will receive a copy of every message at least once. Each one of these messages represents an item of content in our system.

The problem comes when determining if the client has actually reached the end of the topic. The standard Kafka way of dealing with this seems to be by using a ConsumerTimeoutException, but we are frequently getting this error when the end of the topic has not been reached, or it may take a long time before a timeout naturally occurs.

On first glance it would seem possible to do a lookup for the max offset for each partition when you begin consuming, stopping when this position is reached. But log compaction means that if an update to a piece of content arrives with the same message key, then this will be written to the end, so the snapshot will be incomplete.

Another thought is to make use of the cleaner point. Currently Kafka writes out to a cleaner-offset-checkpoint file in each data directory, which is written to after log compaction completes.
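To make Jay's second proposal concrete, a snapshot loop with such a method might look like the sketch below. This is purely illustrative: logEndOffset(...) is the proposed API, not an existing method in the 0.8 consumer, and tp and lastConsumed are hypothetical placeholders.

// HYPOTHETICAL sketch: logEndOffset(TopicPartition) is the proposed API,
// not a method that exists in the 0.8 consumer.
long end = consumer.logEndOffset(tp);        // proposed method: end offset from the fetch response
long lastConsumed = -1L;
while (lastConsumed < end - 1) {
    // consume the next message for tp, apply it to the snapshot,
    // and record its offset in lastConsumed...
    end = consumer.logEndOffset(tp);         // re-check: new writes may have arrived meanwhile
}
// lastConsumed == end - 1: the latest value for every key has been seen at least once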