Re: Versioning Schemas
Thanks Jun and Phil! Shone

On Thu, Jun 13, 2013 at 12:00 AM, Jun Rao jun...@gmail.com wrote: Yes, we just have a customized encoder that encodes the first 4 bytes of the md5 of the schema, followed by the Avro bytes. Thanks, Jun

On Wed, Jun 12, 2013 at 9:50 AM, Shone Sadler shone.sad...@gmail.com wrote: Jun, I like the idea of an explicit version field, if the schema can be derived from the topic name itself. The storage (say 1-4 bytes) would require less overhead than a 128-bit md5, at the added cost of managing the version number. Is it correct to assume that your applications are then using two schemas: one system-level schema to deserialize the schema id and the bytes of the application message, and a second schema to deserialize those bytes with the application schema? Thanks again! Shone

On Wed, Jun 12, 2013 at 11:31 AM, Jun Rao jun...@gmail.com wrote: Actually, currently our schema id is the md5 of the schema itself. Not fully sure how this compares with an explicit version field in the schema. Thanks, Jun

On Wed, Jun 12, 2013 at 8:29 AM, Jun Rao jun...@gmail.com wrote: At LinkedIn, we are using option 2. Thanks, Jun

On Wed, Jun 12, 2013 at 7:14 AM, Shone Sadler shone.sad...@gmail.com wrote: Hello everyone, after searching the mailing list for best practices on integrating Avro with Kafka, there appear to be at least three options for handling the Avro schema: 1) embedding the entire schema within the message, 2) embedding a unique identifier for the schema in the message, and 3) deriving the schema from the topic/resource name. Option 2 appears to be the best option in terms of both efficiency and flexibility. However, from a programming perspective it complicates the solution with the need for both an envelope schema (containing a schema id and a bytes field for record data) and a message schema (containing the application-specific message fields). This requires two levels of serialization/deserialization. Questions: 1) How are others dealing with versioning of schemas? 2) Is there a more elegant means of embedding schema ids in an Avro message (I am new to both currently ;-)? Thanks in advance! Shone
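[Editor's note: a minimal sketch of the scheme Jun describes, which avoids a separate envelope schema by framing each message as a 4-byte schema id followed by the raw Avro bytes. The class name is illustrative, and hashing the schema's toString() JSON is an assumption about the canonical form LinkedIn hashed:]

import java.io.ByteArrayOutputStream;
import java.security.MessageDigest;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;

public class SchemaIdEncoder {
    // Returns: [first 4 bytes of md5(schema)] + [binary Avro encoding of record].
    public static byte[] encode(GenericRecord record) throws Exception {
        Schema schema = record.getSchema();
        byte[] md5 = MessageDigest.getInstance("MD5")
                .digest(schema.toString().getBytes("UTF-8")); // assumed canonical form
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(md5, 0, 4); // 4-byte schema id header
        BinaryEncoder enc = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(schema).write(record, enc);
        enc.flush();
        return out.toByteArray();
    }
}

A consumer would read the first four bytes, look up the full writer schema in a registry keyed by that id, and decode the remainder with a GenericDatumReader, so no envelope schema or second deserialization pass is needed.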
Re: Producer will pick one of the two brokers, but never the two at the same time [0.8]
Hi Jun, I was using the 0.8 branch, 2 commits behind, but now I am using the latest with the same issue. 3 topics A, B, C, created automatically with a replication factor of 2 and 2 partitions. 2 brokers (0 and 1). The list of topics in zookeeper is the following:

topic: A  partition: 0  leader: 1  replicas: 1,0  isr: 1
topic: A  partition: 1  leader: 0  replicas: 0,1  isr: 0,1
topic: B  partition: 0  leader: 0  replicas: 0,1  isr: 0,1
topic: B  partition: 1  leader: 1  replicas: 1,0  isr: 1
topic: C  partition: 0  leader: 1  replicas: 1,0  isr: 1
topic: C  partition: 1  leader: 0  replicas: 0,1  isr: 0,1

*Broker 1*

This was the one I started first. This works well and writes messages to the disk. In the state-change.log I have got no errors, just trace rows:

[2013-06-13 08:51:33,505] TRACE Broker 1 cached leader info (LeaderAndIsrInfo:(Leader:0,ISR:0,1,LeaderEpoch:0,ControllerEpoch:1),ReplicationFactor:2),AllReplicas:0,1) for partition [C,1] in response to UpdateMetadata request sent by controller 1 epoch 1 with correlation id 10 (state.change.logger)
[2013-06-13 08:51:33,506] TRACE Controller 1 epoch 1 received response correlationId 10 for a request sent to broker 1 (state.change.logger)
[2013-06-13 08:51:33,509] TRACE Controller 1 epoch 1 changed state of replica 0 for partition [C,1] to OnlineReplica (state.change.logger)
[2013-06-13 08:51:33,510] TRACE Controller 1 epoch 1 changed state of replica 1 for partition [C,0] to OnlineReplica (state.change.logger)
[2013-06-13 08:51:33,511] TRACE Controller 1 epoch 1 changed state of replica 0 for partition [B,1] to OnlineReplica (state.change.logger)
[2013-06-13 08:51:33,511] TRACE Controller 1 epoch 1 changed state of replica 0 for partition [C,0] to OnlineReplica (state.change.logger)
[2013-06-13 08:51:33,512] TRACE Controller 1 epoch 1 changed state of replica 0 for partition [B,0] to OnlineReplica (state.change.logger)
[2013-06-13 08:51:33,512] TRACE Controller 1 epoch 1 changed state of replica 1 for partition [B,0] to OnlineReplica (state.change.logger)
[2013-06-13 08:51:33,513] TRACE Controller 1 epoch 1 changed state of replica 1 for partition [B,1] to OnlineReplica (state.change.logger)
[2013-06-13 08:51:33,513] TRACE Controller 1 epoch 1 changed state of replica 1 for partition [C,1] to OnlineReplica (state.change.logger)

$ du -sh /mnt/kafka-logs/*
4.0K  /mnt/kafka-logs/replication-offset-checkpoint
163M  /mnt/kafka-logs/A-0
4.0K  /mnt/kafka-logs/A-1
4.0K  /mnt/kafka-logs/B-0
90M   /mnt/kafka-logs/B-1
16K   /mnt/kafka-logs/C-0
4.0K  /mnt/kafka-logs/C-1

*Broker 0*

The configuration is the same as Broker 1, with a different broker.id. This one doesn't write to disk; /mnt/kafka-logs is empty, without any file. It logs a non-stopping stream of:

[2013-06-13 09:08:53,814] WARN [KafkaApi-0] Produce request with correlation id 735114 from client on partition [A,1] failed due to Partition [request,1] doesn't exist on 0 (kafka.server.KafkaApis)
[2013-06-13 09:08:53,815] WARN [KafkaApi-0] Produce request with correlation id 519064 from client on partition [B,0] failed due to Partition [response,0] doesn't exist on 0 (kafka.server.KafkaApis)
[2013-06-13 09:08:53,815] WARN [KafkaApi-0] Produce request with correlation id 735118 from client on partition [A,1] failed due to Partition [request,1] doesn't exist on 0 (kafka.server.KafkaApis)
[2013-06-13 09:08:53,815] WARN [KafkaApi-0] Produce request with correlation id 519068 from client on partition [B,0] failed due to Partition [response,0] doesn't exist on 0 (kafka.server.KafkaApis)
...
*Server Configuration*

port=9092
num.network.threads=2
num.io.threads=2
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
log.dir=/mnt/kafka-logs
auto.create.topics.enable=true
default.replication.factor=2
num.partitions=2
log.flush.interval.messages=1
log.flush.interval.ms=1000
log.retention.hours=168
log.segment.bytes=536870912
log.cleanup.interval.mins=1
zookeeper.connect=xxx1:2181,xxx2:2181,xxx3:2181
zookeeper.connection.timeout.ms=100
kafka.metrics.polling.interval.secs=5
kafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter
kafka.csv.metrics.dir=/mnt/kafka_metrics
kafka.csv.metrics.reporter.enabled=false

I can't understand why broker 0 doesn't act as a leader for its partitions, nor receive replicated data from broker 1. To eliminate the possibility of the problem being in the producer, I will run similar tests with the console producer. Alex

On 13 June 2013 04:57, Jun Rao jun...@gmail.com wrote: Any error in state-change.log? Also, are you using the latest code in the 0.8 branch? Thanks, Jun

On Wed, Jun 12, 2013 at 9:27 AM, Alexandre Rodrigues alexan...@blismedia.com wrote: Hi Jun, Thanks for your prompt answer. The producer yields those errors in the beginning, so I think the topic metadata refresh has nothing to do with
0.8 Durability Question
Looking at Jun’s ApacheCon slides (http://www.slideshare.net/junrao/kafka-replication-apachecon2013), slide 21, titled ‘Data Flow in Replication’, shows three possible durability configurations which trade off latency for greater persistence guarantees. The third row is the ‘no data loss’ configuration option, where the producer only receives an ack from the broker once the message(s) are committed by the leader and peers. Does this commit also mean the message(s) are flushed to disk? I know there is a separate configuration setting, log.flush.interval.messages, but I thought in sync mode the producer doesn’t receive an ack until message(s) are committed and flushed to disk. Please correct me if my understanding is incorrect. Thanks, Jonathan
Re: Producer will pick one of the two brokers, but never the two at the same time [0.8]
I've tried the console producer, so I will assume the problem is not related to the producer. I keep seeing the same entries in the producer log from time to time:

[2013-06-13 11:04:00,670] WARN Error while fetching metadata [{TopicMetadata for topic C - No partition metadata for topic C due to kafka.common.LeaderNotAvailableException}] for topic [C]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartitionInfo)

which I assume is logged when the producer asks a broker who is responsible for a partition. I might be wrong, but I think one of the brokers doesn't know, so I thought it might be related to ZK, where partition leader elections happen (I think). I was using a 3-node ZK 3.3.5 ensemble. First I deleted the snapshot of all the ZK nodes and started one without the ensemble, cleaned the brokers' dataDir, and restarted them against that solo ZK node. The problem is still the same. I thought it could be because of the ZK version, so I decided to start a ZK instance using the jar that ships with Kafka, and the problem remains. I am not sure if this is a real bug or just something I might be missing. I don't know if it helps, but all the trials were run without any kind of consumer (which should be OK, no?) Thanks, Alex

On 13 June 2013 10:15, Alexandre Rodrigues alexan...@blismedia.com wrote: [snip]
Re: One 0.72 ConsumerConnector, multiple threads, 1 blocks. What happens?
Jun - thanks again. This is very helpful. Philip

On Jun 12, 2013, at 9:50 PM, Jun Rao jun...@gmail.com wrote: Actually, you are right. This can happen on a single topic too, if you have more than one consumer thread. Each consumer thread pulls data from a blocking queue, and one or more fetchers are putting data into the queue. Say you have two consumer threads and two partitions from the same broker. There is a single fetcher that fetches both partitions, and it will put each partition's data into a separate queue. So, if one thread stops consuming data, its queue will be full at some point. This will block the fetcher from putting data into the other queue. Thanks, Jun

On Wed, Jun 12, 2013 at 9:10 PM, Philip O'Toole phi...@loggly.com wrote: Jun -- thanks. But if the topic is the same, doesn't each thread get a partition? Isn't that how it works? Philip

On Wed, Jun 12, 2013 at 9:08 PM, Jun Rao jun...@gmail.com wrote: Yes, when the consumer is consuming multiple topics, if one thread stops consuming topic 1, it can prevent new data getting into the consumer for topic 2. Thanks, Jun

On Wed, Jun 12, 2013 at 7:43 PM, Philip O'Toole phi...@loggly.com wrote: Hello -- we're using 0.72. We're looking at the source, but want to be sure. :-) We create a single ConsumerConnector, call createMessageStreams, and hand the streams off to individual threads. If one of those threads calls next() on a stream, gets some messages, and then *blocks* in some subsequent operation (and blocks for minutes), can it potentially cause all other threads (calling next() on other streams) to block too? Does something inside the ConsumerConnector block all other stream processing? This would explain some behaviour we're seeing. Thanks, Philip
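[Editor's note: a toy model of what Jun describes, using plain JDK queues rather than Kafka's internals: one "fetcher" feeds two bounded per-partition queues, the consumer of queue A never runs, and the consumer of queue B starves as soon as queue A fills:]

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class FetcherStallDemo {
    public static void main(String[] args) throws Exception {
        BlockingQueue<String> queueA = new ArrayBlockingQueue<>(2); // bounded per-partition
        BlockingQueue<String> queueB = new ArrayBlockingQueue<>(2); // queues, as in Kafka

        // One fetcher feeds both queues, as a single fetcher does for
        // two partitions on the same broker.
        Thread fetcher = new Thread(() -> {
            try {
                for (int i = 0; ; i++) {
                    queueA.put("A-" + i); // blocks once queueA is full...
                    queueB.put("B-" + i); // ...so queueB stops receiving data too
                    System.out.println("fetched batch " + i);
                }
            } catch (InterruptedException ignored) { }
        });

        // Only queueB has a consumer; queueA's consumer is "stuck" downstream.
        Thread consumerB = new Thread(() -> {
            try {
                while (true) System.out.println("consumed " + queueB.take());
            } catch (InterruptedException ignored) { }
        });

        fetcher.start();
        consumerB.start();
        Thread.sleep(2000); // after ~2 batches both threads are stuck
        System.out.println("fetcher state: " + fetcher.getState()); // WAITING on queueA.put
    }
}

After two batches the fetcher parks in queueA.put and the consumed B-... output stops, which is exactly the cross-stream stall Philip observed.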
Re: Producer will pick one of the two brokers, but never the two at the same time [0.8]
I think I know what's happening: I tried to run both brokers and ZK on the same machine and it worked. I also attempted to do the same but with a ZK node on another machine, and it also worked. My guess is it's something related to ports. All the machines are on EC2, and there might be something related to the security group. I am going to run the first setup with all ports open and see how it goes. If the ports are really the problem, shouldn't this kind of problem be logged somewhere?

On 13 June 2013 12:13, Alexandre Rodrigues alexan...@blismedia.com wrote: [snip]
Re: 0.8 Durability Question
No. It only means that messages are written to all replicas in memory. Data is flushed to disk asynchronously. Thanks, Neha

On Jun 13, 2013 3:29 AM, Jonathan Hodges hodg...@gmail.com wrote: [snip]
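[Editor's note: for reference, the three rows in the slide map onto the 0.8 producer's request.required.acks setting (0, 1, or -1), while broker-side flushing is governed separately by log.flush.interval.messages. A sketch of the strongest producer-side setting, assuming the 0.8 Java producer API; broker names are placeholders:]

import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class DurableProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "broker1:9092,broker2:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("producer.type", "sync"); // wait for the broker's ack
        // request.required.acks: 0 = no ack, 1 = leader ack, -1 = ack only after
        // all in-sync replicas have the message (the "no data loss" row).
        props.put("request.required.acks", "-1");
        // Note: per Neha's answer, even acks=-1 means "in memory on all replicas";
        // set log.flush.interval.messages=1 on the brokers to force a flush per message.

        Producer<String, String> producer = new Producer<>(new ProducerConfig(props));
        producer.send(new KeyedMessage<String, String>("events", "hello"));
        producer.close();
    }
}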
Re: Producer will pick one of the two brokers, but never the two at the same time [0.8]
Have you looked at #3 in http://kafka.apache.org/faq.html? Thanks, Jun

On Thu, Jun 13, 2013 at 6:41 AM, Alexandre Rodrigues alexan...@blismedia.com wrote: [snip]
Re: Arguments for Kafka over RabbitMQ?
Hi all, First, thanks to Tim (from Rabbit) and Jonathan for moving this thread along. Jonathan, I hope you found my links to the data model docs, and Tim's replies, helpful. Has everyone got what they wanted from this thread? alexis

On Tue, Jun 11, 2013 at 5:49 PM, Jonathan Hodges hodg...@gmail.com wrote: Hi Tim, While your comments regarding durability are accurate for the 0.7 version of Kafka, it is a bit greyer with 0.8. In 0.8 you have the ability to configure Kafka to have the durability you need. This is what I was referring to with the link to Jun’s ApacheCon slides (http://www.slideshare.net/junrao/kafka-replication-apachecon2013). If you look at slide 21, titled ‘Data Flow in Replication’, you see the three possible durability configurations which trade off latency for greater persistence guarantees. The third row is the ‘no data loss’ configuration option, where the producer only receives an ack from the broker once the message(s) are committed by the leader and peers (mirrors, as you call them) and flushed to disk. This seems to be very similar to the scenario you describe in Rabbit, no? Jun or Neha, can you please confirm my understanding of 0.8 durability is correct and that there is no data loss in the scenario I describe? I know there is a separate configuration setting, log.flush.interval.messages, but I thought in sync mode the producer doesn’t receive an ack until message(s) are committed and flushed to disk. Please correct me if my understanding is incorrect. Thanks!

On Tue, Jun 11, 2013 at 8:20 AM, Tim Watson watson.timo...@gmail.com wrote: Hi Jonathan, So, thanks for replying - that's all useful info. On 10 Jun 2013, at 14:19, Jonathan Hodges wrote: Kafka has a configurable rolling window of time it keeps the messages per topic. The default is 7 days, and after this time the messages are removed from disk by the broker. Correct, the consumers maintain their own state via what are known as offsets. Also true that when producers/consumers contact the broker there is a random seek to the start of the offset, but the majority of access patterns are linear.

So, just to be clear, the distinction that has been raised on this thread is only part of the story, viz. the difference in rates between RabbitMQ and Kafka. Essentially, these two systems are performing completely different tasks, since in RabbitMQ the concept of a long-term persistent topic whose entries are removed solely based on expiration policy is somewhat alien. RabbitMQ will delete messages from its message store as soon as a relevant consumer has seen and ACK'ed them, which *requires* tracking consumer state in the broker. I suspect this was your (earlier) point about Kafka /not/ trying to be a general purpose message broker, but having an architecture that is highly tuned to a specific set of usage patterns.

As you can see in the last graph of 10 million messages, which is less than a GB on disk, the Rabbit throughput is capped around 10k/sec. Beyond throughput, with the pending release of 0.8, Kafka will also have advantages around message guarantees and durability. [snip] Correct, with 0.8 Kafka will have similar options, like Rabbit's fsync configuration option.

Right, but just to be clear, unless Kafka starts to fsync for every single published message, you are /not/ going to offer the same guarantee. In this respect, Rabbit is clearly putting safety above performance when that's what users ask it for, which is fine for some cases and not for others.
By way of example, if you're using producer/publisher confirms with RabbitMQ, the broker will not ACK receipt of a message until (a) it has been fsync'ed to disk and (b) if the queue is mirrored, each mirror has acknowledged receipt of the message. Again, unless you're fsync-ing to disk on each publish, the guarantees will be different - and rightly so, since you can deal with re-publishing and de-duplication quite happily in a system that's dealing with a 7-day sliding window of data, and thus ensuring throughput is more useful (in that case) than avoiding data loss on the server. Of course, architecturally, fsync-ing very regularly will kill the benefits that mmap combined with sendfile give you, since relying on the kernel's paging / caching capabilities is the whole point of doing that. That's not intended to be a criticism btw, just an observation about the distinction between the two systems' differing approaches.

Messages have always had ordering guarantees, but with 0.8 there is the notion of topic replicas, similar to replication factor in Hadoop or Cassandra. http://www.slideshare.net/junrao/kafka-replication-apachecon2013 With configuration you can trade off latency for durability, with 3 options:
- Producer receives no acks (no network delay)
- Producer waits for ack from broker leader (1 network roundtrip)
- Producer waits for quorum ack (2 network
Re: Producer will pick one of the two brokers, but never the two at the same time [0.8]
I have, but this is a different thing. It's related to ports and security groups, not to the bind addresses. It's solved now. Thanks

On 13 June 2013 15:42, Jun Rao jun...@gmail.com wrote: [snip]
Using Kafka for data messages
Hi all, my team is proposing a novel way of using Kafka and I am hoping someone can help do a sanity check on this:
1. When a user logs into our website, we will create a “logged in” event message in Kafka containing the user id.
2. 30+ systems (consumers, each in their own consumer group) will consume this event and look up data about this user id. They will then publish all of this data back out into Kafka as a series of data messages. One message may include the user’s name, another the user’s address, another the user’s last 10 searches, another their last 10 orders, etc. The plan is that a single “logged in” event may trigger hundreds if not thousands of additional data messages.
3. Another system, the “Product Recommendation” system, will have consumed the original “logged in” message and will also consume a subset of the data messages (realistically I think it would need to consume all of the data messages but would discard the ones it doesn’t need). As the Product Recommendation system consumes the data messages, it will compute recommended products and publish out recommendation messages (that get more and more specific as it has consumed more and more data messages).
4. The original website will consume the recommendation messages and show the recommendations to the user as it gets them.

You don’t see many systems implemented this way, but since Kafka has so much higher throughput than your typical MOM, this approach seems innovative. The benefits are:
1. If we start collecting more information about the users, we can simply start publishing that in new data messages and consumers can start processing those messages whenever they want. If we were doing this in a more traditional SOA approach the schemas would need to change every time we added a field, but with this approach we can just create new messages without touching existing ones.
2. We are looking to make our systems smaller, so if we end up with more, smaller systems that each publish a small number of events, it becomes easier to make changes and test the changes. If we were doing this in a more traditional SOA approach we would need to retest each consumer every time we changed our bigger SOA services.

The downsides appear to be:
1. We may be publishing a large amount of data that never gets used but that everyone needs to consume to see if they need it before discarding it.
2. The Product Recommendation system may need to wait until it consumes a number of messages, and keep track of all the data internally, before it can start processing.
3. While we may be able to keep the messages somewhat small, the fact that they contain data will mean they will be bigger than your traditional EDA messages.
4. It seems like we can do a lot of this using SOA (we already have an ESB that can do transformations to address consumers expecting an older version of the data).

Any insight is appreciated. Thanks, Josh
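[Editor's note: for concreteness, each enriching system in step 2 would look roughly like the following against the 0.8 high-level consumer and producer APIs. The topic names follow the GUEST_EVENT/GUEST_DATA naming Josh uses later in the thread, and lookupAddress is a hypothetical backing-store call:]

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.javaapi.producer.Producer;
import kafka.message.MessageAndMetadata;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class AddressEnricher {
    public static void main(String[] args) {
        Properties cprops = new Properties();
        cprops.put("zookeeper.connect", "zk1:2181");
        cprops.put("group.id", "address-enricher"); // each system gets its own group
        ConsumerConnector consumer =
            Consumer.createJavaConsumerConnector(new ConsumerConfig(cprops));

        Properties pprops = new Properties();
        pprops.put("metadata.broker.list", "broker1:9092");
        pprops.put("serializer.class", "kafka.serializer.StringEncoder");
        Producer<String, String> producer = new Producer<>(new ProducerConfig(pprops));

        // Consume login events and republish the looked-up data as data messages.
        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
            consumer.createMessageStreams(Collections.singletonMap("GUEST_EVENT", 1));
        for (MessageAndMetadata<byte[], byte[]> m : streams.get("GUEST_EVENT").get(0)) {
            String userId = new String(m.message());
            String address = lookupAddress(userId); // hypothetical backing-store call
            producer.send(new KeyedMessage<String, String>("GUEST_DATA", userId, address));
        }
    }

    private static String lookupAddress(String userId) { return "..."; } // stub
}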
Re: Arguments for Kafka over RabbitMQ?
Hi Alexis, This was very helpful and I also appreciate both yours and Tim's input here. It clears up the cases for when to use Rabbit or Kafka. What is great is they are both open source with vibrant communities behind them. -Jonathan

On Jun 13, 2013 8:45 AM, Alexis Richardson ale...@rabbitmq.com wrote: [snip]
RE: shipping logs to S3 or other servers for backups
Hi, In my application I am storing user events, and I want to partition the storage by day. So at the end of a day, I want to take that day's file and ship it to S3 or another server as a backup. This way I can replay the events for a specific day if needed. These events also have to be in order. How should I structure things in Kafka to ensure that these user events all belong to the same file (or set of files; if a file gets large, I believe Kafka splits it into multiple files)?
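[Editor's note: one common pattern here is to key events by user, so each user's events stay ordered within one partition, and have the consumer that drains each partition roll its output file at each day boundary, shipping the closed file afterwards. A sketch of the rolling logic in plain Java, with the upload left as a stub since no particular S3 client is assumed:]

import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.LocalDate;

public class DailyFileRoller {
    private LocalDate currentDay = null;
    private PrintWriter out = null;

    // Append one event; roll to a new dated file when the day changes.
    public void append(String event) throws IOException {
        LocalDate today = LocalDate.now();
        if (!today.equals(currentDay)) {
            roll(today);
        }
        out.println(event); // events arrive in partition order, so the file is ordered
    }

    private void roll(LocalDate today) throws IOException {
        if (out != null) {
            out.close();
            upload(Paths.get("events-" + currentDay + ".log")); // ship the finished day
        }
        currentDay = today;
        out = new PrintWriter(new FileWriter("events-" + today + ".log", true)); // append on restart
    }

    private void upload(Path file) {
        // Stub: copy to S3 or scp to a backup server here.
        System.out.println("uploading " + file + " for backup");
    }
}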
Re: Using Kafka for data messages
Hi Josh, The idea looks very interesting. I just had one doubt.
1. A user logs in. His login id is sent on a topic.
2. Other systems (consumers on this topic) consume this message and publish their results to another topic.
This will be happening, without any particular order, for hundreds of users. Now, for the site being displayed to the user: how will you fetch only the messages for that user from the queue? Regards, Mahendra

On Thu, Jun 13, 2013 at 8:51 PM, Josh Foure user...@yahoo.com wrote: [snip]

-- Mahendra http://twitter.com/mahendra
Re: Producer only finding partition on 1 of 2 Brokers, even though ZK shows 1 partition exists on both Brokers?
As an update, this continues to affect us. First I'd like to note ways in which my issue seems different from KAFKA-278:
* I did not add a new broker or a new topic; this topic has been in use on two existing brokers for months
* The topic definitely exists on both brokers. The topic/data directory exists on both, and as noted above both brokers even show it in ZK

That said, I went ahead and did the workaround from the ticket, which in my case basically means restarting the brokers (because the topic/data directory already exists). After bouncing both brokers I *did* get data to both brokers for a while. I'm not yet sure if this only lasts until I have to restart my *producers* (as I've had to update them a bit lately), but that is my current guess. When I start a producer now (both brokers up, data looks exactly like the ZK output in my original post), I get output like this:

2013-06-13_15:46:11.64496 Broker Topic Path = /brokers/topics
2013-06-13_15:46:11.0 15:46:11.999 [run-main] INFO kafka.producer.ProducerPool - Creating async producer for broker id = 1 at 10.10.150.16:9092
2013-06-13_15:46:12.00109 15:46:12.001 [run-main] INFO kafka.producer.ProducerPool - Creating async producer for broker id = 0 at 10.10.71.113:9092
2013-06-13_15:46:16.77956 15:46:16.779 [ProducerSendThread-1375847990] INFO kafka.producer.SyncProducer - Connected to 10.10.150.16:9092 for producing

The last line repeats as the SyncProducer does its periodic reconnect thing, but note that it is *always* broker 1 at 10.10.150.16:9092, even though it seems like broker 0 is seen. Thanks for your help!

On Tue, Jun 4, 2013 at 6:24 PM, Neha Narkhede neha.narkh...@gmail.com wrote: You are probably hitting https://issues.apache.org/jira/browse/KAFKA-278. Can you please try the workaround mentioned in the JIRA description? Thanks, Neha

On Tue, Jun 4, 2013 at 4:21 PM, Brett Hoerner br...@bretthoerner.com wrote: (version 0.7.2) For some reason, my producers are only picking up the partition on 1 of my 2 brokers. I've been digging through the code, and I don't see any issues given the state of my ZK nodes. The producer never seems to locate a partition on Broker0, even though ZK clearly states that it has 1 (just like Broker1 does). The result of this is that the producer works, but only sends data to one broker. Has anyone seen something like this before? I'm stumped. Thanks.
# zk information follows:
[zk: localhost:2181(CONNECTED) 5] ls /kafka/brokers/ids
[1, 0]
[zk: localhost:2181(CONNECTED) 6] get /kafka/brokers/ids/0
10.10.71.113-1365733477001:10.10.71.113:9092
[zk: localhost:2181(CONNECTED) 7] get /kafka/brokers/ids/1
10.10.150.16-1369236663861:10.10.150.16:9092
[zk: localhost:2181(CONNECTED) 10] ls /kafka/brokers/topics/test
[1, 0]
# this is the most confusing one
[zk: localhost:2181(CONNECTED) 11] get /kafka/brokers/topics/test/0
1
[zk: localhost:2181(CONNECTED) 12] get /kafka/brokers/topics/test/1
1

# kafka producer ZK information DEBUG log, as you can see it finds 0 partitions on Broker0:
Broker Topic Path = /brokers/topics
DEBUG [2013-06-04 23:14:30,689] kafka.producer.ZKBrokerPartitionInfo: Broker ids and # of partitions on each for topic: test = ArrayBuffer((0,0), (1,1))
DEBUG [2013-06-04 23:14:30,690] kafka.producer.ZKBrokerPartitionInfo: Sorted list of broker ids and partition ids on each for topic: test = TreeSet(1-0)
DEBUG [2013-06-04 23:14:30,819] kafka.producer.ZKBrokerPartitionInfo$BrokerTopicsListener: [BrokerTopicsListener] Creating broker topics listener to watch the following paths - /broker/topics, /broker/topics/topic, /broker/ids
DEBUG [2013-06-04 23:14:30,823] kafka.producer.ZKBrokerPartitionInfo$BrokerTopicsListener: [BrokerTopicsListener] Initialized this broker topics listener with initial mapping of broker id to partition id per topic with Map(test - TreeSet(1-0))
DEBUG [2013-06-04 23:14:30,904] kafka.producer.ZKBrokerPartitionInfo: Registering listener on path: /brokers/topics/test
Re: Using Kafka for data messages
Hi Mahendra, I think that is where it gets a little tricky. I think it would work something like this:
1. Web sends a login event for user user123 to topic GUEST_EVENT.
2. All of the systems consume those messages and publish the data messages to topic GUEST_DATA.user123.
3. The Recommendation system gets all of the data from GUEST_DATA.user123, processes it, and then publishes back to the same topic GUEST_DATA.user123.
4. The Web consumes the messages from the same topic (there is a different topic for every user that logged in), GUEST_DATA.user123, and when it finds the recommendation messages it pushes them to the browser (note it will need to read all the other data messages and discard those when looking for the recommendation messages).

I have a concern that the Web will be flooded with a ton of messages that it will promptly drop, but I don't want to create a new response or recommendation topic, because then I feel like I am tightly coupling the message to the functionality, and in the future different systems may want to consume those messages as well. Does that make sense? Josh

From: Mahendra M mahendr...@gmail.com To: users@kafka.apache.org; Josh Foure user...@yahoo.com Sent: Thursday, June 13, 2013 12:56 PM Subject: Re: Using Kafka for data messages [snip]
Re: Using Kafka for data messages
Also, since you're going to be creating a topic per user, the number of concurrent users will be a concern: Kafka doesn't like massive numbers of topics. Tim

On Thu, Jun 13, 2013 at 10:47 AM, Josh Foure user...@yahoo.com wrote: [snip]
If we start collecting more information about the users, we can simply start publishing that in new data messages and consumers can start processing those messages whenever they want. If we were doing this in a more traditional SOA approach, the schemas would need to change every time we added a field, but with this approach we can just create new messages without touching existing ones. 2. We are looking to make our systems smaller, so if we end up with more, smaller systems that each publish a small number of events, it becomes easier to make and test changes. If we were doing this in a more traditional SOA approach, we would need to retest each consumer every time we changed our bigger SOA services. The downsides appear to be: 1. We may be publishing a large amount of data that never gets used but that everyone needs to consume, just to see if they need it, before discarding it. 2. The Product Recommendation system may need to wait until it consumes a number of messages, and keep track of all the data internally, before it can start processing. 3. While we may be able to keep the messages somewhat small, the fact that they contain data means they will be bigger than your traditional EDA messages. 4. It seems like we can do a lot of this using SOA (we already have an ESB that can do transformations to address consumers expecting an older version of the data). Any insight is appreciated. Thanks, Josh
Re: Using Kafka for data messages
Ah yes, I had read that Kafka likes to stay under 1,000 topics, but I wasn't sure if that was really a limitation. In principle I wouldn't mind having all guest events placed on the GUEST_DATA queue, but I thought that by having more topics I could minimize consumers reading messages only to discard them. My thought had been that if I have 20 Web JVMs, and at any given time I have 1,000 people logged in per JVM, each JVM would only need to consume the messages from 1,000 topics. If instead there is a single topic, each JVM will be consuming from the same topic (and be in different consumer groups), but 19 out of 20 messages will be for guests that are not even logged into that JVM. Since Kafka doesn't have message selectors or anything like that, I was hoping to use topics to help segregate the traffic. I don't want to use 1 topic per Web JVM because in the future other consumers may be interested in that same data, and the services that put the data in Kafka shouldn't have to look up which JVM that user is logged into (or get that from another message and keep track of it). Any thoughts on how to work around this? I know there are topic partitions, but that seems more like a way to distribute the workload in terms of storing the messages, and not for the message selection scenario I am describing, if I understood correctly. From: Timothy Chen tnac...@gmail.com To: users@kafka.apache.org; Josh Foure user...@yahoo.com Sent: Thursday, June 13, 2013 2:13 PM Subject: Re: Using Kafka for data messages Also, since you're going to be creating a topic per user, the number of concurrent users will also be a concern to Kafka, as it doesn't like massive numbers of topics. Tim On Thu, Jun 13, 2013 at 10:47 AM, Josh Foure user...@yahoo.com wrote: Hi Mahendra, I think that is where it gets a little tricky. I think it would work something like this: 1. Web sends a login event for user user123 to topic GUEST_EVENT. 2. All of the systems consume those messages and publish the data messages to topic GUEST_DATA.user123. 3. The Recommendation system gets all of the data from GUEST_DATA.user123, processes it and then publishes back to the same topic GUEST_DATA.user123. 4. The Web consumes the messages from the same topic (there is a different topic for every user that logged in) GUEST_DATA.user123 and when it finds the recommendation messages it pushes those to the browser (note it will need to read all the other data messages and discard them when looking for the recommendation messages). I have a concern that the Web will be flooded with a ton of messages that it will promptly drop, but I don't want to create a new response or recommendation topic because then I feel like I am tightly coupling the message to the functionality, and in the future different systems may want to consume those messages as well. Does that make sense? Josh From: Mahendra M mahendr...@gmail.com To: users@kafka.apache.org; Josh Foure user...@yahoo.com Sent: Thursday, June 13, 2013 12:56 PM Subject: Re: Using Kafka for data messages Hi Josh, The idea looks very interesting. I just had one doubt. 1. A user logs in. His login id is sent on a topic. 2. Other systems (consumers on this topic) consume this message and publish their results to another topic. This will be happening in no particular order for hundreds of users. Now, as the site is being displayed to the user, how will you fetch only the messages for that user from the queue?
Regards, Mahendra On Thu, Jun 13, 2013 at 8:51 PM, Josh Foure user...@yahoo.com wrote: Hi all, my team is proposing a novel way of using Kafka and I am hoping someone can help do a sanity check on this: 1. When a user logs into our website, we will create a “logged in” event message in Kafka containing the user id. 2. 30+ systems (consumers, each in their own consumer group) will consume this event and look up data about this user id. They will then publish all of this data back out into Kafka as a series of data messages. One message may include the user’s name, another the user’s address, another the user’s last 10 searches, another their last 10 orders, etc. The plan is that a single “logged in” event may trigger hundreds if not thousands of additional data messages. 3. Another system, the “Product Recommendation” system, will have consumed the original “logged in” message and will also consume a subset of the data messages (realistically I think it would need to consume all of the data messages but would discard the ones it doesn’t need). As the Product Recommendation system consumes the data messages, it will compute recommended products and publish out recommendation messages (that get more and more specific as it consumes more and more data messages). 4. The original website will consume the recommendation messages and show the recommendations to the user as it gets them.
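The workaround that usually comes up in place of a topic per user is key-based partitioning: a single GUEST_DATA topic, with the user id as the message key, so all of one user's messages land on one partition and each Web JVM only consumes the partitions it owns. A minimal sketch, assuming the 0.8 Scala producer API (broker hosts and the payload are placeholders, not values from this thread):

import java.util.Properties
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

val props = new Properties()
props.put("metadata.broker.list", "broker0:9092,broker1:9092") // placeholder hosts
props.put("serializer.class", "kafka.serializer.StringEncoder")
val producer = new Producer[String, String](new ProducerConfig(props))

// The default partitioner hashes the key, so every message keyed by
// "user123" goes to the same partition of the one GUEST_DATA topic.
producer.send(new KeyedMessage[String, String]("GUEST_DATA", "user123", "name=..."))
producer.close()

A consumer still sees every user hashed to its partitions rather than only its own logged-in users, but the topic count stays constant regardless of how many users are online.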
Re: Producer only finding partition on 1 of 2 Brokers, even though ZK shows 1 partition exists on both Brokers?
You know what, it's likely this is all because I'm running a bad fork of Kafka 0.7.2 for Scala 2.10 (on the producers/consumers), since that's the version we've standardized on. Behavior in 2.9.2 with the official Kafka 0.7.2 release seems much more normal -- I'm working on downgrading all our clients and I'll report back. Sorry for the spam. On Thu, Jun 13, 2013 at 12:31 PM, Brett Hoerner br...@bretthoerner.com wrote: As an update, this continues to affect us. First I'd like to note the ways in which my issue seems different from KAFKA-278: * I did not add a new broker or a new topic; this topic has been in use on two existing brokers for months * The topic definitely exists on both brokers. The topic/data directory exists on both, and as noted above both brokers even show it in ZK That said, I went ahead and did the workaround from the ticket, which in my case basically means restarting the brokers (because the topic/data directory already exists). After bouncing both brokers I *did* get data to both brokers for a while. I'm not yet sure if this only lasts until I have to restart my *producers* (as I've had to update them a bit lately), but that is my current guess. When I start a producer now (both brokers up, data looks exactly like the ZK output in my original post), I get output like this: 2013-06-13_15:46:11.64496 Broker Topic Path = /brokers/topics 2013-06-13_15:46:11.0 15:46:11.999 [run-main] INFO kafka.producer.ProducerPool - Creating async producer for broker id = 1 at 10.10.150.16:9092 2013-06-13_15:46:12.00109 15:46:12.001 [run-main] INFO kafka.producer.ProducerPool - Creating async producer for broker id = 0 at 10.10.71.113:9092 2013-06-13_15:46:16.77956 15:46:16.779 [ProducerSendThread-1375847990] INFO kafka.producer.SyncProducer - Connected to 10.10.150.16:9092 for producing The last line repeats as the SyncProducer does its periodic reconnect thing, but note that it is *always* broker 1 at 10.10.150.16:9092, even though broker 0 appears to be seen. Thanks for your help! On Tue, Jun 4, 2013 at 6:24 PM, Neha Narkhede neha.narkh...@gmail.com wrote: You are probably hitting https://issues.apache.org/jira/browse/KAFKA-278. Can you please try the workaround mentioned in the JIRA description? Thanks, Neha On Tue, Jun 4, 2013 at 4:21 PM, Brett Hoerner br...@bretthoerner.com wrote: (version 0.7.2) For some reason, my producers are only picking up the partition on 1 of my 2 brokers. I've been digging through the code, and I don't see any issues given the state of my ZK nodes. The producer never seems to locate a partition on Broker0, even though ZK clearly states that it has 1 (just like Broker1 does). The result of this is that the producer works, but only sends data to one broker. Has anyone seen something like this before? I'm stumped. Thanks.
# zk information follows: [zk: localhost:2181(CONNECTED) 5] ls /kafka/brokers/ids [1, 0] [zk: localhost:2181(CONNECTED) 6] get /kafka/brokers/ids/0 10.10.71.113-1365733477001:10.10.71.113:9092 [zk: localhost:2181(CONNECTED) 7] get /kafka/brokers/ids/1 10.10.150.16-1369236663861:10.10.150.16:9092 [zk: localhost:2181(CONNECTED) 10] ls /kafka/brokers/topics/test [1, 0] # this is the most confusing one [zk: localhost:2181(CONNECTED) 11] get /kafka/brokers/topics/test/0 1 [zk: localhost:2181(CONNECTED) 12] get /kafka/brokers/topics/test/1 1 # kafka producer ZK information DEBUG log, as you can see it finds 0 partitions on Broker0: Broker Topic Path = /brokers/topics DEBUG [2013-06-04 23:14:30,689] kafka.producer.ZKBrokerPartitionInfo: Broker ids and # of partitions on each for topic: test = ArrayBuffer((0,0), (1,1)) DEBUG [2013-06-04 23:14:30,690] kafka.producer.ZKBrokerPartitionInfo: Sorted list of broker ids and partition ids on each for topic: test = TreeSet(1-0) DEBUG [2013-06-04 23:14:30,819] kafka.producer.ZKBrokerPartitionInfo$BrokerTopicsListener: [BrokerTopicsListener] Creating broker topics listener to watch the following paths - /broker/topics, /broker/topics/topic, /broker/ids DEBUG [2013-06-04 23:14:30,823] kafka.producer.ZKBrokerPartitionInfo$BrokerTopicsListener: [BrokerTopicsListener] Initialized this broker topics listener with initial mapping of broker id to partition id per topic with Map(test - TreeSet(1-0)) DEBUG [2013-06-04 23:14:30,904] kafka.producer.ZKBrokerPartitionInfo: Registering listener on path: /brokers/topics/test
Re: Using Kafka for data messages
Spot on. This was one of the areas that we had to work around. Remember that there is a 1:1 relationship of topics to directories, and most file systems don't like tens of thousands of directories. We found in practice that 60k per machine was a practical limit using, I believe, ext3. On Thursday, June 13, 2013, Timothy Chen wrote: Also, since you're going to be creating a topic per user, the number of concurrent users will also be a concern to Kafka, as it doesn't like massive numbers of topics. Tim On Thu, Jun 13, 2013 at 10:47 AM, Josh Foure user...@yahoo.com wrote: Hi Mahendra, I think that is where it gets a little tricky. I think it would work something like this: 1. Web sends a login event for user user123 to topic GUEST_EVENT. 2. All of the systems consume those messages and publish the data messages to topic GUEST_DATA.user123. 3. The Recommendation system gets all of the data from GUEST_DATA.user123, processes it and then publishes back to the same topic GUEST_DATA.user123. 4. The Web consumes the messages from the same topic (there is a different topic for every user that logged in) GUEST_DATA.user123 and when it finds the recommendation messages it pushes those to the browser (note it will need to read all the other data messages and discard them when looking for the recommendation messages). I have a concern that the Web will be flooded with a ton of messages that it will promptly drop, but I don't want to create a new response or recommendation topic because then I feel like I am tightly coupling the message to the functionality, and in the future different systems may want to consume those messages as well. Does that make sense? Josh From: Mahendra M mahendr...@gmail.com To: users@kafka.apache.org; Josh Foure user...@yahoo.com Sent: Thursday, June 13, 2013 12:56 PM Subject: Re: Using Kafka for data messages Hi Josh, The idea looks very interesting. I just had one doubt. 1. A user logs in. His login id is sent on a topic. 2. Other systems (consumers on this topic) consume this message and publish their results to another topic. This will be happening in no particular order for hundreds of users. Now, as the site is being displayed to the user, how will you fetch only the messages for that user from the queue? Regards, Mahendra On Thu, Jun 13, 2013 at 8:51 PM, Josh Foure user...@yahoo.com wrote: Hi all, my team is proposing a novel way of using Kafka and I am hoping someone can help do a sanity check on this: 1. When a user logs into our website, we will create a “logged in” event message in Kafka containing the user id. 2. 30+ systems (consumers, each in their own consumer group) will consume this event and look up data about this user id. They will then publish all of this data back out into Kafka as a series of data messages. One message may include the user’s name, another the user’s address, another the user’s last 10 searches, another their last 10 orders, etc. The plan is that a single “logged in” event may trigger hundreds if not thousands of additional data messages. 3. Another system, the “Product Recommendation” system, will have consumed the original “logged in” message and will also consume a subset of the data messages (realistically I think it would need to consume all of the data messages but would discard the ones it doesn’t need). As the Product Recommendation system consumes the data messages, it will compute recommended products and publish out recommendation messages (that get more and more specific as it consumes more and more data messages). 4.
The original website will consume the recommendation messages and show the recommendations to the user as it gets them. You don’t see many systems implemented this way, but since Kafka has much higher throughput than your typical MOM, this approach seems innovative.
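To put numbers on that limit, here is a back-of-envelope count using the figures from earlier in the thread (arithmetic only; actual placement per broker depends on replication and partition assignment):

// 20 Web JVMs x 1,000 logged-in users each, a topic per user, 2 partitions per topic
val topics = 20 * 1000
val partitionsPerTopic = 2
val directories = topics * partitionsPerTopic // 40,000 log directories across the cluster,
                                              // already in sight of the ~60k ext3 ceiling cited above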
Stall high-level 0.72 ConsumerConnector until all balanced? Avoid message dupes?
Hello -- is it possible for our code to stall a ConsumerConnector from doing any consuming for, say, 30 seconds, until we can be sure that all other ConsumerConnectors are rebalanced? It seems that the first ConsumerConnector to come up is prefetching some data, and we end up with duplicate messages. We looked at the code for the high-level consumer (core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala) and it looks like it empties some queues after a rebalance, but we still see duplicate messages. I'm sure this question has been asked before :-) but this is our first time really working with the high-level consumer, and this caught us by surprise. When there is *no* data in Kafka and we wait until everything balances before sending data in, everything works fine; but if there is some data sitting in the brokers, we seem to get dupes, even when each thread sleeps for many seconds after creating the ConsumerConnector. Are we missing something? Thanks, Philip
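For reference, the prefetch in question starts once the streams are created, so any deliberate stall has to sit between constructing the ConsumerConnector and calling createMessageStreams. A rough sketch against the 0.7-era Scala consumer API (property names from memory, and the fixed sleep is a crude stand-in for a real coordination barrier; whether it actually avoids the dupes is exactly the open question in this thread):

import java.util.Properties
import kafka.consumer.{Consumer, ConsumerConfig}

val props = new Properties()
props.put("zk.connect", "localhost:2181") // 0.7-style property names
props.put("groupid", "my-group")
val connector = Consumer.create(new ConsumerConfig(props))

// Crude barrier: give the other ConsumerConnectors time to come up
// and rebalance before this one begins fetching.
Thread.sleep(30 * 1000L)

val streams = connector.createMessageStreams(Map("my-topic" -> 1))
for (stream <- streams("my-topic"); message <- stream) {
  // process message; offsets are committed to ZK on the autocommit interval
}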
Re: Stall high-level 0.72 ConsumerConnector until all balanced? Avoid message dupes?
Just to be clear, I'm not asking that we solve duplicate messages on a crash before the commit to Zookeeper, just an apparent problem where, if Kafka has some data and we start our ConsumerConnectors, we get dupe data since some Consumers come up before others. Any help? Philip On Thu, Jun 13, 2013 at 7:34 PM, Philip O'Toole phi...@loggly.com wrote: Hello -- is it possible for our code to stall a ConsumerConnector from doing any consuming for, say, 30 seconds, until we can be sure that all other ConsumerConnectors are rebalanced? It seems that the first ConsumerConnector to come up is prefetching some data, and we end up with duplicate messages. We looked at the code for the high-level consumer (core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala) and it looks like it empties some queues after a rebalance, but we still see duplicate messages. I'm sure this question has been asked before :-) but this is our first time really working with the high-level consumer, and this caught us by surprise. When there is *no* data in Kafka and we wait until everything balances before sending data in, everything works fine; but if there is some data sitting in the brokers, we seem to get dupes, even when each thread sleeps for many seconds after creating the ConsumerConnector. Are we missing something? Thanks, Philip
Re: Kafka 0.8 Maven and IntelliJ
Thanks. Which version of IntelliJ are you using? Jun On Thu, Jun 13, 2013 at 10:20 AM, Dragos Manolescu dragos.manole...@servicenow.com wrote: Hmm, I've just pulled 0.8.0-beta1-candidate1, removed .idea* from my top-level directory, executed gen-idea, and then opened and built the project in IntelliJ w/o problems. I noticed that the build uses an old version of the sbt-idea plugin: addSbtPlugin("com.github.mpeltonen" % "sbt-idea" % "1.2.0") The latest release is 1.4.0, perhaps upgrading would help? -Dragos On 6/12/13 9:03 PM, Jun Rao jun...@gmail.com wrote: Dragos, After the sbt upgrade 3-4 months ago, some of us are struggling to get the Kafka code cleanly loaded into IntelliJ after doing ./sbt gen-idea. Were you able to do that successfully? Thanks, Jun On Wed, Jun 12, 2013 at 10:45 AM, Dragos Manolescu dragos.manole...@servicenow.com wrote: For IntelliJ I've always used the gen-idea sbt plugin: https://github.com/mpeltonen/sbt-idea -Dragos On 6/11/13 10:41 PM, Jason Rosenberg j...@squareup.com wrote: Try the one under core/targets? On Tue, Jun 11, 2013 at 3:34 PM, Florin Trofin ftro...@adobe.com wrote: I downloaded the latest 0.8 snapshot and I want to build using Maven: ./sbt make-pom generates a bunch of pom.xml files, but when I try to open one of them in IntelliJ they are not recognized. Do I need to do any other step? Which pom do I need to open? Thanks! Florin
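Concretely, the upgrade Dragos suggests is a one-line change in project/plugins.sbt (1.4.0 per his message; whether it actually fixes the gen-idea problems is untested here):

// project/plugins.sbt
addSbtPlugin("com.github.mpeltonen" % "sbt-idea" % "1.4.0")

followed by ./sbt gen-idea to regenerate the IntelliJ project files.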
Re: Stall high-level 0.72 ConsumerConnector until all balanced? Avoid message dupes?
Are your messages compressed in batches? If so, some dups are expected during a rebalance. In 0.8, such dups are eliminated. Other than that, a rebalance shouldn't cause dups since we commit consumed offsets to ZK before doing a rebalance. Thanks, Jun On Thu, Jun 13, 2013 at 7:34 PM, Philip O'Toole phi...@loggly.com wrote: Hello -- is it possible for our code to stall a ConsumerConnector from doing any consuming for, say, 30 seconds, until we can be sure that all other ConsumerConnectors are rebalanced? It seems that the first ConsumerConnector to come up is prefetching some data, and we end up with duplicate messages. We looked at the code for the high-level consumer (core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala) and it looks like it empties some queues after a rebalance, but we still see duplicate messages. I'm sure this question has been asked before :-) but this is our first time really working with the high-level consumer, and this caught us by surprise. When there is *no* data in Kafka and we wait until everything balances before sending data in, everything works fine; but if there is some data sitting in the brokers, we seem to get dupes, even when each thread sleeps for many seconds after creating the ConsumerConnector. Are we missing something? Thanks, Philip
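The compression case Jun describes is a producer-side setting in 0.7: a compressed batch is stored as a single message on the broker, so consumed offsets, and therefore redelivery after a rebalance, are batch-granular. Illustrative 0.7-style config (property names from memory, treat as a sketch):

val props = new Properties()
props.put("zk.connect", "localhost:2181")
props.put("serializer.class", "kafka.serializer.StringEncoder")
// 0 = no compression, 1 = gzip. With compression on, a rebalance can
// redeliver an entire batch even if part of it was already consumed.
props.put("compression.codec", "1")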
Re: Stall high-level 0.72 ConsumerConnector until all balanced? Avoid message dupes?
Jun -- thanks. We're using 0.72. No, the messages are not compressed, and since we do appear to be seeing dupes in our tests, it indicates our own code is buggy. Thanks, Philip On Thu, Jun 13, 2013 at 9:15 PM, Jun Rao jun...@gmail.com wrote: Are your messages compressed in batches? If so, some dups are expected during a rebalance. In 0.8, such dups are eliminated. Other than that, a rebalance shouldn't cause dups since we commit consumed offsets to ZK before doing a rebalance. Thanks, Jun On Thu, Jun 13, 2013 at 7:34 PM, Philip O'Toole phi...@loggly.com wrote: Hello -- is it possible for our code to stall a ConsumerConnector from doing any consuming for, say, 30 seconds, until we can be sure that all other ConsumerConnectors are rebalanced? It seems that the first ConsumerConnector to come up is prefetching some data, and we end up with duplicate messages. We looked at the code for the high-level consumer (core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala) and it looks like it empties some queues after a rebalance, but we still see duplicate messages. I'm sure this question has been asked before :-) but this is our first time really working with the high-level consumer, and this caught us by surprise. When there is *no* data in Kafka and we wait until everything balances before sending data in, everything works fine; but if there is some data sitting in the brokers, we seem to get dupes, even when each thread sleeps for many seconds after creating the ConsumerConnector. Are we missing something? Thanks, Philip