Re: Having trouble with the simplest remote kafka config

2015-02-18 Thread Richard Spillane
Sure, no problem. I actually did run the command with ‘localhost’ when generating output 
for the e-mail, but that was a typo: I always use the right IP. I ran it 
again, this time with the IP of the VM, from my Mac (the 
client). Thanks for taking a look! Here it is:

Richards-MacBook-Air:kafka_2.10-0.8.2.0 rick$ bin/kafka-console-producer.sh 
--broker-list 192.168.241.128:9092 --topic test
[2015-02-18 09:45:49,148] WARN Property topic is not valid 
(kafka.utils.VerifiableProperties)
and boy are my arms tired
[2015-02-18 09:45:58,533] WARN Failed to send producer request with correlation 
id 2 to broker 0 with data for partitions [test,0] 
(kafka.producer.async.DefaultEventHandler)
java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:103)
at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:102)
at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.producer.SyncProducer.send(SyncProducer.scala:101)
at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255)
at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:106)
at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:100)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100)
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
at scala.collection.immutable.Stream.foreach(Stream.scala:547)
at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
[2015-02-18 09:45:58,649] WARN Failed to send producer request with correlation 
id 5 to broker 0 with data for partitions [test,0] 
(kafka.producer.async.DefaultEventHandler)
java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:103)
at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:102)
at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.producer.SyncProducer.send(SyncProducer.scala:101)
at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255)
at 

Consuming a snapshot from log compacted topic

2015-02-18 Thread Will Funnell
We are currently using Kafka 0.8.1.1 with log compaction in order to
provide streams of messages to our clients.

As well as constantly consuming the stream, one of our use cases is to
provide a snapshot, meaning the user will receive a copy of every message
at least once.

Each one of these messages represents an item of content in our system.


The problem comes when determining if the client has actually reached the
end of the topic.

The standard Kafka way of dealing with this seems to be the
ConsumerTimeoutException, but we frequently get this exception when the
end of the topic has not been reached, or it may take a long time
before a timeout naturally occurs.


At first glance it would seem possible to look up the max offset
for each partition when you begin consuming, stopping when this position is
reached.

But log compaction means that if an update to a piece of content arrives
with the same message key, then it will be written to the end, so the
snapshot will be incomplete.


Another thought is to make use of the cleaner point. Currently Kafka writes
out to a cleaner-offset-checkpoint file in each data directory which is
written to after log compaction completes.

If the consumer were able to access the cleaner-offset-checkpoint, you would
be able to consume up to this point, check that the point was still the same
and that compaction had not yet occurred, and therefore determine that you had
received everything at least once. (Assuming there was no race condition
between compaction and writing to the file.)


Has anybody got any thoughts?

Will
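
For reference, the "look up the max offset for each partition" approach described
above can be done with the 0.8 SimpleConsumer javaapi. A minimal sketch for a
single partition follows; the host, port, client id and partition discovery are
placeholders, and the request must go to the partition's current leader:

import java.util.HashMap;
import java.util.Map;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.consumer.SimpleConsumer;

public class LogEndOffsetLookup {
    // Returns the log-end offset of one partition at the moment of the call,
    // i.e. the offset the next produced message would receive.
    public static long logEndOffset(String leaderHost, int port, String topic, int partition) {
        SimpleConsumer consumer = new SimpleConsumer(leaderHost, port, 100000, 64 * 1024, "snapshot-check");
        try {
            TopicAndPartition tp = new TopicAndPartition(topic, partition);
            Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
                new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
            // LatestTime (-1) asks for the latest offset; 1 = number of offsets to return.
            requestInfo.put(tp, new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), 1));
            kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
                requestInfo, kafka.api.OffsetRequest.CurrentVersion(), "snapshot-check");
            OffsetResponse response = consumer.getOffsetsBefore(request);
            return response.offsets(topic, partition)[0];
        } finally {
            consumer.close();
        }
    }
}

Note that this only gives a complete snapshot if compaction does not rewrite keys
while the read is in progress, which is exactly the race discussed in this thread.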


Re: Having trouble with the simplest remote kafka config

2015-02-18 Thread Richard Spillane
Yes, the topic I am producing to exists. I can produce to it fine when running 
the kafka-console-producer.sh tool from the Kafka node (the VM) itself (in which 
case I can use either localhost or the public-facing IP):

Here is where I produce messages (running on the Kafka node):
==
rick@ubuntu:~/kafka_2.10-0.8.2.0$ bin/kafka-console-producer.sh --broker-list 
192.168.241.128:9092 --topic test
[2015-02-18 09:29:19,373] WARN Property topic is not valid 
(kafka.utils.VerifiableProperties)
echo
[2015-02-18 09:29:21,786] INFO Closing socket connection to /192.168.241.128. 
(kafka.network.Processor)
bobs your uncle
other bobs your uncle
^Z
[3]+  Stopped bin/kafka-console-producer.sh --broker-list 
192.168.241.128:9092 --topic test

Here is where I consume them (also on the Kafka node):
==
rick@ubuntu:~/kafka_2.10-0.8.2.0$ bin/kafka-console-consumer.sh --zookeeper 
192.168.241.128:2181 --topic test --from-beginning
[2015-02-18 09:30:52,566] INFO Accepted socket connection from 
/192.168.241.128:40532 (org.apache.zookeeper.server.NIOServerCnxnFactory)
[2015-02-18 09:30:52,577] INFO Client attempting to establish new session at 
/192.168.241.128:40532 (org.apache.zookeeper.server.ZooKeeperServer)
[2015-02-18 09:30:52,579] INFO Established session 0x14b9afea96f0003 with 
negotiated timeout 3 for client /192.168.241.128:40532 
(org.apache.zookeeper.server.ZooKeeperServer)
[2015-02-18 09:30:53,050] INFO Accepted socket connection from 
/192.168.241.128:40533 (org.apache.zookeeper.server.NIOServerCnxnFactory)
[2015-02-18 09:30:53,060] INFO Client attempting to establish new session at 
/192.168.241.128:40533 (org.apache.zookeeper.server.ZooKeeperServer)
[2015-02-18 09:30:53,062] INFO Established session 0x14b9afea96f0004 with 
negotiated timeout 6000 for client /192.168.241.128:40533 
(org.apache.zookeeper.server.ZooKeeperServer)
[2015-02-18 09:30:53,237] INFO Got user-level KeeperException when processing 
sessionid:0x14b9afea96f0004 type:create cxid:0x2 zxid:0x42 txntype:-1 
reqpath:n/a Error Path:/consumers/console-consumer-23321/ids 
Error:KeeperErrorCode = NoNode for /consumers/console-consumer-23321/ids 
(org.apache.zookeeper.server.PrepRequestProcessor)
[2015-02-18 09:30:53,246] INFO Got user-level KeeperException when processing 
sessionid:0x14b9afea96f0004 type:create cxid:0x3 zxid:0x43 txntype:-1 
reqpath:n/a Error Path:/consumers/console-consumer-23321 Error:KeeperErrorCode 
= NoNode for /consumers/console-consumer-23321 
(org.apache.zookeeper.server.PrepRequestProcessor)
[2015-02-18 09:30:53,903] INFO Got user-level KeeperException when processing 
sessionid:0x14b9afea96f0004 type:create cxid:0x19 zxid:0x47 txntype:-1 
reqpath:n/a Error Path:/consumers/console-consumer-23321/owners/test 
Error:KeeperErrorCode = NoNode for 
/consumers/console-consumer-23321/owners/test 
(org.apache.zookeeper.server.PrepRequestProcessor)
[2015-02-18 09:30:53,905] INFO Got user-level KeeperException when processing 
sessionid:0x14b9afea96f0004 type:create cxid:0x1a zxid:0x48 txntype:-1 
reqpath:n/a Error Path:/consumers/console-consumer-23321/owners 
Error:KeeperErrorCode = NoNode for /consumers/console-consumer-23321/owners 
(org.apache.zookeeper.server.PrepRequestProcessor)
[2015-02-18 09:30:54,101] INFO Closing socket connection to /127.0.0.1. 
(kafka.network.Processor)
echo
bobs your uncle
other bobs your uncle

The ‘test’ topic does exist:
==
rick@ubuntu:~/kafka_2.10-0.8.2.0$ bin/kafka-topics.sh --list --zookeeper 
localhost:2181
[2015-02-18 09:32:56,991] INFO Accepted socket connection from /127.0.0.1:42143 
(org.apache.zookeeper.server.NIOServerCnxnFactory)
[2015-02-18 09:32:56,996] INFO Client attempting to establish new session at 
/127.0.0.1:42143 (org.apache.zookeeper.server.ZooKeeperServer)
[2015-02-18 09:32:56,997] INFO Established session 0x14b9afea96f0006 with 
negotiated timeout 3 for client /127.0.0.1:42143 
(org.apache.zookeeper.server.ZooKeeperServer)
test
[2015-02-18 09:32:57,053] INFO Processed session termination for sessionid: 
0x14b9afea96f0006 (org.apache.zookeeper.server.PrepRequestProcessor)
[2015-02-18 09:32:57,054] INFO Closed socket connection for client 
/127.0.0.1:42143 which had sessionid 0x14b9afea96f0006 
(org.apache.zookeeper.server.NIOServerCnxn)

The version of Kafka I am using is kafka_2.10-0.8.2.0.

I am hoping that this configuration is simple enough that debugging would be 
overkill, and that there is instead some simple setting used by the Quick Start 
example that I missed which prevents a producer from posting to a remote queue :)

 On Feb 17, 2015, at 10:19 PM, Gwen Shapira gshap...@cloudera.com wrote:
 
 Time to debug Kafka then :)
 
 Does the topic you are producing to exists? (you can check with
 kafka-topics tool)
 If not, do you have auto-creation enabled?
 
 Which version are you on? Is it possible you ran into  KAFKA-1738?
 
 
 On Tue, Feb 17, 2015 at 10:08 PM, Richard Spillane 

Re: Having trouble with the simplest remote kafka config

2015-02-18 Thread Jiangjie Qin
I think your log did show that you are connecting to localhost:9092:

[2015-02-17 20:43:32,622] WARN Fetching topic metadata with correlation id
0 for topics [Set(test)] from broker [id:0,host:localhost,port:9092]
failed (kafka.client.ClientUtils$)
java.nio.channels.ClosedChannelException


Can you check again?

-Jiangjie (Becket) Qin

On 2/17/15, 10:19 PM, Gwen Shapira gshap...@cloudera.com wrote:

Time to debug Kafka then :)

Does the topic you are producing to exists? (you can check with
kafka-topics tool)
If not, do you have auto-creation enabled?

Which version are you on? Is it possible you ran into  KAFKA-1738?


On Tue, Feb 17, 2015 at 10:08 PM, Richard Spillane r...@defend7.com
wrote:

 Telnet seems to be able to connect from the Mac to the VM and from the
VM
 to the VM:

 From Mac to VM:
 Richards-MacBook-Air:kafka_2.10-0.8.2.0 rick$ telnet 192.168.241.128
9092
 Trying 192.168.241.128...
 Connected to 192.168.241.128.
 Escape character is '^]'.

 From VM to VM:
 rick@ubuntu:~/kafka_2.10-0.8.2.0$ telnet localhost 9092
 Trying ::1...
 Connected to localhost.
 Escape character is '^]'.

 From VM to Mac:
 rick@ubuntu:~/kafka_2.10-0.8.2.0$ telnet 192.168.1.27 9092
 Trying 192.168.1.27...
 telnet: Unable to connect to remote host: Connection refused

 From Mac to Mac:
 Richards-MacBook-Air:kafka_2.10-0.8.2.0 rick$ telnet localhost 9092
 Trying ::1...
 telnet: connect to address ::1: Connection refused
 Trying 127.0.0.1...
 telnet: connect to address 127.0.0.1: Connection refused
 telnet: Unable to connect to remote host


  On Feb 17, 2015, at 10:03 PM, Gwen Shapira gshap...@cloudera.com
 wrote:
 
  What happens when you telnet to port 9092? try it from both your mac
and
  the ubuntu vm.
 
 
  On Tue, Feb 17, 2015 at 9:26 PM, Richard Spillane r...@defend7.com
 wrote:
 
  I checked iptables and all rules are set to forward, so nothing
should
 be
  blocked in the VM example. In the container example the port is
 explicitly
  EXPOSEd and other ports in a similar range (e.g., 8080) can be
accessed
  just fine.
 
  On Feb 17, 2015, at 8:56 PM, Gwen Shapira gshap...@cloudera.com
 wrote:
 
  Is it possible that you have iptables on the Ubuntu where you run
your
  broker?
 
  Try disabling iptables and see if it fixes the issue.
 
  On Tue, Feb 17, 2015 at 8:47 PM, Richard Spillane r...@defend7.com
  wrote:
 
  So I would like to have two machines: one running zookeeper and a single
  kafka node and another machine running a producer. I want to use the basic
  commands mentioned in the Quick Start guide to do this. However, I keep
  getting connection closed exceptions in the producer.
 
  This is what I do:
  On the kafka/zookeeper machine:
  bin/zookeeper-server-start.sh config/zookeeper.properties 
  bin/kafka-server-start.sh config/server.properties 
  bin/kafka-topics.sh --create --zookeeper localhost:2181
  --replication-factor 1 --partitions 1 --topic test
 
  …so far so good, now on the producer machine:
  bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
  [2015-02-17 20:43:28,466] WARN Property topic is not valid
  (kafka.utils.VerifiableProperties)
  echo
  press enter to send 'echo' above
 
  …now it starts spewing the errors in the Producer Errors Appendix.
 
  What I don't understand is why. I checked the default configurations and
  it is binding to all interfaces, as the bind to localhost is commented out.
  I checked netstat and 9092 is open on the zookeeper/kafka machine. I have
  tried this with an Ubuntu VM and a container where the container hosts the
  zookeeper/kafka server, and I have tried this with my native machine (OS X)
  and an Ubuntu VM where the VM is the zookeeper/kafka server. In both cases
  the same thing happens.
 
  I am just trying to get the simplest possible configuration, where the
  producer is not on the same machine as the kafka queue, up and running. How
  can I make this work? Thanks for any help.
 
  Producer Errors Appendix:
 
  [2015-02-17 20:43:32,622] WARN Fetching topic metadata with correlation id
  0 for topics [Set(test)] from broker [id:0,host:localhost,port:9092] failed
  (kafka.client.ClientUtils$)
  java.nio.channels.ClosedChannelException
  at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
  at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
  at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
  at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
  at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
  at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
  at kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67)
  at kafka.utils.Utils$.swallow(Utils.scala:172)
  at kafka.utils.Logging$class.swallowError(Logging.scala:106)
at 

Re: Having trouble with the simplest remote kafka config

2015-02-18 Thread Richard Spillane
I also tried running the producer from the Mac client again, but this time with 
TRACE and DEBUG options un-commented from the log4j.properties file on the VM 
server. It seems that the connection is established (on port 50045) and bytes 
are being read from the client (192.168.241.1). Then subsequent connections are 
made (on ports 50046, 50047, 50048, and 50049). I am guessing these are retry 
attempts made by the producer. So it looks like the connection is made, and 
then Kafka decides to close it for some reason, and the client continues to 
retry.

Here are the debug-on server-side logs:
=
[2015-02-18 09:59:53,819] TRACE Processor id 1 selection time = 300542531 ns 
(kafka.network.Processor)
[2015-02-18 09:59:53,952] TRACE Processor id 2 selection time = 301409787 ns 
(kafka.network.Processor)
[2015-02-18 09:59:54,019] TRACE Processor id 0 selection time = 30063 ns 
(kafka.network.Processor)
[2015-02-18 09:59:54,077] TRACE Processor id 0 selection time = 57586199 ns 
(kafka.network.Processor)
[2015-02-18 09:59:54,077] DEBUG Processor 0 listening to new connection from 
/192.168.241.1:50045 (kafka.network.Processor)
[2015-02-18 09:59:54,084] TRACE Processor id 0 selection time = 6156172 ns 
(kafka.network.Processor)
[2015-02-18 09:59:54,084] TRACE 36 bytes read from /192.168.241.1:50045 
(kafka.network.Processor)
[2015-02-18 09:59:54,085] TRACE Processor id 0 selection time = 1154956 ns 
(kafka.network.Processor)
[2015-02-18 09:59:54,085] TRACE Socket server received response to send, 
registering for write: 
Response(0,Request(0,sun.nio.ch.SelectionKeyImpl@420433c6,null,1424282394084,/192.168.241.1:50045),kafka.network.BoundedByteBufferSend@21e32e06,SendAction)
 (kafka.network.Processor)
[2015-02-18 09:59:54,085] TRACE Processor id 0 selection time = 28607 ns 
(kafka.network.Processor)
[2015-02-18 09:59:54,086] TRACE 70 bytes written to /192.168.241.1:50045 using 
key sun.nio.ch.SelectionKeyImpl@420433c6 (kafka.network.Processor)
[2015-02-18 09:59:54,086] TRACE Finished writing, registering for read on 
connection /192.168.241.1:50045 (kafka.network.Processor)
[2015-02-18 09:59:54,097] TRACE Processor id 0 selection time = 11043038 ns 
(kafka.network.Processor)
[2015-02-18 09:59:54,098] INFO Closing socket connection to /192.168.241.1. 
(kafka.network.Processor)
[2015-02-18 09:59:54,098] DEBUG Closing connection from /192.168.241.1:50045 
(kafka.network.Processor)
[2015-02-18 09:59:54,121] TRACE Processor id 1 selection time = 301719474 ns 
(kafka.network.Processor)
[2015-02-18 09:59:54,253] TRACE Processor id 2 selection time = 300837240 ns 
(kafka.network.Processor)
[2015-02-18 09:59:54,259] TRACE Processor id 1 selection time = 137306479 ns 
(kafka.network.Processor)
[2015-02-18 09:59:54,259] DEBUG Processor 1 listening to new connection from 
/192.168.241.1:50046 (kafka.network.Processor)
[2015-02-18 09:59:54,260] TRACE Processor id 1 selection time = 42838 ns 
(kafka.network.Processor)
[2015-02-18 09:59:54,260] TRACE 36 bytes read from /192.168.241.1:50046 
(kafka.network.Processor)
[2015-02-18 09:59:54,262] TRACE Socket server received response to send, 
registering for write: 
Response(1,Request(1,sun.nio.ch.SelectionKeyImpl@1c630e29,null,1424282394260,/192.168.241.1:50046),kafka.network.BoundedByteBufferSend@2b36b44e,SendAction)
 (kafka.network.Processor)
[2015-02-18 09:59:54,262] TRACE Processor id 1 selection time = 48788 ns 
(kafka.network.Processor)
[2015-02-18 09:59:54,263] TRACE 70 bytes written to /192.168.241.1:50046 using 
key sun.nio.ch.SelectionKeyImpl@1c630e29 (kafka.network.Processor)
[2015-02-18 09:59:54,263] TRACE Finished writing, registering for read on 
connection /192.168.241.1:50046 (kafka.network.Processor)
[2015-02-18 09:59:54,263] TRACE Processor id 1 selection time = 40185 ns 
(kafka.network.Processor)
[2015-02-18 09:59:54,264] INFO Closing socket connection to /192.168.241.1. 
(kafka.network.Processor)
[2015-02-18 09:59:54,264] DEBUG Closing connection from /192.168.241.1:50046 
(kafka.network.Processor)
[2015-02-18 09:59:54,369] TRACE Processor id 2 selection time = 115233690 ns 
(kafka.network.Processor)
[2015-02-18 09:59:54,369] DEBUG Processor 2 listening to new connection from 
/192.168.241.1:50047 (kafka.network.Processor)
[2015-02-18 09:59:54,370] TRACE Processor id 2 selection time = 43183 ns 
(kafka.network.Processor)
[2015-02-18 09:59:54,370] TRACE 36 bytes read from /192.168.241.1:50047 
(kafka.network.Processor)
[2015-02-18 09:59:54,372] TRACE Socket server received response to send, 
registering for write: 
Response(2,Request(2,sun.nio.ch.SelectionKeyImpl@26ec47e9,null,1424282394370,/192.168.241.1:50047),kafka.network.BoundedByteBufferSend@626525f5,SendAction)
 (kafka.network.Processor)
[2015-02-18 09:59:54,372] TRACE Processor id 2 selection time = 50442 ns 
(kafka.network.Processor)
[2015-02-18 09:59:54,372] TRACE 70 bytes written to /192.168.241.1:50047 using 
key sun.nio.ch.SelectionKeyImpl@26ec47e9 (kafka.network.Processor)

Re: Having trouble with the simplest remote kafka config

2015-02-18 Thread Richard Spillane
OK. Steve Miller helped me solve the problem. I needed to explicitly set 
advertised.host.name=192.168.241.128 in the broker config. The logs showed that 
the producer could connect to 9092, but when it was told which hosts to connect 
to in order to queue messages, it got unresolvable host names. By setting this 
explicitly to 192.168.241.128 I forced the hosts returned to be the resolvable 
IP address of my VM.
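
For anyone who hits the same thing, the relevant server.properties entries look
roughly like the sketch below (the IP is specific to this VM setup; on 0.8.x the
broker falls back to host.name, and then to the machine's hostname, when
advertised.host.name is not set, which is why clients on another machine got a
name they could not resolve):

# config/server.properties (0.8.x broker), illustrative values
# Interface the broker binds to; leave commented to bind to all interfaces.
#host.name=
# Host name/IP the broker hands back to producers and consumers in metadata
# responses; it must be resolvable and reachable from the client machines.
advertised.host.name=192.168.241.128
# Optionally advertise a different port as well.
#advertised.port=9092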

 On Feb 18, 2015, at 10:07 AM, Richard Spillane r...@defend7.com wrote:
 
 I also tried running the producer from the Mac client again, but this time 
 with TRACE and DEBUG options un-commented from the log4j.properties file on 
 the VM server. It seems that the connection is established (on port 50045) 
 and bytes are being read from the client (192.168.241.1). Then subsequent 
 connections are made (on ports 50046, 50047, 50048, and 50049). I am guessing 
 these are retry attempts made by the producer. So it looks like the 
 connection is made, and then Kafka decides to close it for some reason, and 
 the client continues to retry.
 
 Here are the debug-on server-side logs:
 =
 [2015-02-18 09:59:53,819] TRACE Processor id 1 selection time = 300542531 ns 
 (kafka.network.Processor)
 [2015-02-18 09:59:53,952] TRACE Processor id 2 selection time = 301409787 ns 
 (kafka.network.Processor)
 [2015-02-18 09:59:54,019] TRACE Processor id 0 selection time = 30063 ns 
 (kafka.network.Processor)
 [2015-02-18 09:59:54,077] TRACE Processor id 0 selection time = 57586199 ns 
 (kafka.network.Processor)
 [2015-02-18 09:59:54,077] DEBUG Processor 0 listening to new connection from 
 /192.168.241.1:50045 (kafka.network.Processor)
 [2015-02-18 09:59:54,084] TRACE Processor id 0 selection time = 6156172 ns 
 (kafka.network.Processor)
 [2015-02-18 09:59:54,084] TRACE 36 bytes read from /192.168.241.1:50045 
 (kafka.network.Processor)
 [2015-02-18 09:59:54,085] TRACE Processor id 0 selection time = 1154956 ns 
 (kafka.network.Processor)
 [2015-02-18 09:59:54,085] TRACE Socket server received response to send, 
 registering for write: 
 Response(0,Request(0,sun.nio.ch.SelectionKeyImpl@420433c6,null,1424282394084,/192.168.241.1:50045),kafka.network.BoundedByteBufferSend@21e32e06,SendAction)
  (kafka.network.Processor)
 [2015-02-18 09:59:54,085] TRACE Processor id 0 selection time = 28607 ns 
 (kafka.network.Processor)
 [2015-02-18 09:59:54,086] TRACE 70 bytes written to /192.168.241.1:50045 
 using key sun.nio.ch.SelectionKeyImpl@420433c6 (kafka.network.Processor)
 [2015-02-18 09:59:54,086] TRACE Finished writing, registering for read on 
 connection /192.168.241.1:50045 (kafka.network.Processor)
 [2015-02-18 09:59:54,097] TRACE Processor id 0 selection time = 11043038 ns 
 (kafka.network.Processor)
 [2015-02-18 09:59:54,098] INFO Closing socket connection to /192.168.241.1. 
 (kafka.network.Processor)
 [2015-02-18 09:59:54,098] DEBUG Closing connection from /192.168.241.1:50045 
 (kafka.network.Processor)
 [2015-02-18 09:59:54,121] TRACE Processor id 1 selection time = 301719474 ns 
 (kafka.network.Processor)
 [2015-02-18 09:59:54,253] TRACE Processor id 2 selection time = 300837240 ns 
 (kafka.network.Processor)
 [2015-02-18 09:59:54,259] TRACE Processor id 1 selection time = 137306479 ns 
 (kafka.network.Processor)
 [2015-02-18 09:59:54,259] DEBUG Processor 1 listening to new connection from 
 /192.168.241.1:50046 (kafka.network.Processor)
 [2015-02-18 09:59:54,260] TRACE Processor id 1 selection time = 42838 ns 
 (kafka.network.Processor)
 [2015-02-18 09:59:54,260] TRACE 36 bytes read from /192.168.241.1:50046 
 (kafka.network.Processor)
 [2015-02-18 09:59:54,262] TRACE Socket server received response to send, 
 registering for write: 
 Response(1,Request(1,sun.nio.ch.SelectionKeyImpl@1c630e29,null,1424282394260,/192.168.241.1:50046),kafka.network.BoundedByteBufferSend@2b36b44e,SendAction)
  (kafka.network.Processor)
 [2015-02-18 09:59:54,262] TRACE Processor id 1 selection time = 48788 ns 
 (kafka.network.Processor)
 [2015-02-18 09:59:54,263] TRACE 70 bytes written to /192.168.241.1:50046 
 using key sun.nio.ch.SelectionKeyImpl@1c630e29 (kafka.network.Processor)
 [2015-02-18 09:59:54,263] TRACE Finished writing, registering for read on 
 connection /192.168.241.1:50046 (kafka.network.Processor)
 [2015-02-18 09:59:54,263] TRACE Processor id 1 selection time = 40185 ns 
 (kafka.network.Processor)
 [2015-02-18 09:59:54,264] INFO Closing socket connection to /192.168.241.1. 
 (kafka.network.Processor)
 [2015-02-18 09:59:54,264] DEBUG Closing connection from /192.168.241.1:50046 
 (kafka.network.Processor)
 [2015-02-18 09:59:54,369] TRACE Processor id 2 selection time = 115233690 ns 
 (kafka.network.Processor)
 [2015-02-18 09:59:54,369] DEBUG Processor 2 listening to new connection from 
 /192.168.241.1:50047 (kafka.network.Processor)
 [2015-02-18 09:59:54,370] TRACE Processor id 2 selection time = 43183 ns 
 (kafka.network.Processor)
 [2015-02-18 09:59:54,370] TRACE 36 bytes read from /192.168.241.1:50047 
 

Re: Resetting Offsets

2015-02-18 Thread Suren
Reading offsets looks like it's compatible across 0.8.1 and 0.8.2.

However, we cannot use the update logic in ImportZkOffsets, since we want to 
store offsets in the broker in 0.8.2.
It looks like SimpleConsumer.commitOffsets() would work with either version. Is 
there a better way?
-Suren
 

 On Wednesday, February 18, 2015 11:49 AM, Michal Michalski 
michal.michal...@boxever.com wrote:
   

 See https://cwiki.apache.org/confluence/display/KAFKA/System+Tools
and check the following:
GetOffsetShell (not very accurate - will set your offsets to much smaller
values than you really need; we log offsets frequently in application logs
and get it from there)
ImportZkOffsets

Kind regards,
Michał Michalski,
michal.michal...@boxever.com

On 18 February 2015 at 16:16, Surendranauth Hiraman suren.hira...@velos.io
wrote:

 We are using the High Level Consumer API to interact with Kafka.

 However, on restart in the case of failures, we want to be able to manually
 reset offsets in certain situations.

 What is the recommended way to do this?

 Should we use the Simple Consumer API just for this restart case?

 Ideally, it would be great to use the same approach in 0.8.1 and in 0.8.2.



 SUREN HIRAMAN, VP TECHNOLOGY
 Velos
 Accelerating Machine Learning

 54 West 40th Street, 3RD FLOOR
 NEW YORK, NY 10018
 T: @suren_h
 E: suren.hiraman@v suren.hira...@sociocast.comelos.io
 W: www.velos.io


   

Re: Consuming a snapshot from log compacted topic

2015-02-18 Thread svante karlsson
Do you have to separate the snapshot from the normal update flow?

I've used a compacting Kafka topic as the source of truth for a Solr
database and fed the topic with both real-time updates and snapshots from
a Hive job. This worked very well. The nice point is that there is a
seamless transition between loading an initial state and then
continuing with incremental updates, so there is no need to actually tell whether
you are at the end of the stream or not. (Just our normal monitor-the-lag
problem...)



2015-02-18 19:18 GMT+01:00 Will Funnell w.f.funn...@gmail.com:

 We are currently using Kafka 0.8.1.1 with log compaction in order to
 provide streams of messages to our clients.

 As well as constantly consuming the stream, one of our use cases is to
 provide a snapshot, meaning the user will receive a copy of every message
 at least once.

 Each one of these messages represents an item of content in our system.


 The problem comes when determining if the client has actually reached the
 end of the topic.

 The standard Kafka way of dealing with this seems to be by using a
 ConsumerTimeoutException, but we are frequently getting this error when the
 end of the topic has not been reached or even it may take a long time
 before a timeout naturally occurs.


 On first glance it would seem possible to do a lookup for the max offset
 for each partition when you begin consuming, stopping when this position it
 reached.

 But log compaction means that if an update to a piece of content arrives
 with the same message key, then this will be written to the end so the
 snapshot will be incomplete.


 Another thought is to make use of the cleaner point. Currently Kafka writes
 out to a cleaner-offset-checkpoint file in each data directory which is
 written to after log compaction completes.

 If the consumer was able to access the cleaner-offset-checkpoint you would
 be able to consume up to this point, check the point was still the same,
 and compaction had not yet occurred, and therefore determine you had
 receive everything at least once. (Assuming there was no race condition
 between compaction and writing to the file)


 Has anybody got any thoughts?

 Will



Re: Hold off on 0.8.2 upgrades

2015-02-18 Thread Jun Rao
We have fixed the issue in KAFKA-1952. We will wait for a few more days to
see if any new issue comes up. After that, we will do an 0.8.2.1 release.

Thanks,

Jun

On Fri, Feb 13, 2015 at 3:28 PM, Jay Kreps jay.kr...@gmail.com wrote:

 Hey all,

 We found an issue in 0.8.2 that can lead to high CPU usage on brokers with
 lots of partitions. We are working on a fix for this. You can track
 progress here:
 https://issues.apache.org/jira/browse/KAFKA-1952

 I would recommend holding off on upgrading to 0.8.2 until we have a fix for
 this issue. Sorry for the inconvenience.

 -Jay



Re: Hold off on 0.8.2 upgrades

2015-02-18 Thread Jay Kreps
Does it make sense to wait? I don't think people will upgrade without the
patched version, and I think we should release it to unblock people.

-Jay

On Wed, Feb 18, 2015 at 1:43 PM, Jun Rao j...@confluent.io wrote:

 We have fixed the issue in KAFKA-1952. We will wait for a few more days to
 see if any new issue comes up. After that, we will do an 0.8.2.1 release.

 Thanks,

 Jun

 On Fri, Feb 13, 2015 at 3:28 PM, Jay Kreps jay.kr...@gmail.com wrote:

  Hey all,
 
  We found an issue in 0.8.2 that can lead to high CPU usage on brokers
 with
  lots of partitions. We are working on a fix for this. You can track
  progress here:
  https://issues.apache.org/jira/browse/KAFKA-1952
 
  I would recommend holding off on upgrading to 0.8.2 until we have a fix
 for
  this issue. Sorry for the inconvenience.
 
  -Jay
 



Re: Consuming a snapshot from log compacted topic

2015-02-18 Thread Jay Kreps
If you catch up off a compacted topic and keep consuming then you will
become consistent with the log.

I think what you are saying is that you want to create a snapshot from the
Kafka topic but NOT do continual reads after that point. For example you
might be creating a backup of the data to a file.

I agree that this isn't as easy as it could be. As you say, the only
solution we have is that timeout, which doesn't differentiate between a GC
stall in your process and no more messages left, so you would need to tune
the timeout. This is admittedly kind of a hack.

You are also correct and perceptive to notice that if you check the end of
the log, then begin consuming and read up to that point, compaction may have
already kicked in (if the reading takes a while), and hence you might have
an incomplete snapshot.

I think there are two features we could add that would make this easier:
1. Make the cleaner point configurable on a per-topic basis. This feature
would allow you to control how long the full log is retained and when
compaction can kick in. This would give a configurable SLA for the reader
process to catch up.
2. Make the log end offset available more easily in the consumer.

-Jay



On Wed, Feb 18, 2015 at 10:18 AM, Will Funnell w.f.funn...@gmail.com
wrote:

 We are currently using Kafka 0.8.1.1 with log compaction in order to
 provide streams of messages to our clients.

 As well as constantly consuming the stream, one of our use cases is to
 provide a snapshot, meaning the user will receive a copy of every message
 at least once.

 Each one of these messages represents an item of content in our system.


 The problem comes when determining if the client has actually reached the
 end of the topic.

 The standard Kafka way of dealing with this seems to be by using a
 ConsumerTimeoutException, but we are frequently getting this error when the
 end of the topic has not been reached or even it may take a long time
 before a timeout naturally occurs.


 On first glance it would seem possible to do a lookup for the max offset
 for each partition when you begin consuming, stopping when this position it
 reached.

 But log compaction means that if an update to a piece of content arrives
 with the same message key, then this will be written to the end so the
 snapshot will be incomplete.


 Another thought is to make use of the cleaner point. Currently Kafka writes
 out to a cleaner-offset-checkpoint file in each data directory which is
 written to after log compaction completes.

 If the consumer was able to access the cleaner-offset-checkpoint you would
 be able to consume up to this point, check the point was still the same,
 and compaction had not yet occurred, and therefore determine you had
 receive everything at least once. (Assuming there was no race condition
 between compaction and writing to the file)


 Has anybody got any thoughts?

 Will



Re: Thread safety of Encoder implementations

2015-02-18 Thread Elizabeth Bennett
Hi Guozhang,
Sorry for the delayed response. The code we use for the producer send call
looks like this:

We instantiate the producer like this:

  Producer<EventType, EventType> producer = new Producer<EventType,
EventType>(config, context.getEventSerializer(), new
ProducerHandlerWrapper(config, context.getCallback()), null,
context.getPartitioner());

The ProducerHandlerWrapper just wraps the DefaultEventHandler with a
callback in case of exceptions. The context.getEventSerializer returns an
object which is an implementation of the Encoder class.

The kafka config object we pass has these parameters:

p.setProperty("serializer.class", eventSerializer.getClass().getName());
p.setProperty("broker.list", kafkaBrokerList.toString());
// Kafka default is 10K
p.setProperty("queue.size", "5");
p.setProperty("queue.enqueue.timeout.ms", "-1");
p.setProperty("max.message.size",
Property.KafkaProducerMaxMessageSizeBytes.value());
p.setProperty("buffer.size",
Property.KafkaProducerSocketBufferSizeBytes.value());
p.setProperty("connect.timeout.ms",
Property.KafkaProducerConnectTimeoutMillis.value());
p.setProperty("socket.timeout.ms",
String.valueOf(TimeValue.parseMillisStrict(Property.KafkaProducerSocketTimeout.value())));

So we actually specify the serializer.class property in the KafkaConfig
object as well as pass an Encoder in the constructor of the Producer.

When we are actually producing an event, we do it like this:

producer.send(convertToScalaProducerDataList(topic, event));

this is the convertToScalaProducerDataList method:

  // Java to Scala conversions
  private scala.collection.Seq<kafka.producer.ProducerData<EventType,
EventType>> convertToScalaProducerDataList(String topic, EventType e) {
    ArrayList<ProducerData<EventType, EventType>> list = new ArrayList<ProducerData<EventType, EventType>>(1);
    list.add(new ProducerData<EventType, EventType>(topic, e));
    return JavaConversions.asScalaBuffer(list);
  }

We are planning on upgrading to Kafka 0.8 sometime this year, so I'm glad
to know that that will solve this issue. In the meantime, let me know
if you think the calling pattern we are using will cause concurrent
access of the Encoder class. Thank you!

Cheers,
Liz B.
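
For reference, one common workaround for non-thread-safe serializers like Kryo is
to keep one Kryo instance per thread in a ThreadLocal inside the Encoder. A rough
sketch is below; the Encoder/Message signatures are the 0.7-style ones, EventType
stands for the event class used above, and the Kryo registration and buffer sizing
are illustrative only:

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Output;
import kafka.message.Message;
import kafka.serializer.Encoder;

public class ThreadSafeKryoEncoder implements Encoder<EventType> {
    // Kryo instances are not thread-safe, so each calling thread gets its own
    // instance instead of sharing one across producer threads.
    private static final ThreadLocal<Kryo> KRYO = new ThreadLocal<Kryo>() {
        @Override protected Kryo initialValue() {
            Kryo kryo = new Kryo();
            kryo.register(EventType.class);
            return kryo;
        }
    };

    public Message toMessage(EventType event) {
        Output output = new Output(4096, -1); // initial buffer 4 KB, grows as needed
        KRYO.get().writeObject(output, event);
        return new Message(output.toBytes());
    }
}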

On Thu, Jan 15, 2015 at 3:42 PM, Guozhang Wang wangg...@gmail.com wrote:

 Liz,

 Could you paste your code for calling the producer send call here? Just
 realized in 0.7 there might be some calling pattern corner cases that cause
 concurrent access of the serializer.

 Also, I would recommend you to try out the new version of Kafka (0.8.x), in
 which each producer will only have one back ground thread for sending data,
 guaranteeing thread safety.

 Guozhang

 On Tue, Jan 13, 2015 at 11:40 AM, Elizabeth Bennett ebenn...@loggly.com
 wrote:

  Hi Guozhang,
  Thanks for you response. We've only got one producer client (per Kryo
  instance) but the producer client is configured (via the broker.list
  config) to produce to two Kafka brokers. When we create the Producer, we
  pass in an instance of the serializer. What if we used the
 serializer.class
  config to specify the class name of the serializer rather than pass in an
  instance? Would Kafka then create a separate serializer instance for each
  broker that it produces to? That would solve our problem assuming that
 the
  Producer spawns new threads for each kafka broker that it produces to,
  which I'm not sure about.
 
  --Liz
 
  On Mon, Jan 12, 2015 at 10:55 PM, Guozhang Wang wangg...@gmail.com
  wrote:
 
   Hi Liz,
  
   Do you have multiple producer clients that use the same Kryo serializer
   objects? Each client will only have one background thread that tries to
   call serialize(), and hence in that case you will have concurrent
 access.
  
   Guozhang
  
  
   On Mon, Jan 12, 2015 at 5:32 PM, Elizabeth Bennett 
 ebenn...@loggly.com
   wrote:
  
Hi Kafka Users,
I have written my own implementation of the kafka Encoder class for
serializing objects to Messages. It uses Kryo, which is a non-thread
  safe
java serialization library. I'm using Kafka 0.7.2.
   
We recently ran into an issue where we increased the number of kafka
brokers for our kafka producer from 1 to 2. When we did this, we ran
  into
exceptions that seemed related to Kryo being used concurrently by
   multiple
threads. So, my question is, do I need to modify my Encoder class to
 be
thread safe? I dug through the Kafka documentation and couldn't find
anything that said one way or another. Any information would be
 great.
Thank you!
   
--Liz Bennett
   
p.s. for what it's worth here is a stack trace from one of the
  exceptions
we saw:
   
2015-01-08 07:33:35,938 [ERROR] [ProducerHandlerWrapper.handle]
 Failed
to write 9 batched events to Kafka.
com.esotericsoftware.kryo.KryoException:
java.lang.ArrayIndexOutOfBoundsException: 40
Serialization trace:
fieldGroups (com.loggly.core.event.Event)
event (com.loggly.core.event.FailedEvent)
   

Resetting Offsets

2015-02-18 Thread Surendranauth Hiraman
We are using the High Level Consumer API to interact with Kafka.

However, on restart in the case of failures, we want to be able to manually
reset offsets in certain situations.

What is the recommended way to do this?

Should we use the Simple Consumer API just for this restart case?

Ideally, it would be great to use the same approach in 0.8.1 and in 0.8.2.



SUREN HIRAMAN, VP TECHNOLOGY
Velos
Accelerating Machine Learning

54 West 40th Street, 3RD FLOOR
NEW YORK, NY 10018
T: @suren_h
E: suren.hiraman@v suren.hira...@sociocast.comelos.io
W: www.velos.io


Flasfka, a Python Flask HTTP REST client to Kafka

2015-02-18 Thread Christophe-Marie Duquesne
Hi list,

There were already two HTTP REST clients for Kafka (namely kafka-http [1] and
dropwizard-kafka-http [2]).

I implemented another one in Python with Flask (named flasfka):

https://github.com/travel-intelligence/flasfka

I tend to trust projects better when they come with tests and continuous
integration, so it is hooked to travis-ci. It builds on top of kafka-python
[3].

Cheers,
Christophe-Marie Duquesne

[1]: https://github.com/mailgun/kafka-http
[2]: https://github.com/stealthly/dropwizard-kafka-http
[3]: https://github.com/mumrah/kafka-python


Re: Broker Shutdown / Leader handoff issue

2015-02-18 Thread Philippe Laflamme
After further investigation, I've figured out that the issue is caused by
the follower not processing messages from the controller until its
ReplicaFetcherThread has shut down completely (which only happens when the
socket times out).

If the test waits for the socket to timeout, the logs show that the
ReplicaFetcherThread shuts down completely, and immediately thereafter, the
UpdateMetadata requests get processed.

Strangely, this happens even when controlled shutdown is enabled.

Sounds related to this[1] which seems to have been fixed in 0.8.0. Are
there other edge cases not covered by the fix? Is this a known problem in
0.8.1.1?

Thanks,
Philippe
[1] https://issues.apache.org/jira/browse/KAFKA-612

On Wed, Feb 18, 2015 at 12:21 AM, Philippe Laflamme plafla...@hopper.com
wrote:

 Hi,

 I'm trying to replicate a broker shutdown in unit tests. I've got a simple
 cluster running with 2 brokers (and one ZK). I'm successfully able to
 create a topic with a single partition and replication factor of 2.

 I'd like to test shutting down the current leader for the partition and
 make sure my code handles the exceptions thrown such as
 NotLeaderForPartitionException.

 I can't seem to shutdown a broker and have the remaining one report that
 it is now the leader for the partition. It looks as though the controller
 successfully changes leadership, but the broker itself is unaware of the
 change.

 Here's a gist of the (convoluted) logs[1].

 The sequence is as follows:
 1- start 1 ZK and 2 brokers
 2- create a topic (test-bogus) with 1 partition and 2 replication factor
 3- wait for leadership
 4- ask the controller who is the leader
 5- ask all brokers who is the leader
 6- shutdown leader
 7- wait for leadership
 8- ask the controller who is the leader
 9- ask the remaining broker who is the leader

 Steps 4-6 appear here in the logs[2]
 Steps 8-9 appear here[3]

 As you can see, the controller is aware of the leadership change, but not
 the broker. I've activated controlled shutdown and this is still happening.
 Any idea what may be causing this?

 I'm using Kafka 0.8.1.1 and ZK 3.4.5-cdh4.6

 I'm using a TopicMetadataRequest for asking the brokers and inspecting
 ControllerContext.partitionLeadershipInfo to fetch leadership from the
 Controller.

 Thanks
 Philippe
 [1] https://gist.github.com/plaflamme/60805bfe15ae0106304a
 [2]
 https://gist.github.com/plaflamme/60805bfe15ae0106304a#file-gistfile1-txt-L153-L158
 [3]
 https://gist.github.com/plaflamme/60805bfe15ae0106304a#file-gistfile1-txt-L227-L228
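
For reference, a rough sketch of the TopicMetadataRequest-based leader check
mentioned above, using the 0.8.1 javaapi (broker address, topic and client id are
placeholders):

import java.util.Collections;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.TopicMetadataResponse;
import kafka.javaapi.consumer.SimpleConsumer;

public class LeaderCheck {
    // Asks a single broker which broker it currently believes is the leader
    // of each partition of the given topic.
    public static void printLeaders(String host, int port, String topic) {
        SimpleConsumer consumer = new SimpleConsumer(host, port, 100000, 64 * 1024, "leader-check");
        try {
            TopicMetadataRequest request = new TopicMetadataRequest(Collections.singletonList(topic));
            TopicMetadataResponse response = consumer.send(request);
            for (TopicMetadata tm : response.topicsMetadata()) {
                for (PartitionMetadata pm : tm.partitionsMetadata()) {
                    String leader = (pm.leader() == null) ? "none"
                        : pm.leader().host() + ":" + pm.leader().port();
                    System.out.println(tm.topic() + "-" + pm.partitionId() + " leader=" + leader);
                }
            }
        } finally {
            consumer.close();
        }
    }
}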



Re: Resetting Offsets

2015-02-18 Thread Michal Michalski
See https://cwiki.apache.org/confluence/display/KAFKA/System+Tools
and check the following:
GetOffsetShell (not very accurate - will set your offsets to much smaller
values than you really need; we log offsets frequently in application logs
and get it from there)
ImportZkOffsets
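
For reference, these are typically invoked via kafka-run-class.sh; a rough sketch
(the topic, group, paths and offsets below are placeholders, and the flags should
be double-checked against your 0.8.x install):

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic mytopic --time -1
bin/kafka-run-class.sh kafka.tools.ImportZkOffsets --zkconnect localhost:2181 --input-file /tmp/offsets.txt

where /tmp/offsets.txt contains one line per partition in the same zk-path:offset
format that ExportZkOffsets writes, e.g. /consumers/mygroup/offsets/mytopic/0:12345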

Kind regards,
Michał Michalski,
michal.michal...@boxever.com

On 18 February 2015 at 16:16, Surendranauth Hiraman suren.hira...@velos.io
wrote:

 We are using the High Level Consumer API to interact with Kafka.

 However, on restart in the case of failures, we want to be able to manually
 reset offsets in certain situations.

 What is the recommended way to do this?

 Should we use the Simple Consumer API just for this restart case?

 Ideally, it would be great to use the same approach in 0.8.1 and in 0.8.2.



 SUREN HIRAMAN, VP TECHNOLOGY
 Velos
 Accelerating Machine Learning

 54 West 40th Street, 3RD FLOOR
 NEW YORK, NY 10018
 T: @suren_h
 E: suren.hiraman@v suren.hira...@sociocast.comelos.io
 W: www.velos.io



Re: Consuming a snapshot from log compacted topic

2015-02-18 Thread Joel Koshy
 You are also correct and perceptive to notice that if you check the end of
 the log then begin consuming and read up to that point compaction may have
 already kicked in (if the reading takes a while) and hence you might have
 an incomplete snapshot.

Isn't it sufficient to just repeat the check at the end after reading
the log and repeat until you are truly done? At least for the purposes
of a snapshot?
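
For a single partition, that suggestion looks roughly like the sketch below
(fetchLogEndOffset() stands in for an OffsetRequest against the partition leader
and consumeUpTo() for the consumer read loop; both are hypothetical hooks):

public abstract class SnapshotLoop {
    // Hypothetical hooks: an OffsetRequest(LatestTime) against the partition
    // leader, and a read loop that returns the offset it stopped at.
    protected abstract long fetchLogEndOffset();
    protected abstract long consumeUpTo(long targetOffset);

    // Re-check the log-end offset after each pass and stop only when nothing
    // new arrived while we were reading.
    public final void snapshotOnePartition() {
        long position = consumeUpTo(fetchLogEndOffset());
        while (true) {
            long end = fetchLogEndOffset();
            if (end <= position) {
                break; // caught up: no messages were appended during the last pass
            }
            position = consumeUpTo(end);
        }
    }
}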

On Wed, Feb 18, 2015 at 02:21:49PM -0800, Jay Kreps wrote:
 If you catch up off a compacted topic and keep consuming then you will
 become consistent with the log.
 
 I think what you are saying is that you want to create a snapshot from the
 Kafka topic but NOT do continual reads after that point. For example you
 might be creating a backup of the data to a file.
 
 I agree that this isn't as easy as it could be. As you say the only
 solution we have is that timeout which doesn't differentiate between GC
 stall in your process and no more messages left so you would need to tune
 the timeout. This is admittedly kind of a hack.
 
 You are also correct and perceptive to notice that if you check the end of
 the log then begin consuming and read up to that point compaction may have
 already kicked in (if the reading takes a while) and hence you might have
 an incomplete snapshot.
 
 I think there are two features we could add that would make this easier:
 1. Make the cleaner point configurable on a per-topic basis. This feature
 would allow you to control how long the full log is retained and when
 compaction can kick in. This would give a configurable SLA for the reader
 process to catch up.
 2. Make the log end offset available more easily in the consumer.
 
 -Jay
 
 
 
 On Wed, Feb 18, 2015 at 10:18 AM, Will Funnell w.f.funn...@gmail.com
 wrote:
 
  We are currently using Kafka 0.8.1.1 with log compaction in order to
  provide streams of messages to our clients.
 
  As well as constantly consuming the stream, one of our use cases is to
  provide a snapshot, meaning the user will receive a copy of every message
  at least once.
 
  Each one of these messages represents an item of content in our system.
 
 
  The problem comes when determining if the client has actually reached the
  end of the topic.
 
  The standard Kafka way of dealing with this seems to be by using a
  ConsumerTimeoutException, but we are frequently getting this error when the
  end of the topic has not been reached or even it may take a long time
  before a timeout naturally occurs.
 
 
  On first glance it would seem possible to do a lookup for the max offset
  for each partition when you begin consuming, stopping when this position it
  reached.
 
  But log compaction means that if an update to a piece of content arrives
  with the same message key, then this will be written to the end so the
  snapshot will be incomplete.
 
 
  Another thought is to make use of the cleaner point. Currently Kafka writes
  out to a cleaner-offset-checkpoint file in each data directory which is
  written to after log compaction completes.
 
  If the consumer was able to access the cleaner-offset-checkpoint you would
  be able to consume up to this point, check the point was still the same,
  and compaction had not yet occurred, and therefore determine you had
  receive everything at least once. (Assuming there was no race condition
  between compaction and writing to the file)
 
 
  Has anybody got any thoughts?
 
  Will
 



Broker w/ high memory due to index file sizes

2015-02-18 Thread Zakee
I am running a cluster of 5 brokers with 40G -Xms/-Xmx for each. I found that one of
the brokers is constantly above ~90% of memory for jvm.heapUsage. I can see
from lsof output that the index files for this broker
are very large.

Not sure what is going on with this one broker in the cluster. Why would
the index file sizes be so hugely different on one broker? Any ideas?


Regards
Zakee


Re: Hold off on 0.8.2 upgrades

2015-02-18 Thread Jay Kreps
Well, I guess what I was thinking is that since we have the long timeout on
the vote anyway, there is no reason not to call the vote now; should anything else
pop up, we can cancel the vote.

-Jay

On Wed, Feb 18, 2015 at 4:04 PM, Jun Rao j...@confluent.io wrote:

 Well, KAFKA-1952 only introduces high CPU overhead if the number of
 partitions in a fetch request is high, say more than a couple of hundred.
 So, it may not show up in every installation. For example, if you have 1000
 leader replicas in a broker, but have a 20 node cluster, each replica fetch
 request is only going to include about 50 partitions. Since there is a bit
 of overhead running a release, I was hoping to collect some more feedback
 from people trying the 0.8.2.0 release who may not be affected by this
 issue. But I agree that we don't need to wait for too long.

 Thanks,

 Jun

 On Wed, Feb 18, 2015 at 2:13 PM, Jay Kreps jay.kr...@gmail.com wrote:

  Does it make sense to wait, I don't think people will upgrade without the
  patched version and I think we should release it to unblock people.
 
  -Jay
 
  On Wed, Feb 18, 2015 at 1:43 PM, Jun Rao j...@confluent.io wrote:
 
   We have fixed the issue in KAFKA-1952. We will wait for a few more days
  to
   see if any new issue comes up. After that, we will do an 0.8.2.1
 release.
  
   Thanks,
  
   Jun
  
   On Fri, Feb 13, 2015 at 3:28 PM, Jay Kreps jay.kr...@gmail.com
 wrote:
  
Hey all,
   
We found an issue in 0.8.2 that can lead to high CPU usage on brokers
   with
lots of partitions. We are working on a fix for this. You can track
progress here:
https://issues.apache.org/jira/browse/KAFKA-1952
   
I would recommend holding off on upgrading to 0.8.2 until we have a
 fix
   for
this issue. Sorry for the inconvenience.
   
-Jay
   
  
 



Kafka question about starting kafka with same broker id

2015-02-18 Thread Deepak Dhakal
Hi,

My name is Deepak and I work for Salesforce. We are using Kafka 0.8.1.1 and
have a question about starting Kafka with the same broker id.

Steps:

Start a Kafka broker with broker id = 1 - it starts fine with external ZK.
Start another Kafka broker with the same broker id = 1. It does not start, which is
expected, but I am seeing the following log and it keeps retrying
forever.

Is there a way to control how many times a broker tries to start itself with
the same broker id?


Thanks
Deepak

[2015-02-18 14:47:20,713] INFO conflict in /controller data:
{version:1,brokerid:19471,timestamp:1424299100135} stored data:
{version:1,brokerid:19471,timestamp:1424288444314}
(kafka.utils.ZkUtils$)

[2015-02-18 14:47:20,716] INFO I wrote this conflicted ephemeral node
[{version:1,brokerid:19471,timestamp:1424299100135}] at /controller
a while back in a different session, hence I will backoff for this node to
be deleted by Zookeeper and retry (kafka.utils.ZkUtils$)

[2015-02-18 14:47:30,719] INFO conflict in /controller data:
{version:1,brokerid:19471,timestamp:1424299100135} stored data:
{version:1,brokerid:19471,timestamp:1424288444314}
(kafka.utils.ZkUtils$)

[2015-02-18 14:47:30,722] INFO I wrote this conflicted ephemeral node
[{version:1,brokerid:19471,timestamp:1424299100135}] at /controller
a while back in a different session, hence I will backoff for this node to
be deleted by Zookeeper and retry (kafka.utils.ZkUtils$)


Re: [VOTE] 0.8.2.1 Candidate 1

2015-02-18 Thread Connie Yang
+1
On Feb 18, 2015 7:23 PM, Matt Narrell matt.narr...@gmail.com wrote:

 +1

  On Feb 18, 2015, at 7:56 PM, Jun Rao j...@confluent.io wrote:
 
  This is the first candidate for release of Apache Kafka 0.8.2.1. This
  only fixes one critical issue (KAFKA-1952) in 0.8.2.0.
 
  Release Notes for the 0.8.2.1 release
 
 https://people.apache.org/~junrao/kafka-0.8.2.1-candidate1/RELEASE_NOTES.html
 
  *** Please download, test and vote by Saturday, Feb 21, 7pm PT
 
  Kafka's KEYS file containing PGP keys we use to sign the release:
  http://kafka.apache.org/KEYS in addition to the md5, sha1
  and sha2 (SHA256) checksum.
 
  * Release artifacts to be voted upon (source and binary):
  https://people.apache.org/~junrao/kafka-0.8.2.1-candidate1/
 
  * Maven artifacts to be voted upon prior to release:
  https://repository.apache.org/content/groups/staging/
 
  * scala-doc
  https://people.apache.org/~junrao/kafka-0.8.2.1-candidate1/scaladoc/
 
  * java-doc
  https://people.apache.org/~junrao/kafka-0.8.2.1-candidate1/javadoc/
 
  * The tag to be voted upon (off the 0.8.2 branch) is the 0.8.2.1 tag
 
 https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=c1b4c58531343dce80232e0122d085fc687633f6
 
  /***
 
  Thanks,
 
  Jun




Re: Consuming a snapshot from log compacted topic

2015-02-18 Thread Will Funnell
 Do you have to separate the snapshot from the normal update flow.

We are trying to avoid using another datasource if possible to have one
source of truth.

 I think what you are saying is that you want to create a snapshot from the
 Kafka topic but NOT do continual reads after that point. For example you
 might be creating a backup of the data to a file.

Yes, this is correct.

 I think there are two features we could add that would make this easier:
 1. Make the cleaner point configurable on a per-topic basis. This feature
 would allow you to control how long the full log is retained and when
 compaction can kick in. This would give a configurable SLA for the reader
 process to catch up.

That sounds like it could work. I think if you could schedule the
time/frequency at which compaction occurred, clients could be scheduled to run
in between.

 2. Make the log end offset available more easily in the consumer.

I was thinking something would need to be added in LogCleanerManager, in the
updateCheckpoints function. Where would be the best place to publish the information
to make it more easily available, or would you just expose the
cleaner-offset-checkpoint file as it is?
Is it right that you would also need to know which cleaner-offset-checkpoint
entry related to each active partition?

 Isn't it sufficient to just repeat the check at the end after reading
 the log and repeat until you are truly done? At least for the purposes
 of a snapshot?

Yes, I was looking at this initially, but as we have 100-150 writes per
second, it could be a while before there is a pause long enough to check that it
has caught up. Even with the consumer timeout set to -1, it takes some time
to query the max offset values, which is still long enough for more
messages to arrive.



On 18 February 2015 at 23:16, Joel Koshy jjkosh...@gmail.com wrote:

  You are also correct and perceptive to notice that if you check the end
 of
  the log then begin consuming and read up to that point compaction may
 have
  already kicked in (if the reading takes a while) and hence you might have
  an incomplete snapshot.

 Isn't it sufficient to just repeat the check at the end after reading
 the log and repeat until you are truly done? At least for the purposes
 of a snapshot?

 On Wed, Feb 18, 2015 at 02:21:49PM -0800, Jay Kreps wrote:
  If you catch up off a compacted topic and keep consuming then you will
  become consistent with the log.
 
  I think what you are saying is that you want to create a snapshot from
 the
  Kafka topic but NOT do continual reads after that point. For example you
  might be creating a backup of the data to a file.
 
  I agree that this isn't as easy as it could be. As you say the only
  solution we have is that timeout which doesn't differentiate between GC
  stall in your process and no more messages left so you would need to tune
  the timeout. This is admittedly kind of a hack.
 
  You are also correct and perceptive to notice that if you check the end
 of
  the log then begin consuming and read up to that point compaction may
 have
  already kicked in (if the reading takes a while) and hence you might have
  an incomplete snapshot.
 
  I think there are two features we could add that would make this easier:
  1. Make the cleaner point configurable on a per-topic basis. This feature
  would allow you to control how long the full log is retained and when
  compaction can kick in. This would give a configurable SLA for the reader
  process to catch up.
  2. Make the log end offset available more easily in the consumer.
 
  -Jay
 
 
 
  On Wed, Feb 18, 2015 at 10:18 AM, Will Funnell w.f.funn...@gmail.com
  wrote:
 
   We are currently using Kafka 0.8.1.1 with log compaction in order to
   provide streams of messages to our clients.
  
   As well as constantly consuming the stream, one of our use cases is to
   provide a snapshot, meaning the user will receive a copy of every
 message
   at least once.
  
   Each one of these messages represents an item of content in our system.
  
  
   The problem comes when determining if the client has actually reached
 the
   end of the topic.
  
   The standard Kafka way of dealing with this seems to be by using a
   ConsumerTimeoutException, but we are frequently getting this error
 when the
    end of the topic has not been reached, or it may even take a long time
   before a timeout naturally occurs.
  
  
   On first glance it would seem possible to do a lookup for the max
 offset
   for each partition when you begin consuming, stopping when this
  position is
   reached.
  
   But log compaction means that if an update to a piece of content
 arrives
   with the same message key, then this will be written to the end so the
   snapshot will be incomplete.
  
  
   Another thought is to make use of the cleaner point. Currently Kafka
 writes
   out to a cleaner-offset-checkpoint file in each data directory which
 is
   written to after log compaction completes.
  
   If the consumer was able to access the 

[VOTE] 0.8.2.1 Candidate 1

2015-02-18 Thread Jun Rao
This is the first candidate for release of Apache Kafka 0.8.2.1. This
only fixes one critical issue (KAFKA-1952) in 0.8.2.0.

Release Notes for the 0.8.2.1 release
https://people.apache.org/~junrao/kafka-0.8.2.1-candidate1/RELEASE_NOTES.html

*** Please download, test and vote by Saturday, Feb 21, 7pm PT

Kafka's KEYS file containing PGP keys we use to sign the release:
http://kafka.apache.org/KEYS in addition to the md5, sha1
and sha2 (SHA256) checksum.

* Release artifacts to be voted upon (source and binary):
https://people.apache.org/~junrao/kafka-0.8.2.1-candidate1/

* Maven artifacts to be voted upon prior to release:
https://repository.apache.org/content/groups/staging/

* scala-doc
https://people.apache.org/~junrao/kafka-0.8.2.1-candidate1/scaladoc/

* java-doc
https://people.apache.org/~junrao/kafka-0.8.2.1-candidate1/javadoc/

* The tag to be voted upon (off the 0.8.2 branch) is the 0.8.2.1 tag
https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=c1b4c58531343dce80232e0122d085fc687633f6

/***

Thanks,

Jun


Re: Broker w/ high memory due to index file sizes

2015-02-18 Thread Jay Kreps
40G is really huge; generally you would want more like 4G. Are you sure you
need that? I'm not sure what you mean by lsof and the index files being too large,
but the index files are memory mapped so they should be able to grow
arbitrarily large and their memory usage is not counted in the java heap
(in fact by having such a large heap you are taking away OS memory from
them).

-Jay
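
For reference, a minimal sketch of setting a smaller broker heap along the
lines Jay suggests, assuming the stock kafka-server-start.sh script (which
honours the KAFKA_HEAP_OPTS environment variable); adjust the size and paths
for your installation:

# a minimal sketch, assuming the stock scripts shipped with the broker
export KAFKA_HEAP_OPTS="-Xms4G -Xmx4G"
bin/kafka-server-start.sh config/server.properties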

On Wed, Feb 18, 2015 at 4:13 PM, Zakee kzak...@netzero.net wrote:

 I am running a cluster of 5 brokers with 40G ms/mx for each. I found one of
 the brokers is constantly using above ~90% of memory for jvm.heapUsage. I
 checked from lsof output that the size of the index files for this broker
 is too large.

 Not sure what is going on with this one broker in the cluster. Why would
 the index file sizes be so hugely different on one broker? Any ideas?


 Regards
 Zakee
 


Re: Hold off on 0.8.2 upgrades

2015-02-18 Thread Jun Rao
Well, KAFKA-1952 only introduces high CPU overhead if the number of
partitions in a fetch request is high, say more than a couple of hundred.
So, it may not show up in every installation. For example, if you have 1000
leader replicas in a broker, but have a 20 node cluster, each replica fetch
request is only going to include about 50 partitions. Since there is a bit
of overhead running a release, I was hoping to collect some more feedback
from people trying the 0.8.2.0 release who may not be affected by this
issue. But I agree that we don't need to wait for too long.
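
(For the arithmetic above: the followers of those 1000 leader partitions are
spread roughly evenly across the other 19 brokers, so each follower's replica
fetch request to this broker covers about 1000 / 19, i.e. roughly 50
partitions, well below the range where the KAFKA-1952 overhead becomes
noticeable.)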

Thanks,

Jun

On Wed, Feb 18, 2015 at 2:13 PM, Jay Kreps jay.kr...@gmail.com wrote:

 Does it make sense to wait? I don't think people will upgrade without the
 patched version and I think we should release it to unblock people.

 -Jay

 On Wed, Feb 18, 2015 at 1:43 PM, Jun Rao j...@confluent.io wrote:

  We have fixed the issue in KAFKA-1952. We will wait for a few more days
 to
  see if any new issue comes up. After that, we will do an 0.8.2.1 release.
 
  Thanks,
 
  Jun
 
  On Fri, Feb 13, 2015 at 3:28 PM, Jay Kreps jay.kr...@gmail.com wrote:
 
   Hey all,
  
   We found an issue in 0.8.2 that can lead to high CPU usage on brokers
  with
   lots of partitions. We are working on a fix for this. You can track
   progress here:
   https://issues.apache.org/jira/browse/KAFKA-1952
  
   I would recommend holding off on upgrading to 0.8.2 until we have a fix
  for
   this issue. Sorry for the inconvenience.
  
   -Jay
  
 



Re: Kakfa question about starting kafka with same broker id

2015-02-18 Thread Harsha
Deepak,
  You should be getting the following error, and the kafka server will shut
  itself down if it sees the same brokerid registered in zookeeper.
 Fatal error during KafkaServer startup. Prepare to shutdown
(kafka.server.KafkaServer)
java.lang.RuntimeException: A broker is already registered on the path
/brokers/ids/0. This probably indicates that you either have configured
a brokerid that is already in use, or else you have shutdown this broker
and restarted it faster than the zookeeper timeout so it appears to be
re-registering.

-Harsha

On Wed, Feb 18, 2015, at 03:16 PM, Deepak Dhakal wrote:
 Hi,
 
 My name is Deepak and I work for salesforce. We are using kafka 8.11  and
 have a question about starting kafka with same broker id.
 
 Steps:
 
 Start a kafka broker with broker id = 1 - it starts fine with external ZK.
 Start another kafka broker with the same broker id = 1... it does not start the
 kafka, which is expected, but I am seeing the following log and it keeps retrying
 forever.
 
 Is there a way to control how many times a broker tries to start itself with
 the same broker id?
 
 
 Thanks
 Deepak
 
 [2015-02-18 14:47:20,713] INFO conflict in /controller data:
 {version:1,brokerid:19471,timestamp:1424299100135} stored data:
 {version:1,brokerid:19471,timestamp:1424288444314}
 (kafka.utils.ZkUtils$)
 
 [2015-02-18 14:47:20,716] INFO I wrote this conflicted ephemeral node
 [{version:1,brokerid:19471,timestamp:1424299100135}] at
 /controller
 a while back in a different session, hence I will backoff for this node
 to
 be deleted by Zookeeper and retry (kafka.utils.ZkUtils$)
 
 [2015-02-18 14:47:30,719] INFO conflict in /controller data:
 {version:1,brokerid:19471,timestamp:1424299100135} stored data:
 {version:1,brokerid:19471,timestamp:1424288444314}
 (kafka.utils.ZkUtils$)
 
 [2015-02-18 14:47:30,722] INFO I wrote this conflicted ephemeral node
 [{version:1,brokerid:19471,timestamp:1424299100135}] at
 /controller
 a while back in a different session, hence I will backoff for this node
 to
 be deleted by Zookeeper and retry (kafka.utils.ZkUtils$)


Re: Consuming a snapshot from log compacted topic

2015-02-18 Thread Joel Koshy
  2. Make the log end offset available more easily in the consumer.
 
 Was thinking something would need to be added in LogCleanerManager, in the
 updateCheckpoints function. Where would be best to publish the information
 to make it more easily available, or would you just expose the
 offset-cleaner-checkpoint file as it is?
 Is it right you would also need to know which offset-cleaner-checkpoint
 entry related to each active partition?

I'm not sure if I misunderstood Jay's suggestion, but I think it is
along the lines of: we already expose the log-end-offset (actually the high
watermark) of the partition in the fetch response. However, this is
not exposed to the consumer (either in the new ConsumerRecord class
or the existing MessageAndMetadata class). If it were, then when you
consume a record you could check whether its offset has reached the
log-end offset. If it has, you would know for sure that you have
consumed everything for that partition.
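
As a sketch of what that check could look like, here is a small hypothetical
holder type. Neither ConsumerRecord nor MessageAndMetadata actually exposes
the partition high watermark today, so the field and method names below are
made up purely to illustrate the idea:

// Hypothetical: illustrates the per-record check described above.
final class FetchedRecord {
    final long offset;                  // offset of this record within its partition
    final long partitionHighWatermark;  // high watermark from the fetch response (exclusive end)

    FetchedRecord(long offset, long partitionHighWatermark) {
        this.offset = offset;
        this.partitionHighWatermark = partitionHighWatermark;
    }

    // True once this record was the last one available in the partition at
    // fetch time, i.e. the snapshot of this partition is complete.
    boolean reachedEndOfPartition() {
        return offset >= partitionHighWatermark - 1;
    }
}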

 Yes, was looking at this initially, but as we have 100-150 writes per
 second, it could be a while before there is a pause long enough to check it
 has caught up. Even with the consumer timeout set to -1, it takes some time
 to query the max offset values, which is still long enough for more
 messages to arrive.

Got it - thanks for clarifying.

 
 
 
 On 18 February 2015 at 23:16, Joel Koshy jjkosh...@gmail.com wrote:
 
   You are also correct and perceptive to notice that if you check the end
  of
   the log then begin consuming and read up to that point compaction may
  have
   already kicked in (if the reading takes a while) and hence you might have
   an incomplete snapshot.
 
  Isn't it sufficient to just repeat the check at the end after reading
  the log and repeat until you are truly done? At least for the purposes
  of a snapshot?
 
  On Wed, Feb 18, 2015 at 02:21:49PM -0800, Jay Kreps wrote:
   If you catch up off a compacted topic and keep consuming then you will
   become consistent with the log.
  
   I think what you are saying is that you want to create a snapshot from
  the
   Kafka topic but NOT do continual reads after that point. For example you
   might be creating a backup of the data to a file.
  
   I agree that this isn't as easy as it could be. As you say the only
   solution we have is that timeout which doesn't differentiate between GC
   stall in your process and no more messages left so you would need to tune
   the timeout. This is admittedly kind of a hack.
  
   You are also correct and perceptive to notice that if you check the end
  of
   the log then begin consuming and read up to that point compaction may
  have
   already kicked in (if the reading takes a while) and hence you might have
   an incomplete snapshot.
  
   I think there are two features we could add that would make this easier:
   1. Make the cleaner point configurable on a per-topic basis. This feature
   would allow you to control how long the full log is retained and when
   compaction can kick in. This would give a configurable SLA for the reader
   process to catch up.
   2. Make the log end offset available more easily in the consumer.
  
   -Jay
  
  
  
   On Wed, Feb 18, 2015 at 10:18 AM, Will Funnell w.f.funn...@gmail.com
   wrote:
  
We are currently using Kafka 0.8.1.1 with log compaction in order to
provide streams of messages to our clients.
   
As well as constantly consuming the stream, one of our use cases is to
provide a snapshot, meaning the user will receive a copy of every
  message
at least once.
   
Each one of these messages represents an item of content in our system.
   
   
The problem comes when determining if the client has actually reached
  the
end of the topic.
   
The standard Kafka way of dealing with this seems to be by using a
ConsumerTimeoutException, but we are frequently getting this error
  when the
end of the topic has not been reached or even it may take a long time
before a timeout naturally occurs.
   
   
On first glance it would seem possible to do a lookup for the max
  offset
for each partition when you begin consuming, stopping when this
   position is
reached.
   
But log compaction means that if an update to a piece of content
  arrives
with the same message key, then this will be written to the end so the
snapshot will be incomplete.
   
   
Another thought is to make use of the cleaner point. Currently Kafka
  writes
out to a cleaner-offset-checkpoint file in each data directory which
  is
written to after log compaction completes.
   
If the consumer was able to access the cleaner-offset-checkpoint you
  would
be able to consume up to this point, check the point was still the
  same,
and compaction had not yet occurred, and therefore determine you had
received everything at least once. (Assuming there was no race condition
between compaction and writing to the file)
   
   
Has anybody got any thoughts?
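
For what it's worth, a minimal sketch of reading that checkpoint file follows.
The layout assumed here (a version line, an entry count, then one
"topic partition offset" line per partition) is Kafka's internal checkpoint
format as of 0.8.x and is not a public interface, so it could change without
notice; the caveat above about re-checking the file after consuming still
applies:

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CleanerCheckpointReader {

    // Reads a cleaner-offset-checkpoint file into a (topic-partition -> cleaner point) map.
    public static Map<String, Long> read(String path) throws IOException {
        List<String> lines = Files.readAllLines(Paths.get(path), StandardCharsets.UTF_8);
        Map<String, Long> cleanerPoints = new HashMap<String, Long>();
        int entries = Integer.parseInt(lines.get(1).trim()); // line 0 holds the format version
        for (int i = 2; i < 2 + entries; i++) {
            String[] parts = lines.get(i).trim().split("\\s+"); // topic, partition, offset
            cleanerPoints.put(parts[0] + "-" + parts[1], Long.parseLong(parts[2]));
        }
        return cleanerPoints;
    }
}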
  

Re: Kakfa question about starting kafka with same broker id

2015-02-18 Thread Steve Morin
Why would you want to ever do that?

 On Feb 18, 2015, at 15:16, Deepak Dhakal ddha...@salesforce.com wrote:
 
 Hi,
 
 My name is Deepak and I work for salesforce. We are using kafka 8.11  and
 have a question about starting kafka with same broker id.
 
 Steps:
 
  Start a kafka broker with broker id = 1 - it starts fine with external ZK.
  Start another kafka broker with the same broker id = 1... it does not start the
  kafka, which is expected, but I am seeing the following log and it keeps retrying
  forever.
 
  Is there a way to control how many times a broker tries to start itself with
  the same broker id?
 
 
 Thanks
 Deepak
 
 [2015-02-18 14:47:20,713] INFO conflict in /controller data:
 {version:1,brokerid:19471,timestamp:1424299100135} stored data:
 {version:1,brokerid:19471,timestamp:1424288444314}
 (kafka.utils.ZkUtils$)
 
 [2015-02-18 14:47:20,716] INFO I wrote this conflicted ephemeral node
 [{version:1,brokerid:19471,timestamp:1424299100135}] at /controller
 a while back in a different session, hence I will backoff for this node to
 be deleted by Zookeeper and retry (kafka.utils.ZkUtils$)
 
 [2015-02-18 14:47:30,719] INFO conflict in /controller data:
 {version:1,brokerid:19471,timestamp:1424299100135} stored data:
 {version:1,brokerid:19471,timestamp:1424288444314}
 (kafka.utils.ZkUtils$)
 
 [2015-02-18 14:47:30,722] INFO I wrote this conflicted ephemeral node
 [{version:1,brokerid:19471,timestamp:1424299100135}] at /controller
 a while back in a different session, hence I will backoff for this node to
 be deleted by Zookeeper and retry (kafka.utils.ZkUtils$)


Re: [VOTE] 0.8.2.1 Candidate 1

2015-02-18 Thread Matt Narrell
+1

 On Feb 18, 2015, at 7:56 PM, Jun Rao j...@confluent.io wrote:
 
 This is the first candidate for release of Apache Kafka 0.8.2.1. This
 only fixes one critical issue (KAFKA-1952) in 0.8.2.0.
 
 Release Notes for the 0.8.2.1 release
 https://people.apache.org/~junrao/kafka-0.8.2.1-candidate1/RELEASE_NOTES.html
 
 *** Please download, test and vote by Saturday, Feb 21, 7pm PT
 
 Kafka's KEYS file containing PGP keys we use to sign the release:
 http://kafka.apache.org/KEYS in addition to the md5, sha1
 and sha2 (SHA256) checksum.
 
 * Release artifacts to be voted upon (source and binary):
 https://people.apache.org/~junrao/kafka-0.8.2.1-candidate1/
 
 * Maven artifacts to be voted upon prior to release:
 https://repository.apache.org/content/groups/staging/
 
 * scala-doc
 https://people.apache.org/~junrao/kafka-0.8.2.1-candidate1/scaladoc/
 
 * java-doc
 https://people.apache.org/~junrao/kafka-0.8.2.1-candidate1/javadoc/
 
 * The tag to be voted upon (off the 0.8.2 branch) is the 0.8.2.1 tag
 https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=c1b4c58531343dce80232e0122d085fc687633f6
 
 /***
 
 Thanks,
 
 Jun



Re: Hold off on 0.8.2 upgrades

2015-02-18 Thread Jun Rao
Yes, that makes sense. I will try to roll out 0.8.2.1 for vote later today.

Thanks,

Jun

On Wed, Feb 18, 2015 at 4:15 PM, Jay Kreps jay.kr...@gmail.com wrote:

 Well, I guess what I was thinking is that since we have the long timeout on
 the vote anyway, there is no reason not to call the vote now; should anything
 else pop up, we can cancel the vote.

 -Jay

 On Wed, Feb 18, 2015 at 4:04 PM, Jun Rao j...@confluent.io wrote:

  Well, KAFKA-1952 only introduces high CPU overhead if the number of
  partitions in a fetch request is high, say more than a couple of
 hundred.
  So, it may not show up in every installation. For example, if you have
 1000
  leader replicas in a broker, but have a 20 node cluster, each replica
 fetch
  request is only going to include about 50 partitions. Since there is a
 bit
  of overhead running a release, I was hoping to collect some more feedback
  from people trying the 0.8.2.0 release who may not be affected by this
  issue. But I agree that we don't need to wait for too long.
 
  Thanks,
 
  Jun
 
  On Wed, Feb 18, 2015 at 2:13 PM, Jay Kreps jay.kr...@gmail.com wrote:
 
  Does it make sense to wait? I don't think people will upgrade without
 the
   patched version and I think we should release it to unblock people.
  
   -Jay
  
   On Wed, Feb 18, 2015 at 1:43 PM, Jun Rao j...@confluent.io wrote:
  
We have fixed the issue in KAFKA-1952. We will wait for a few more
 days
   to
see if any new issue comes up. After that, we will do an 0.8.2.1
  release.
   
Thanks,
   
Jun
   
On Fri, Feb 13, 2015 at 3:28 PM, Jay Kreps jay.kr...@gmail.com
  wrote:
   
 Hey all,

 We found an issue in 0.8.2 that can lead to high CPU usage on
 brokers
with
 lots of partitions. We are working on a fix for this. You can track
 progress here:
 https://issues.apache.org/jira/browse/KAFKA-1952

 I would recommend holding off on upgrading to 0.8.2 until we have a
  fix
for
 this issue. Sorry for the inconvenience.

 -Jay

   
  
 



Re: Consuming a snapshot from log compacted topic

2015-02-18 Thread Jay Kreps
Yeah, I was thinking either along the lines Joel was suggesting, or else
adding a logEndOffset(TopicPartition) method or something like that. As
Joel says, the consumer actually has this information internally (we return
it with the fetch request) but doesn't expose it.

-Jay
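
For completeness, an API of the kind being proposed here did eventually land
in the later Java consumer (well after the 0.8.x line) as endOffsets(). A
minimal sketch of a snapshot read against a single partition, assuming a newer
client on the classpath and a topic called my-topic, looks roughly like this:

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SnapshotWithEndOffsets {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker-host:9092");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props)) {
            TopicPartition tp = new TopicPartition("my-topic", 0);
            List<TopicPartition> partitions = Collections.singletonList(tp);
            consumer.assign(partitions);
            consumer.seekToBeginning(partitions);

            // End offsets as of now; anything appended later is not part of the snapshot.
            Map<TopicPartition, Long> end = consumer.endOffsets(partitions);

            while (consumer.position(tp) < end.get(tp)) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    // write the record to the snapshot ...
                }
            }
        }
    }
}

As with the earlier approaches, an update keyed the same as an earlier message
can still arrive after the captured end offsets, so the usual compaction
caveat from this thread still applies.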

On Wed, Feb 18, 2015 at 4:51 PM, Joel Koshy jjkosh...@gmail.com wrote:

   2. Make the log end offset available more easily in the consumer.
 
  Was thinking something would need to be added in LogCleanerManager, in
 the
  updateCheckpoints function. Where would be best to publish the
 information
  to make it more easily available, or would you just expose the
  offset-cleaner-checkpoint file as it is?
  Is it right you would also need to know which offset-cleaner-checkpoint
  entry related to each active partition?

 I'm not sure if I misunderstood Jay's suggestion, but I think it is
 along the lines of: we expose the log-end-offset (actually the high
 watermark) of the partition in the fetch response. However, this is
 not exposed to the consumer (either in the new ConsumerRecord class
 or the existing MessageAndMetadata class). If we did, then if you
 were to consume a record you can check that it has offsets up to the
 log-end offset. If it does then you would know for sure that you have
 consumed everything for that partition.

  Yes, was looking at this initially, but as we have 100-150 writes per
  second, it could be a while before there is a pause long enough to check
 it
  has caught up. Even with the consumer timeout set to -1, it takes some
 time
  to query the max offset values, which is still long enough for more
  messages to arrive.

 Got it - thanks for clarifying.

 
 
 
  On 18 February 2015 at 23:16, Joel Koshy jjkosh...@gmail.com wrote:
 
You are also correct and perceptive to notice that if you check the
 end
   of
the log then begin consuming and read up to that point compaction may
   have
already kicked in (if the reading takes a while) and hence you might
 have
an incomplete snapshot.
  
   Isn't it sufficient to just repeat the check at the end after reading
   the log and repeat until you are truly done? At least for the purposes
   of a snapshot?
  
   On Wed, Feb 18, 2015 at 02:21:49PM -0800, Jay Kreps wrote:
If you catch up off a compacted topic and keep consuming then you
 will
become consistent with the log.
   
I think what you are saying is that you want to create a snapshot
 from
   the
Kafka topic but NOT do continual reads after that point. For example
 you
might be creating a backup of the data to a file.
   
I agree that this isn't as easy as it could be. As you say the only
solution we have is that timeout which doesn't differentiate between
 GC
stall in your process and no more messages left so you would need to
 tune
the timeout. This is admittedly kind of a hack.
   
You are also correct and perceptive to notice that if you check the
 end
   of
the log then begin consuming and read up to that point compaction may
   have
already kicked in (if the reading takes a while) and hence you might
 have
an incomplete snapshot.
   
I think there are two features we could add that would make this
 easier:
1. Make the cleaner point configurable on a per-topic basis. This
 feature
would allow you to control how long the full log is retained and when
compaction can kick in. This would give a configurable SLA for the
 reader
process to catch up.
2. Make the log end offset available more easily in the consumer.
   
-Jay
   
   
   
On Wed, Feb 18, 2015 at 10:18 AM, Will Funnell 
 w.f.funn...@gmail.com
wrote:
   
 We are currently using Kafka 0.8.1.1 with log compaction in order
 to
 provide streams of messages to our clients.

 As well as constantly consuming the stream, one of our use cases
 is to
 provide a snapshot, meaning the user will receive a copy of every
   message
 at least once.

 Each one of these messages represents an item of content in our
 system.


 The problem comes when determining if the client has actually
 reached
   the
 end of the topic.

 The standard Kafka way of dealing with this seems to be by using a
 ConsumerTimeoutException, but we are frequently getting this error
   when the
 end of the topic has not been reached or even it may take a long
 time
 before a timeout naturally occurs.


 On first glance it would seem possible to do a lookup for the max
   offset
 for each partition when you begin consuming, stopping when this
    position is
 reached.

 But log compaction means that if an update to a piece of content
   arrives
 with the same message key, then this will be written to the end so
 the
 snapshot will be incomplete.


 Another thought is to make use of the cleaner point. Currently
 Kafka
   writes
 out to a cleaner-offset-checkpoint file in each