Re: Versioning Schemas

2013-06-13 Thread Shone Sadler
Thanks Jun & Phil!

Shone


On Thu, Jun 13, 2013 at 12:00 AM, Jun Rao jun...@gmail.com wrote:

 Yes, we just have a customized encoder that encodes the first 4 bytes of the md5
 of the schema, followed by the Avro bytes.
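
A minimal sketch of that kind of encoder, assuming Avro's GenericDatumWriter and the
standard java.security.MessageDigest API (class and variable names here are
illustrative, not LinkedIn's actual code):

import java.io.ByteArrayOutputStream;
import java.security.MessageDigest;
import java.util.Arrays;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;

public class Md5PrefixedAvroEncoder {

    // Encode: first 4 bytes of the MD5 of the schema, then the Avro binary payload.
    public static byte[] encode(Schema schema, GenericRecord record) throws Exception {
        byte[] md5 = MessageDigest.getInstance("MD5")
                .digest(schema.toString().getBytes("UTF-8"));
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(Arrays.copyOf(md5, 4));                  // 4-byte schema id prefix
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(schema).write(record, encoder);
        encoder.flush();
        return out.toByteArray();
    }
}

The resulting byte array is what would be handed to the Kafka producer as the
message payload.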

 Thanks,

 Jun


 On Wed, Jun 12, 2013 at 9:50 AM, Shone Sadler shone.sad...@gmail.com
 wrote:

  Jun,
  I like the idea of an explicit version field, if the schema can be
 derived
  from the topic name itself. The storage (say 1-4 bytes) would require
 less
  overhead than a 128 bit md5 at the added cost of managing the version#.
 
  Is it correct to assume that your applications are using two schemas
 then,
  one system level schema to deserialize the schema id and bytes for the
  application message and a second schema to deserialize those bytes with
 the
  application schema?
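
For what it's worth, with the raw 4-byte prefix approach there is no second Avro
envelope schema: the id is peeled off with plain byte handling and a single Avro
decode is done with the looked-up schema. A sketch under that assumption (the
schema lookup map is hypothetical, standing in for whatever registry holds
schemas by id):

import java.util.Arrays;
import java.util.Map;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;

public class Md5PrefixedAvroDecoder {

    // Hypothetical lookup from 4-byte schema id (as a hex string) to the writer schema.
    private final Map<String, Schema> schemaById;

    public Md5PrefixedAvroDecoder(Map<String, Schema> schemaById) {
        this.schemaById = schemaById;
    }

    public GenericRecord decode(byte[] payload) throws Exception {
        String id = toHex(Arrays.copyOfRange(payload, 0, 4));   // peel off the 4-byte id
        Schema writerSchema = schemaById.get(id);
        BinaryDecoder decoder = DecoderFactory.get()
                .binaryDecoder(payload, 4, payload.length - 4, null);
        return new GenericDatumReader<GenericRecord>(writerSchema).read(null, decoder);
    }

    private static String toHex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) sb.append(String.format("%02x", b));
        return sb.toString();
    }
}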
 
  Thanks again!
  Shone
 
 
  On Wed, Jun 12, 2013 at 11:31 AM, Jun Rao jun...@gmail.com wrote:
 
   Actually, currently our schema id is the md5 of the schema itself. Not
   fully sure how this compares with an explicit version field in the
  schema.
  
   Thanks,
  
   Jun
  
  
   On Wed, Jun 12, 2013 at 8:29 AM, Jun Rao jun...@gmail.com wrote:
  
At LinkedIn, we are using option 2.
   
Thanks,
   
Jun
   
   
On Wed, Jun 12, 2013 at 7:14 AM, Shone Sadler 
 shone.sad...@gmail.com
   wrote:
   
Hello everyone,
   
After doing some searching on the mailing list for best practices on
integrating Avro with Kafka there appears to be at least 3 options
 for
integrating the Avro Schema; 1) embedding the entire schema within
 the
message 2) embedding a unique identifier for the schema in the
 message
   and
3) deriving the schema from the topic/resource name.
   
Option 2, appears to be the best option in terms of both efficiency
  and
flexibility.  However, from a programming perspective it complicates
  the
solution with the need for both an envelope schema (containing a
  schema
id and bytes field for record data) and message schema
 (containing
   the
application specific message fields).  This requires two levels of
serialization/deserialization.
Questions:
1) How are others dealing with versioning of schemas?
2) Is there a more elegant means of embedding a schema ids in a Avro
message (I am new to both currently ;-)?
   
Thanks in advance!
   
Shone
   
   
   
  
 



Re: Producer will pick one of the two brokers, but never the two at same time [0.8]

2013-06-13 Thread Alexandre Rodrigues
Hi Jun,

I was using the 0.8 branch, 2 commits behind, but now I am using the latest with
the same issue. 3 topics (A, B, C), created automatically with a replication
factor of 2 and 2 partitions each. 2 brokers (0 and 1).

List of topics in zookeeper is the following:

topic: A  partition: 0  leader: 1  replicas: 1,0  isr: 1
topic: A  partition: 1  leader: 0  replicas: 0,1  isr: 0,1
topic: B  partition: 0  leader: 0  replicas: 0,1  isr: 0,1
topic: B  partition: 1  leader: 1  replicas: 1,0  isr: 1
topic: C  partition: 0  leader: 1  replicas: 1,0  isr: 1
topic: C  partition: 1  leader: 0  replicas: 0,1  isr: 0,1


*Broker 1*

This was the one I started first. It works well and writes messages to disk.
In the state-change.log I see no errors, just trace rows:

[2013-06-13 08:51:33,505] TRACE Broker 1 cached leader info
(LeaderAndIsrInfo:(Leader:0,ISR:0,1,LeaderEpoch:0,ControllerEpoch:1),ReplicationFactor:2),AllReplicas:0,1)
for partition [C,1] in response to UpdateMetadata request sent by
controller 1 epoch 1 with correlation id 10 (state.change.logger)
[2013-06-13 08:51:33,506] TRACE Controller 1 epoch 1 received response
correlationId 10 for a request sent to broker 1 (state.change.logger)
[2013-06-13 08:51:33,509] TRACE Controller 1 epoch 1 changed state of
replica 0 for partition [C,1] to OnlineReplica (state.change.logger)
[2013-06-13 08:51:33,510] TRACE Controller 1 epoch 1 changed state of
replica 1 for partition [C,0] to OnlineReplica (state.change.logger)
[2013-06-13 08:51:33,511] TRACE Controller 1 epoch 1 changed state of
replica 0 for partition [B,1] to OnlineReplica (state.change.logger)
[2013-06-13 08:51:33,511] TRACE Controller 1 epoch 1 changed state of
replica 0 for partition [C,0] to OnlineReplica (state.change.logger)
[2013-06-13 08:51:33,512] TRACE Controller 1 epoch 1 changed state of
replica 0 for partition [B,0] to OnlineReplica (state.change.logger)
[2013-06-13 08:51:33,512] TRACE Controller 1 epoch 1 changed state of
replica 1 for partition [B,0] to OnlineReplica (state.change.logger)
[2013-06-13 08:51:33,513] TRACE Controller 1 epoch 1 changed state of
replica 1 for partition [B,1] to OnlineReplica (state.change.logger)
[2013-06-13 08:51:33,513] TRACE Controller 1 epoch 1 changed state of
replica 1 for partition [C,1] to OnlineReplica (state.change.logger)

$ du -sh /mnt/kafka-logs/*

4.0K  /mnt/kafka-logs/replication-offset-checkpoint
163M  /mnt/kafka-logs/A-0
4.0K  /mnt/kafka-logs/A-1
4.0K  /mnt/kafka-logs/B-0
90M   /mnt/kafka-logs/B-1
16K   /mnt/kafka-logs/C-0
4.0K  /mnt/kafka-logs/C-1



*Broker 0*
Configuration is the same as broker 1, except for a different broker.id. This
broker doesn't write anything to disk; /mnt/kafka-logs is empty.

It logs a non-stopping stream of:

[2013-06-13 09:08:53,814] WARN [KafkaApi-0] Produce request with
correlation id 735114 from client  on partition [A,1] failed due to
Partition [request,1] doesn't exist on 0 (kafka.server.KafkaApis)
[2013-06-13 09:08:53,815] WARN [KafkaApi-0] Produce request with
correlation id 519064 from client  on partition [B,0] failed due to
Partition [response,0] doesn't exist on 0 (kafka.server.KafkaApis)
[2013-06-13 09:08:53,815] WARN [KafkaApi-0] Produce request with
correlation id 735118 from client  on partition [A,1] failed due to
Partition [request,1] doesn't exist on 0 (kafka.server.KafkaApis)
[2013-06-13 09:08:53,815] WARN [KafkaApi-0] Produce request with
correlation id 519068 from client  on partition [B,0] failed due to
Partition [response,0] doesn't exist on 0 (kafka.server.KafkaApis)
...

*Server Configuration*
port=9092
num.network.threads=2
num.io.threads=2
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
log.dir=/mnt/kafka-logs
auto.create.topics.enable=true
default.replication.factor=2
num.partitions=2
log.flush.interval.messages=1
log.flush.interval.ms=1000
log.retention.hours=168
log.segment.bytes=536870912
log.cleanup.interval.mins=1
zookeeper.connect=xxx1:2181,xxx2:2181,xxx3:2181
zookeeper.connection.timeout.ms=100
kafka.metrics.polling.interval.secs=5
kafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter
kafka.csv.metrics.dir=/mnt/kafka_metrics
kafka.csv.metrics.reporter.enabled=false


I can't understand why broker 0 doesn't act as a leader for its partitions nor
receive replicated data from broker 1. To rule out the producer as the source
of the problem, I will run similar tests with the console producer.
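
For reference, a minimal 0.8 sync producer that could be used for the same kind
of test from code (broker addresses and key values are placeholders); with
producer.type set to sync, any produce error surfaces immediately on send():

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class ProducerSmokeTest {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "broker0:9092,broker1:9092"); // placeholder hosts
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("producer.type", "sync");          // fail fast so errors are visible
        props.put("request.required.acks", "1");     // wait for the leader's ack

        Producer<String, String> producer =
                new Producer<String, String>(new ProducerConfig(props));
        for (int i = 0; i < 10; i++) {
            // Varying keys so the default partitioner spreads sends across both partitions.
            producer.send(new KeyedMessage<String, String>("A", "key-" + i, "message-" + i));
        }
        producer.close();
    }
}

If everything is healthy, both partitions (and hence both brokers) should
receive some of these messages.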

Alex


On 13 June 2013 04:57, Jun Rao jun...@gmail.com wrote:

 Any error in state-change.log? Also, are you using the latest code in the
 0.8 branch?

 Thanks,

 Jun


 On Wed, Jun 12, 2013 at 9:27 AM, Alexandre Rodrigues 
 alexan...@blismedia.com wrote:

  Hi Jun,
 
  Thanks for your prompt answer. The producer yields those errors in the
  beginning, so I think the topic metadata refresh has nothing to do with
 

0.8 Durability Question

2013-06-13 Thread Jonathan Hodges
Looking at Jun’s ApacheCon slides (
http://www.slideshare.net/junrao/kafka-replication-apachecon2013) slide 21
titled, ‘Data Flow in Replication’ there are three possible durability
configurations which tradeoff latency for greater persistence guarantees.

The third row is the ‘no data loss’ configuration option where the producer
only receives an ack from the broker once the message(s) are committed by
the leader and peers.  Does this commit also mean the message(s) are
flushed to disk?  I know there is a separate configuration setting,
log.flush.interval.messages, but I thought in sync mode the producer
doesn’t receive an ack until message(s) are committed and flushed to disk.
 Please correct me if my understanding is incorrect.
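
For reference, the producer-side setting in 0.8 that selects between those rows
is request.required.acks (0 = no ack, 1 = leader ack, -1 = wait until the
message is committed to the in-sync replicas). A sketch of the sync, acks=-1
configuration, with the broker list as a placeholder:

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class NoDataLossProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "broker1:9092,broker2:9092"); // placeholder
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("producer.type", "sync");        // block until the broker responds
        props.put("request.required.acks", "-1");  // ack only after commit to the ISR

        Producer<String, String> producer =
                new Producer<String, String>(new ProducerConfig(props));
        producer.send(new KeyedMessage<String, String>("my-topic", "hello")); // (topic, message)
        producer.close();
    }
}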

Thanks,
Jonathan


Re: Producer will pick one of the two brokers, but never the two at same time [0.8]

2013-06-13 Thread Alexandre Rodrigues
I've tried the console producer, so I will assume the problem is not related
to the producer. I keep seeing the same entries in the producer log from time
to time:

[2013-06-13 11:04:00,670] WARN Error while fetching metadata
[{TopicMetadata for topic C ->
No partition metadata for topic C due to
kafka.common.LeaderNotAvailableException}] for topic [C]: class
kafka.common.LeaderNotAvailableException
 (kafka.producer.BrokerPartitionInfo)

Which I assume happens when the producer asks a broker who is responsible for
a partition. I might be wrong, but I think one of the brokers doesn't know, so
I thought it might be related to ZK, where partition leader elections happen (I
think).

I was using a 3-node ZK 3.3.5 ensemble. First I deleted the snapshots of all
the ZK nodes and started one node without an ensemble. I cleaned the brokers'
dataDir and restarted them against that solo ZK node. The problem is still the
same. I thought it could be because of the ZK version, so I decided to start a
ZK instance using the jar that ships with Kafka, and the problem remains.

I am not sure if this is a real bug or just something I'm missing. I don't
know if it helps, but all the trials were run without any kind of consumer
(which should be OK, no?)

Thanks,
Alex




On 13 June 2013 10:15, Alexandre Rodrigues alexan...@blismedia.com wrote:

 Hi Jun,

 I was using the 0.8 branch with 2 commits behind but now I am using the
 latest with the same issue. 3 topics A,B,C, created automatically with
 replication factor of 2 and partitions 2. 2 brokers (0 and 1).

 List of topics in zookeeper is the following:

 topic: A  partition: 0leader: 1   replicas: 1,0   isr: 1
 topic: A  partition: 1leader: 0   replicas: 0,1   isr: 0,1
 topic: B partition: 0leader: 0   replicas: 0,1   isr: 0,1
 topic: B partition: 1leader: 1   replicas: 1,0   isr: 1
 topic: C  partition: 0leader: 1   replicas: 1,0   isr: 1
 topic: C  partition: 1leader: 0   replicas: 0,1   isr: 0,1


 *Broker 1*

 This was the one I've started first. This works well and writes messages
 to the disk.
 In the state-change.log I have got no errors, just trace rows:

 [2013-06-13 08:51:33,505] TRACE Broker 1 cached leader info
 (LeaderAndIsrInfo:(Leader:0,ISR:0,1,LeaderEpoch:0,ControllerEpoch:1),ReplicationFactor:2),AllReplicas:0,1)
 for partition [C,1] in response to UpdateMetadata request sent by
 controller 1 epoch 1 with correlation id 10 (state.change.logger)
 [2013-06-13 08:51:33,506] TRACE Controller 1 epoch 1 received response
 correlationId 10 for a request sent to broker 1 (state.change.logger)
 [2013-06-13 08:51:33,509] TRACE Controller 1 epoch 1 changed state of
 replica 0 for partition [C,1] to OnlineReplica (state.change.logger)
 [2013-06-13 08:51:33,510] TRACE Controller 1 epoch 1 changed state of
 replica 1 for partition [C,0] to OnlineReplica (state.change.logger)
 [2013-06-13 08:51:33,511] TRACE Controller 1 epoch 1 changed state of
 replica 0 for partition [B,1] to OnlineReplica (state.change.logger)
 [2013-06-13 08:51:33,511] TRACE Controller 1 epoch 1 changed state of
 replica 0 for partition [C,0] to OnlineReplica (state.change.logger)
 [2013-06-13 08:51:33,512] TRACE Controller 1 epoch 1 changed state of
 replica 0 for partition [B,0] to OnlineReplica (state.change.logger)
 [2013-06-13 08:51:33,512] TRACE Controller 1 epoch 1 changed state of
 replica 1 for partition [B,0] to OnlineReplica (state.change.logger)
 [2013-06-13 08:51:33,513] TRACE Controller 1 epoch 1 changed state of
 replica 1 for partition [B,1] to OnlineReplica (state.change.logger)
 [2013-06-13 08:51:33,513] TRACE Controller 1 epoch 1 changed state of
 replica 1 for partition [C,1] to OnlineReplica (state.change.logger)

 $ du -sh /mnt/kafka-logs/*

 4.0K/mnt/kafka-logs/replication-offset-checkpoint
 163M/mnt/kafka-logs/A-0
 4.0K/mnt/kafka-logs/A-1
 4.0K/mnt/kafka-logs/B-0
 90M /mnt/kafka-logs/B-1
 16K /mnt/kafka-logs/C-0
 4.0K/mnt/kafka-logs/C-1



 *Broker 0*
 *
 *
 Configuration is the same as Broker #1, with different broker.id. This
 doesn't write to the disk. The /mnt/kafka-logs is empty without any file.

 Logging a non-stopping stream of:

 [2013-06-13 09:08:53,814] WARN [KafkaApi-0] Produce request with
 correlation id 735114 from client  on partition [A,1] failed due to
 Partition [request,1] doesn't exist on 0 (kafka.server.KafkaApis)
 [2013-06-13 09:08:53,815] WARN [KafkaApi-0] Produce request with
 correlation id 519064 from client  on partition [B,0] failed due to
 Partition [response,0] doesn't exist on 0 (kafka.server.KafkaApis)
  [2013-06-13 09:08:53,815] WARN [KafkaApi-0] Produce request with
 correlation id 735118 from client  on partition [A,1] failed due to
 Partition [request,1] doesn't exist on 0 (kafka.server.KafkaApis)
 [2013-06-13 09:08:53,815] WARN [KafkaApi-0] Produce request with
 correlation id 519068 from client  on partition [B,0] failed due to
 Partition [response,0] doesn't exist on 0 

Re: One 0.72 ConsumerConnector, multiple threads, 1 blocks. What happens?

2013-06-13 Thread Philip O'Toole
Jun - thanks again. This is very helpful. 

Philip

On Jun 12, 2013, at 9:50 PM, Jun Rao jun...@gmail.com wrote:

 Actually, you are right. This can happen on a single topic too, if you have
 more than one consumer thread. Each consumer thread pulls data from a blocking
 queue, and one or more fetchers put data into those queues. Say you have two
 consumer threads and two partitions from the same broker. There is a single
 fetcher that fetches both partitions, and it puts each partition's data into a
 separate queue. So, if one thread stops consuming data, its queue will be full
 at some point, which will block the fetcher from putting data into the other
 queue.
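
A small self-contained model of that queue arrangement (plain
java.util.concurrent rather than the Kafka consumer internals) that reproduces
the effect Jun describes: once consumer A stops draining its queue, the shared
fetcher blocks and consumer B stops seeing new data.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class FetcherQueueDemo {
    public static void main(String[] args) {
        // One bounded queue per consumer thread, as in the high-level consumer.
        final BlockingQueue<String> queueA = new ArrayBlockingQueue<String>(10);
        final BlockingQueue<String> queueB = new ArrayBlockingQueue<String>(10);

        // A single "fetcher" feeds both queues; put() blocks when a queue is full.
        Thread fetcher = new Thread(new Runnable() {
            public void run() {
                try {
                    for (int i = 0; ; i++) {
                        queueA.put("partition-0 msg " + i); // blocks once queueA fills up...
                        queueB.put("partition-1 msg " + i); // ...and then queueB starves
                    }
                } catch (InterruptedException e) { }
            }
        });

        // Consumer A takes one message and then blocks forever in "downstream work".
        Thread consumerA = new Thread(new Runnable() {
            public void run() {
                try { queueA.take(); Thread.sleep(Long.MAX_VALUE); }
                catch (InterruptedException e) { }
            }
        });

        // Consumer B keeps consuming, but soon sees no new data because the
        // fetcher is stuck on queueA.put().
        Thread consumerB = new Thread(new Runnable() {
            public void run() {
                try {
                    while (true) { System.out.println("B got: " + queueB.take()); }
                } catch (InterruptedException e) { }
            }
        });

        fetcher.start(); consumerA.start(); consumerB.start();
    }
}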
 
 Thanks,
 
 Jun
 
 
 On Wed, Jun 12, 2013 at 9:10 PM, Philip O'Toole phi...@loggly.com wrote:
 
 Jun -- thanks.
 
 But if the topic is the same, doesn't each thread get a partition?
 Isn't that how it works?
 
 Philip
 
 On Wed, Jun 12, 2013 at 9:08 PM, Jun Rao jun...@gmail.com wrote:
 Yes, when the consumer is consuming multiple topics, if one thread stops
 consuming topic 1, it can prevent new data getting into the consumer for
 topic 2.
 
 Thanks,
 
 Jun
 
 
 On Wed, Jun 12, 2013 at 7:43 PM, Philip O'Toole phi...@loggly.com
 wrote:
 
 Hello -- we're using 0.72. We're looking at the source, but want to be
 sure. :-)
 
 We create a single ConsumerConnector, call createMessageStreams, and
 hand the streams off to individual threads. If one of those threads
 calls next() on a stream, gets some messages, and then *blocks* in
 some subsequent operation (and blocks for minutes), can it potentially
 cause all other threads (calling next() on other streams) to block
 too? Does something inside the ConsumerConnector block all other
 stream processing? This would explain some behaviour we're seeing.
 
 Thanks,
 
 Philip
 


Re: Producer will pick one of the two brokers, but never the two at same time [0.8]

2013-06-13 Thread Alexandre Rodrigues
I think I know what's happening:

I tried to run both brokers and ZK on the same machine and it worked. I also
attempted to do the same with a ZK node on another machine and it also worked.

My guess is that it's something related to ports. All the machines are on EC2,
and there might be something related to the security group. I am going to run
the first setup with all ports open and see how it goes. If the ports really
are the problem, shouldn't this kind of problem be logged somewhere?




On 13 June 2013 12:13, Alexandre Rodrigues alexan...@blismedia.com wrote:

 I've tried the console producer, so I will assume that's not related with
 the producer. I keep seeing the same entries in the producer from time to
 time:

 [2013-06-13 11:04:00,670] WARN Error while fetching metadata
 [{TopicMetadata for topic C -
 No partition metadata for topic C due to
 kafka.common.LeaderNotAvailableException}] for topic [C]: class
 kafka.common.LeaderNotAvailableException
  (kafka.producer.BrokerPartitionInfo)

 Which I assume is when the consumer asks a broker who is responsible for a
 partition. I might be wrong but I think one of the brokers doesn't know, so
 I thought it might be related with the ZK where partition leader elections
 happen (I think).

 I was using a 3 node ZK 3.3.5. First I've deleted the snapshot of all the
 ZK nodes and started one without ensemble. Cleaned the brokers dataDir and
 restarted them against that solo ZK node. The problem still the same. I
 though it could be because of the ZK version, so I've decided to start a ZK
 instance using the jar that ships with Kafka and the problem remains.

 I am not sure if this is a real bug or just anything that might be missing
 to me. I don't know if it helps, but all the trials were run without any
 kind of consumer (which should be OK, no?)

 Thanks,
 Alex




 On 13 June 2013 10:15, Alexandre Rodrigues alexan...@blismedia.comwrote:

 Hi Jun,

 I was using the 0.8 branch with 2 commits behind but now I am using the
 latest with the same issue. 3 topics A,B,C, created automatically with
 replication factor of 2 and partitions 2. 2 brokers (0 and 1).

 List of topics in zookeeper is the following:

 topic: A  partition: 0leader: 1   replicas: 1,0   isr: 1
 topic: A  partition: 1leader: 0   replicas: 0,1   isr: 0,1
 topic: B partition: 0leader: 0   replicas: 0,1   isr: 0,1
 topic: B partition: 1leader: 1   replicas: 1,0   isr: 1
 topic: C  partition: 0leader: 1   replicas: 1,0   isr: 1
 topic: C  partition: 1leader: 0   replicas: 0,1   isr: 0,1


 *Broker 1*

 This was the one I've started first. This works well and writes messages
 to the disk.
 In the state-change.log I have got no errors, just trace rows:

 [2013-06-13 08:51:33,505] TRACE Broker 1 cached leader info
 (LeaderAndIsrInfo:(Leader:0,ISR:0,1,LeaderEpoch:0,ControllerEpoch:1),ReplicationFactor:2),AllReplicas:0,1)
 for partition [C,1] in response to UpdateMetadata request sent by
 controller 1 epoch 1 with correlation id 10 (state.change.logger)
 [2013-06-13 08:51:33,506] TRACE Controller 1 epoch 1 received response
 correlationId 10 for a request sent to broker 1 (state.change.logger)
 [2013-06-13 08:51:33,509] TRACE Controller 1 epoch 1 changed state of
 replica 0 for partition [C,1] to OnlineReplica (state.change.logger)
 [2013-06-13 08:51:33,510] TRACE Controller 1 epoch 1 changed state of
 replica 1 for partition [C,0] to OnlineReplica (state.change.logger)
 [2013-06-13 08:51:33,511] TRACE Controller 1 epoch 1 changed state of
 replica 0 for partition [B,1] to OnlineReplica (state.change.logger)
 [2013-06-13 08:51:33,511] TRACE Controller 1 epoch 1 changed state of
 replica 0 for partition [C,0] to OnlineReplica (state.change.logger)
 [2013-06-13 08:51:33,512] TRACE Controller 1 epoch 1 changed state of
 replica 0 for partition [B,0] to OnlineReplica (state.change.logger)
 [2013-06-13 08:51:33,512] TRACE Controller 1 epoch 1 changed state of
 replica 1 for partition [B,0] to OnlineReplica (state.change.logger)
 [2013-06-13 08:51:33,513] TRACE Controller 1 epoch 1 changed state of
 replica 1 for partition [B,1] to OnlineReplica (state.change.logger)
 [2013-06-13 08:51:33,513] TRACE Controller 1 epoch 1 changed state of
 replica 1 for partition [C,1] to OnlineReplica (state.change.logger)

 $ du -sh /mnt/kafka-logs/*

 4.0K/mnt/kafka-logs/replication-offset-checkpoint
 163M/mnt/kafka-logs/A-0
 4.0K/mnt/kafka-logs/A-1
 4.0K/mnt/kafka-logs/B-0
 90M /mnt/kafka-logs/B-1
 16K /mnt/kafka-logs/C-0
 4.0K/mnt/kafka-logs/C-1



 *Broker 0*
 *
 *
 Configuration is the same as Broker #1, with different broker.id. This
 doesn't write to the disk. The /mnt/kafka-logs is empty without any file.

 Logging a non-stopping stream of:

 [2013-06-13 09:08:53,814] WARN [KafkaApi-0] Produce request with
 correlation id 735114 from client  on partition [A,1] failed due to
 Partition [request,1] doesn't exist on 0 

Re: 0.8 Durability Question

2013-06-13 Thread Neha Narkhede
No. It only means that messages are written to all replicas in memory. Data
is flushed to disk asynchronously.
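
The flush policy itself is controlled by broker-side settings, separately from
the producer's ack mode; for example, the keys already shown in the server
config earlier in this digest (values here are illustrative):

# server.properties (broker side)
# Flush a partition's log to disk after this many messages have accumulated:
log.flush.interval.messages=10000
# ...or after roughly this many milliseconds, whichever limit is hit first:
log.flush.interval.ms=1000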

Thanks,
Neha
On Jun 13, 2013 3:29 AM, Jonathan Hodges hodg...@gmail.com wrote:

 Looking at Jun’s ApacheCon slides (
 http://www.slideshare.net/junrao/kafka-replication-apachecon2013) slide 21
 titled, ‘Data Flow in Replication’ there are three possible durability
 configurations which tradeoff latency for greater persistence guarantees.

 The third row is the ‘no data loss’ configuration option where the producer
 only receives an ack from the broker once the message(s) are committed by
 the leader and peers.  Does this commit also mean the message(s) are
 flushed to disk?  I know there is a separate configuration setting,
 log.flush.interval.messages, but I thought in sync mode the producer
 doesn’t receive an ack until message(s) are committed and flushed to disk.
  Please correct me if my understanding is incorrect.

 Thanks,
 Jonathan



Re: Producer will pick one of the two brokers, but never the two at same time [0.8]

2013-06-13 Thread Jun Rao
Have you looked at #3 in http://kafka.apache.org/faq.html?

Thanks,

Jun


On Thu, Jun 13, 2013 at 6:41 AM, Alexandre Rodrigues 
alexan...@blismedia.com wrote:

 I think I know what's happening:

 I tried to run both brokers and ZK on the same machine and it worked. I
 also attempted to do the same but with a ZK node on other machine and it
 also worked.

 My guess is something related with ports. All the machines are on EC2 and
 there might be something related with the security group. I am going to run
 the first setup with all open doors and see how it goes. If the ports are
 really the problem, shouldn't this kind of problem be logged somewhere?




 On 13 June 2013 12:13, Alexandre Rodrigues alexan...@blismedia.com
 wrote:

  I've tried the console producer, so I will assume that's not related with
  the producer. I keep seeing the same entries in the producer from time to
  time:
 
  [2013-06-13 11:04:00,670] WARN Error while fetching metadata
  [{TopicMetadata for topic C -
  No partition metadata for topic C due to
  kafka.common.LeaderNotAvailableException}] for topic [C]: class
  kafka.common.LeaderNotAvailableException
   (kafka.producer.BrokerPartitionInfo)
 
  Which I assume is when the consumer asks a broker who is responsible for
 a
  partition. I might be wrong but I think one of the brokers doesn't know,
 so
  I thought it might be related with the ZK where partition leader
 elections
  happen (I think).
 
  I was using a 3 node ZK 3.3.5. First I've deleted the snapshot of all the
  ZK nodes and started one without ensemble. Cleaned the brokers dataDir
 and
  restarted them against that solo ZK node. The problem still the same. I
  though it could be because of the ZK version, so I've decided to start a
 ZK
  instance using the jar that ships with Kafka and the problem remains.
 
  I am not sure if this is a real bug or just anything that might be
 missing
  to me. I don't know if it helps, but all the trials were run without any
  kind of consumer (which should be OK, no?)
 
  Thanks,
  Alex
 
 
 
 
  On 13 June 2013 10:15, Alexandre Rodrigues alexan...@blismedia.com
 wrote:
 
  Hi Jun,
 
  I was using the 0.8 branch with 2 commits behind but now I am using the
  latest with the same issue. 3 topics A,B,C, created automatically with
  replication factor of 2 and partitions 2. 2 brokers (0 and 1).
 
  List of topics in zookeeper is the following:
 
  topic: A  partition: 0leader: 1   replicas: 1,0   isr: 1
  topic: A  partition: 1leader: 0   replicas: 0,1   isr: 0,1
  topic: B partition: 0leader: 0   replicas: 0,1   isr: 0,1
  topic: B partition: 1leader: 1   replicas: 1,0   isr: 1
  topic: C  partition: 0leader: 1   replicas: 1,0   isr: 1
  topic: C  partition: 1leader: 0   replicas: 0,1   isr: 0,1
 
 
  *Broker 1*
 
  This was the one I've started first. This works well and writes messages
  to the disk.
  In the state-change.log I have got no errors, just trace rows:
 
  [2013-06-13 08:51:33,505] TRACE Broker 1 cached leader info
 
 (LeaderAndIsrInfo:(Leader:0,ISR:0,1,LeaderEpoch:0,ControllerEpoch:1),ReplicationFactor:2),AllReplicas:0,1)
  for partition [C,1] in response to UpdateMetadata request sent by
  controller 1 epoch 1 with correlation id 10 (state.change.logger)
  [2013-06-13 08:51:33,506] TRACE Controller 1 epoch 1 received response
  correlationId 10 for a request sent to broker 1 (state.change.logger)
  [2013-06-13 08:51:33,509] TRACE Controller 1 epoch 1 changed state of
  replica 0 for partition [C,1] to OnlineReplica (state.change.logger)
  [2013-06-13 08:51:33,510] TRACE Controller 1 epoch 1 changed state of
  replica 1 for partition [C,0] to OnlineReplica (state.change.logger)
  [2013-06-13 08:51:33,511] TRACE Controller 1 epoch 1 changed state of
  replica 0 for partition [B,1] to OnlineReplica (state.change.logger)
  [2013-06-13 08:51:33,511] TRACE Controller 1 epoch 1 changed state of
  replica 0 for partition [C,0] to OnlineReplica (state.change.logger)
  [2013-06-13 08:51:33,512] TRACE Controller 1 epoch 1 changed state of
  replica 0 for partition [B,0] to OnlineReplica (state.change.logger)
  [2013-06-13 08:51:33,512] TRACE Controller 1 epoch 1 changed state of
  replica 1 for partition [B,0] to OnlineReplica (state.change.logger)
  [2013-06-13 08:51:33,513] TRACE Controller 1 epoch 1 changed state of
  replica 1 for partition [B,1] to OnlineReplica (state.change.logger)
  [2013-06-13 08:51:33,513] TRACE Controller 1 epoch 1 changed state of
  replica 1 for partition [C,1] to OnlineReplica (state.change.logger)
 
  $ du -sh /mnt/kafka-logs/*
 
  4.0K/mnt/kafka-logs/replication-offset-checkpoint
  163M/mnt/kafka-logs/A-0
  4.0K/mnt/kafka-logs/A-1
  4.0K/mnt/kafka-logs/B-0
  90M /mnt/kafka-logs/B-1
  16K /mnt/kafka-logs/C-0
  4.0K/mnt/kafka-logs/C-1
 
 
 
  *Broker 0*
  *
  *
  Configuration is the same as Broker #1, with different broker.id. This
  doesn't write to the disk. The 

Re: Arguments for Kafka over RabbitMQ ?

2013-06-13 Thread Alexis Richardson
Hi all,

First, thanks to Tim (from Rabbit) and Jonathan for moving this thread
along.  Jonathan, I hope you found my links to the data model docs,
and Tim's replies, helpful.

Has everyone got what they wanted from this thread?

alexis


On Tue, Jun 11, 2013 at 5:49 PM, Jonathan Hodges hodg...@gmail.com wrote:
 Hi Tim,

 While your comments regarding durability are accurate for 0.7 version of
 Kafka, it is a bit greyer with 0.8.  In 0.8 you have the ability to
 configure Kafka to have the durability you need.  This is what I was
 referring to with the link to Jun’s ApacheCon slides (
 http://www.slideshare.net/junrao/kafka-replication-apachecon2013).

 If you look at slide 21 titled, ‘Data Flow in Replication’ you see the
 three possible durability configurations which tradeoff latency for greater
 persistence guarantees.

 The third row is the ‘no data loss’ configuration option where the producer
 only receives an ack from the broker once the message(s) are committed by
 the leader and peers (mirrors as you call them) and flushed to disk.  This
 seems to be very similar to the scenario you describe in Rabbit, no?

 Jun or Neha can you please confirm my understanding of 0.8 durability is
 correct and there is no data loss in the scenario I describe?  I know there
 is a separate configuration setting, log.flush.interval.messages, but I
 thought in sync mode the producer doesn’t receive an ack until message(s)
 are committed and flushed to disk.  Please correct me if my understanding
 is incorrect.

 Thanks!


 On Tue, Jun 11, 2013 at 8:20 AM, Tim Watson watson.timo...@gmail.comwrote:

 Hi Jonathan,

 So, thanks for replying - that's all useful info.

 On 10 Jun 2013, at 14:19, Jonathan Hodges wrote:
  Kafka has a configurable rolling window of time it keeps the messages per
  topic.  The default is 7 days and after this time the messages are
 removed
  from disk by the broker.
  Correct, the consumers maintain their own state via what are known as
  offsets.  Also true that when producers/consumers contact the broker
 there
  is a random seek to the start of the offset, but the majority of access
  patterns are linear.
 

 So, just to be clear, the distinction that has been raised on this thread
 is only part of the story, viz the difference in rates between RabbitMQ and
 Kafka. Essentially, these two systems are performing completely different
 tasks, since in RabbitMQ, the concept of a long-term persistent topic whose
 entries are removed solely based on expiration policy is somewhat alien.
 RabbitMQ will delete messages from its message store as soon as a relevant
 consumer has seen and ACK'ed them, which *requires* tracking consumer state
 in the broker. I suspect this was your (earlier) point about Kafka /not/
 trying to be a general purpose message broker, but having an architecture
 that is highly tuned to a specific set of usage patterns.

  As you can see in the last graph of 10 million messages which is less
 than
  a GB on disk, the Rabbit throughput is capped around 10k/sec.  Beyond
  throughput, with the pending release of 0.8, Kafka will also have
  advantages around message guarantees and durability.
 
 
 [snip]
  Correct with 0.8 Kafka will have similar options like Rabbit fsync
  configuration option.

 Right, but just to be clear, unless Kafka starts to fsync for every single
 published message, you are /not/ going to offer the same guarantee. In this
 respect, rabbit is clearly putting safety above performance when that's
 what users ask it for, which is fine for some cases and not for others. By
 way of example, if you're using producer/publisher confirms with RabbitMQ,
 the broker will not ACK receipt of a message until (a) it has been fsync'ed
 to disk and (b) if the queue is mirrored, each mirror has acknowledged
 receipt of the message. Again, unless you're fsync-ing to disk on each
 publish, the guarantees will be different - and rightly so, since you can
 deal with re-publishing and de-duplication quite happily in a system that's
 dealing with a 7-day sliding window of data and thus ensuring throughput is
 more useful (in that case) than avoiding data loss on the server.

 Of course, architecturally, fsync-ing very regularly will kill the
 benefits that mmap combined with sendfile give you, since relying on the
 kernel's paging / caching capabilities is the whole point of doing that.
 That's not intended to be a criticism btw, just an observation about the
 distinction between the two system's differing approaches.

   Messages have always had ordering guarantees, but
  with 0.8 there is the notion of topic replicas similar to replication
  factor in Hadoop or Cassandra.
 
  http://www.slideshare.net/junrao/kafka-replication-apachecon2013
 
  With configuration you can tradeoff latency for durability with 3
 options.
   - Producer receives no acks (no network delay)
   - Producer waits for ack from broker leader (1 network roundtrip)
   - Producer waits for quorum ack (2 network 

Re: Producer will pick one of the two brokers, but never the two at same time [0.8]

2013-06-13 Thread Alexandre Rodrigues
I have, but this is a different thing. It's related to ports and security
groups, not to the bind addresses. It's solved now.

Thanks


On 13 June 2013 15:42, Jun Rao jun...@gmail.com wrote:

 Have you looked at #3 in http://kafka.apache.org/faq.html?

 Thanks,

 Jun


 On Thu, Jun 13, 2013 at 6:41 AM, Alexandre Rodrigues 
 alexan...@blismedia.com wrote:

  I think I know what's happening:
 
  I tried to run both brokers and ZK on the same machine and it worked. I
  also attempted to do the same but with a ZK node on other machine and it
  also worked.
 
  My guess is something related with ports. All the machines are on EC2 and
  there might be something related with the security group. I am going to
 run
  the first setup with all open doors and see how it goes. If the ports are
  really the problem, shouldn't this kind of problem be logged somewhere?
 
 
 
 
  On 13 June 2013 12:13, Alexandre Rodrigues alexan...@blismedia.com
  wrote:
 
   I've tried the console producer, so I will assume that's not related
 with
   the producer. I keep seeing the same entries in the producer from time
 to
   time:
  
   [2013-06-13 11:04:00,670] WARN Error while fetching metadata
   [{TopicMetadata for topic C -
   No partition metadata for topic C due to
   kafka.common.LeaderNotAvailableException}] for topic [C]: class
   kafka.common.LeaderNotAvailableException
(kafka.producer.BrokerPartitionInfo)
  
   Which I assume is when the consumer asks a broker who is responsible
 for
  a
   partition. I might be wrong but I think one of the brokers doesn't
 know,
  so
   I thought it might be related with the ZK where partition leader
  elections
   happen (I think).
  
   I was using a 3 node ZK 3.3.5. First I've deleted the snapshot of all
 the
   ZK nodes and started one without ensemble. Cleaned the brokers dataDir
  and
   restarted them against that solo ZK node. The problem still the same. I
   though it could be because of the ZK version, so I've decided to start
 a
  ZK
   instance using the jar that ships with Kafka and the problem remains.
  
   I am not sure if this is a real bug or just anything that might be
  missing
   to me. I don't know if it helps, but all the trials were run without
 any
   kind of consumer (which should be OK, no?)
  
   Thanks,
   Alex
  
  
  
  
   On 13 June 2013 10:15, Alexandre Rodrigues alexan...@blismedia.com
  wrote:
  
   Hi Jun,
  
   I was using the 0.8 branch with 2 commits behind but now I am using
 the
   latest with the same issue. 3 topics A,B,C, created automatically with
   replication factor of 2 and partitions 2. 2 brokers (0 and 1).
  
   List of topics in zookeeper is the following:
  
   topic: A  partition: 0leader: 1   replicas: 1,0   isr: 1
   topic: A  partition: 1leader: 0   replicas: 0,1   isr: 0,1
   topic: B partition: 0leader: 0   replicas: 0,1   isr: 0,1
   topic: B partition: 1leader: 1   replicas: 1,0   isr: 1
   topic: C  partition: 0leader: 1   replicas: 1,0   isr: 1
   topic: C  partition: 1leader: 0   replicas: 0,1   isr: 0,1
  
  
   *Broker 1*
  
   This was the one I've started first. This works well and writes
 messages
   to the disk.
   In the state-change.log I have got no errors, just trace rows:
  
   [2013-06-13 08:51:33,505] TRACE Broker 1 cached leader info
  
 
 (LeaderAndIsrInfo:(Leader:0,ISR:0,1,LeaderEpoch:0,ControllerEpoch:1),ReplicationFactor:2),AllReplicas:0,1)
   for partition [C,1] in response to UpdateMetadata request sent by
   controller 1 epoch 1 with correlation id 10 (state.change.logger)
   [2013-06-13 08:51:33,506] TRACE Controller 1 epoch 1 received response
   correlationId 10 for a request sent to broker 1 (state.change.logger)
   [2013-06-13 08:51:33,509] TRACE Controller 1 epoch 1 changed state of
   replica 0 for partition [C,1] to OnlineReplica (state.change.logger)
   [2013-06-13 08:51:33,510] TRACE Controller 1 epoch 1 changed state of
   replica 1 for partition [C,0] to OnlineReplica (state.change.logger)
   [2013-06-13 08:51:33,511] TRACE Controller 1 epoch 1 changed state of
   replica 0 for partition [B,1] to OnlineReplica (state.change.logger)
   [2013-06-13 08:51:33,511] TRACE Controller 1 epoch 1 changed state of
   replica 0 for partition [C,0] to OnlineReplica (state.change.logger)
   [2013-06-13 08:51:33,512] TRACE Controller 1 epoch 1 changed state of
   replica 0 for partition [B,0] to OnlineReplica (state.change.logger)
   [2013-06-13 08:51:33,512] TRACE Controller 1 epoch 1 changed state of
   replica 1 for partition [B,0] to OnlineReplica (state.change.logger)
   [2013-06-13 08:51:33,513] TRACE Controller 1 epoch 1 changed state of
   replica 1 for partition [B,1] to OnlineReplica (state.change.logger)
   [2013-06-13 08:51:33,513] TRACE Controller 1 epoch 1 changed state of
   replica 1 for partition [C,1] to OnlineReplica (state.change.logger)
  
   $ du -sh /mnt/kafka-logs/*
  
   4.0K

Using Kafka for data messages

2013-06-13 Thread Josh Foure
 
Hi all, my team is proposing a novel way of using Kafka and I am hoping someone
can help do a sanity check on this:

1.  When a user logs into our website, we will create a “logged in” event
message in Kafka containing the user id.
2.  30+ systems (consumers, each in their own consumer group) will consume this
event and look up data about this user id.  They will then publish all of this
data back out into Kafka as a series of data messages.  One message may include
the user’s name, another the user’s address, another the user’s last 10
searches, another their last 10 orders, etc.  The plan is that a single “logged
in” event may trigger hundreds if not thousands of additional data messages.
3.  Another system, the “Product Recommendation” system, will have consumed the
original “logged in” message and will also consume a subset of the data
messages (realistically I think it would need to consume all of the data
messages but would discard the ones it doesn’t need).  As the Product
Recommendation system consumes the data messages, it will compute recommended
products and publish out recommendation messages (that get more and more
specific as it consumes more and more data messages).
4.  The original website will consume the recommendation messages and show the
recommendations to the user as it gets them.

You don’t see many systems implemented this way, but since Kafka has so much
higher throughput than your typical MOM, this approach seems innovative.

The benefits are:

1.  If we start collecting more information about the users, we can simply
start publishing that in new data messages, and consumers can start processing
those messages whenever they want.  If we were doing this in a more traditional
SOA approach the schemas would need to change every time we added a field, but
with this approach we can just create new messages without touching existing
ones.
2.  We are looking to make our systems smaller, so if we end up with more,
smaller systems that each publish a small number of events, it becomes easier
to make changes and test them.  If we were doing this in a more traditional SOA
approach we would need to retest each consumer every time we changed our bigger
SOA services.

The downsides appear to be:

1.  We may be publishing a large amount of data that never gets used but that
everyone needs to consume, to see whether they need it, before discarding it.
2.  The Product Recommendation system may need to wait until it consumes a
number of messages, and keep track of all the data internally, before it can
start processing.
3.  While we may be able to keep the messages somewhat small, the fact that
they contain data means they will be bigger than your traditional EDA messages.
4.  It seems like we can do a lot of this using SOA (we already have an ESB
that can do transformations to address consumers expecting an older version of
the data).

Any insight is appreciated.
Thanks,
Josh
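
As a rough sketch of what one of those 30+ systems might look like with the
0.8-era high-level consumer and producer APIs (topic names follow the
GUEST_EVENT / GUEST_DATA.<userId> convention discussed later in this thread;
the lookup call and broker/ZK addresses are hypothetical):

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.javaapi.producer.Producer;
import kafka.message.MessageAndMetadata;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class AddressEnricher {
    public static void main(String[] args) throws Exception {
        // High-level consumer in its own group, so every system sees every login event.
        Properties cProps = new Properties();
        cProps.put("zookeeper.connect", "zk1:2181");          // placeholder
        cProps.put("group.id", "address-enricher");           // one group per system
        ConsumerConnector connector =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(cProps));

        Properties pProps = new Properties();
        pProps.put("metadata.broker.list", "broker1:9092");   // placeholder
        pProps.put("serializer.class", "kafka.serializer.StringEncoder");
        Producer<String, String> producer =
                new Producer<String, String>(new ProducerConfig(pProps));

        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                connector.createMessageStreams(Collections.singletonMap("GUEST_EVENT", 1));
        for (MessageAndMetadata<byte[], byte[]> mm : streams.get("GUEST_EVENT").get(0)) {
            String userId = new String(mm.message(), "UTF-8");
            String address = lookupAddress(userId);           // hypothetical backend call
            // Publish a data message onto this user's topic.
            producer.send(new KeyedMessage<String, String>("GUEST_DATA." + userId, address));
        }
    }

    private static String lookupAddress(String userId) {
        return "address-of-" + userId;                        // stand-in for a real lookup
    }
}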

Re: Arguments for Kafka over RabbitMQ ?

2013-06-13 Thread Jonathan Hodges
Hi Alexis,

This was very helpful and I also appreciate both yours and Tim's input
here.  It clears up the cases for when to use Rabbit or Kafka.  What is
great is they are both open source with vibrant communities behind them.

-Jonathan

On Jun 13, 2013 8:45 AM, Alexis Richardson ale...@rabbitmq.com wrote:

 Hi all,

 First, thanks to Tim (from Rabbit) and Jonathan for moving this thread
 along.  Jonathan, I hope you found my links to the data model docs,
 and Tim's replies, helpful.

 Has everyone got what they wanted from this thread?

 alexis


 On Tue, Jun 11, 2013 at 5:49 PM, Jonathan Hodges hodg...@gmail.com
 wrote:
  Hi Tim,
 
  While your comments regarding durability are accurate for 0.7 version of
  Kafka, it is a bit greyer with 0.8.  In 0.8 you have the ability to
  configure Kafka to have the durability you need.  This is what I was
  referring to with the link to Jun’s ApacheCon slides (
  http://www.slideshare.net/junrao/kafka-replication-apachecon2013).
 
  If you look at slide 21 titled, ‘Data Flow in Replication’ you see the
  three possible durability configurations which tradeoff latency for
 greater
  persistence guarantees.
 
  The third row is the ‘no data loss’ configuration option where the
 producer
  only receives an ack from the broker once the message(s) are committed by
  the leader and peers (mirrors as you call them) and flushed to disk.
  This
  seems to be very similar to the scenario you describe in Rabbit, no?
 
  Jun or Neha can you please confirm my understanding of 0.8 durability is
  correct and there is no data loss in the scenario I describe?  I know
 there
  is a separate configuration setting, log.flush.interval.messages, but I
  thought in sync mode the producer doesn’t receive an ack until message(s)
  are committed and flushed to disk.  Please correct me if my understanding
  is incorrect.
 
  Thanks!
 
 
  On Tue, Jun 11, 2013 at 8:20 AM, Tim Watson watson.timo...@gmail.com
 wrote:
 
  Hi Jonathan,
 
  So, thanks for replying - that's all useful info.
 
  On 10 Jun 2013, at 14:19, Jonathan Hodges wrote:
   Kafka has a configurable rolling window of time it keeps the messages
 per
   topic.  The default is 7 days and after this time the messages are
  removed
   from disk by the broker.
   Correct, the consumers maintain their own state via what are known as
   offsets.  Also true that when producers/consumers contact the broker
  there
   is a random seek to the start of the offset, but the majority of
 access
   patterns are linear.
  
 
  So, just to be clear, the distinction that has been raised on this
 thread
  is only part of the story, viz the difference in rates between RabbitMQ
 and
  Kafka. Essentially, these two systems are performing completely
 different
  tasks, since in RabbitMQ, the concept of a long-term persistent topic
 whose
  entries are removed solely based on expiration policy is somewhat alien.
  RabbitMQ will delete messages from its message store as soon as a
 relevant
  consumer has seen and ACK'ed them, which *requires* tracking consumer
 state
  in the broker. I suspect this was your (earlier) point about Kafka /not/
  trying to be a general purpose message broker, but having an
 architecture
  that is highly tuned to a specific set of usage patterns.
 
   As you can see in the last graph of 10 million messages which is less
  than
   a GB on disk, the Rabbit throughput is capped around 10k/sec.  Beyond
   throughput, with the pending release of 0.8, Kafka will also have
   advantages around message guarantees and durability.
  
  
  [snip]
   Correct with 0.8 Kafka will have similar options like Rabbit fsync
   configuration option.
 
  Right, but just to be clear, unless Kafka starts to fsync for every
 single
  published message, you are /not/ going to offer the same guarantee. In
 this
  respect, rabbit is clearly putting safety above performance when that's
  what users ask it for, which is fine for some cases and not for others.
 By
  way of example, if you're using producer/publisher confirms with
 RabbitMQ,
  the broker will not ACK receipt of a message until (a) it has been
 fsync'ed
  to disk and (b) if the queue is mirrored, each mirror has acknowledged
  receipt of the message. Again, unless you're fsync-ing to disk on each
  publish, the guarantees will be different - and rightly so, since you
 can
  deal with re-publishing and de-duplication quite happily in a system
 that's
  dealing with a 7-day sliding window of data and thus ensuring
 throughput is
  more useful (in that case) than avoiding data loss on the server.
 
  Of course, architecturally, fsync-ing very regularly will kill the
  benefits that mmap combined with sendfile give you, since relying on the
  kernel's paging / caching capabilities is the whole point of doing that.
  That's not intended to be a criticism btw, just an observation about the
  distinction between the two system's differing approaches.
 
Messages have always had ordering 

RE: shipping logs to s3 or other servers for backups

2013-06-13 Thread S Ahmed
Hi,

In my application, I am storing user events, and I want to partition the
storage by day.

So at the end of a day, I want to take that file and ship it to s3 or
another server as a backup.  This way I can replay the events for a
specific day if needed.

These events also have to be in order.

How should I structure things in Kafka to ensure that these user events all
belong to the same file (or set of files, since I believe Kafka splits a log
into multiple segment files once it gets large)?
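
One common pattern, sketched below with the 0.8 Java producer (topic, key, and
broker names are placeholders), is to key every event by user id so the default
partitioner sends all of a user's events to the same partition; Kafka preserves
order within a partition, and each partition maps to its own directory of
segment files on disk, which is what you would then ship to S3.

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class UserEventProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "broker1:9092,broker2:9092"); // placeholder
        props.put("serializer.class", "kafka.serializer.StringEncoder");

        Producer<String, String> producer =
                new Producer<String, String>(new ProducerConfig(props));

        // Keying by user id keeps each user's events in one partition, in order.
        String userId = "user-42";
        producer.send(new KeyedMessage<String, String>("user-events", userId, "clicked:home"));
        producer.send(new KeyedMessage<String, String>("user-events", userId, "clicked:cart"));
        producer.close();
    }
}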


Re: Using Kafka for data messages

2013-06-13 Thread Mahendra M
Hi Josh,

The idea looks very interesting. I just had one doubt.

1. A user logs in. His login id is sent on a topic
2. Other systems (consumers on this topic) consume this message and
publish their results to another topic

This will be happening without any particular order for hundreds of users.

Now, for the site being displayed to the user: how will you fetch only the
messages for that user from the queue?

Regards,
Mahendra



On Thu, Jun 13, 2013 at 8:51 PM, Josh Foure user...@yahoo.com wrote:


 Hi all, my team is proposing a novel
 way of using Kafka and I am hoping someone can help do a sanity check on
 this:

 1.  When a user logs
 into our website, we will create a “logged in” event message in Kafka
 containing the user id.
 2.  30+ systems
 (consumers each in their own consumer groups) will consume this event and
 lookup data about this user id.  They
 will then publish all of this data back out into Kafka as a series of data
 messages.  One message may include the user’s name,
 another the user’s address, another the user’s last 10 searches, another
 their
 last 10 orders, etc.  The plan is that a
 single “logged in” event may trigger hundreds if not thousands of
 additional data
 messages.
 3.  Another system,
 the “Product Recommendation” system, will have consumed the original
 “logged in”
 message and will also consume a subset of the data messages (realistically
 I
 think it would need to consume all of the data messages but would discard
 the
 ones it doesn’t need).  As the Product
 Recommendation consumes the data messages, it will process recommended
 products
 and publish out recommendation messages (that get more and more specific
 as it
 has consumed more and more data messages).
 4.  The original
 website will consume the recommendation messages and show the
 recommendations to
 the user as it gets them.

 You don’t see many systems implemented this way but since
 Kafka has such a higher throughput than your typical MOM, this approach
 seems
 innovative.

 The benefits are:

 1.  If we start
 collecting more information about the users, we can simply start publishing
 that in new data messages and consumers can start processing those messages
 whenever they want.  If we were doing
 this in a more traditional SOA approach the schemas would need to change
 every time
 we added a field but with this approach we can just create new messages
 without
 touching existing ones.
 2.  We are looking to
 make our systems smaller so if we end up with more, smaller systems that
 each
 publish a small number of events, it becomes easier to make changes and
 test
 the changes.  If we were doing this in a
 more traditional SOA approach we would need to retest each consumer every
 time
 we changed our bigger SOA services.

 The downside appears to be:

 1.  We may be
 publishing a large amount of data that never gets used but that everyone
 needs
 to consume to see if they need it before discarding it.
 2.  The Product Recommendation
 system may need to wait until it consumes a number of messages and keep
 track
 of all the data internally before it can start processing.
 3.  While we may be
 able to keep the messages somewhat small, the fact that they contain data
 will
 mean they will be bigger than your tradition EDA messages.
 4.  It seems like we
 can do a lot of this using SOA (we already have an ESB than can do
 transformations to address consumers expecting an older version of the
 data).

 Any insight is appreciated.
 Thanks,
 Josh




-- 
Mahendra

http://twitter.com/mahendra


Re: Producer only finding partition on 1 of 2 Brokers, even though ZK shows 1 partition exists on both Brokers?

2013-06-13 Thread Brett Hoerner
As an update, this continues to affect us.

First I'd like to note ways in which my issue seems different from KAFKA-278:

* I did not add a new broker or a new topic, this topic has been in use on
two existing brokers for months
* The topic definitely exists on both brokers. The topic/data directory
exists on both, and as noted above both brokers even show it in ZK

That said, I went ahead and did the workaround from the ticket, which in my
case basically means restarting the brokers (because the topic/data directory
already exists).

After bouncing both brokers I *did* get data to both brokers for a while.
I'm not yet sure if this only lasts until I have to restart my *producers*
(as I've had to update them a bit lately), but that is my current guess.

When I start a producer now (both brokers up, ZK data looks exactly like in my
original post), I get output like this:

2013-06-13_15:46:11.64496 Broker Topic Path = /brokers/topics
2013-06-13_15:46:11.0 15:46:11.999 [run-main] INFO
kafka.producer.ProducerPool - Creating async producer for broker id = 1 at
10.10.150.16:9092
2013-06-13_15:46:12.00109 15:46:12.001 [run-main] INFO
kafka.producer.ProducerPool - Creating async producer for broker id = 0 at
10.10.71.113:9092
2013-06-13_15:46:16.77956 15:46:16.779 [ProducerSendThread-1375847990]
INFO  kafka.producer.SyncProducer - Connected to 10.10.150.16:9092 for
producing

The last line repeats as the SyncProducer does its periodic reconnect
thing, but note that it is *always* broker 1 at 10.10.150.16:9092, even
though it seems like broker 0 is seen.

Thanks for your help!



On Tue, Jun 4, 2013 at 6:24 PM, Neha Narkhede neha.narkh...@gmail.comwrote:

 You are probably hitting https://issues.apache.org/jira/browse/KAFKA-278.
 Can you please try the workaround mentioned in the JIRA description?

 Thanks,
 Neha


 On Tue, Jun 4, 2013 at 4:21 PM, Brett Hoerner br...@bretthoerner.com
 wrote:

  (version 0.7.2)
 
  For some reason, my producers are only picking up the partition on 1 of
 my
  2 brokers. I've been digging through the code, and I don't see any issues
  given the state of my ZK nodes. The producer never seems to locate a
  partition on Broker0, even though ZK clearly states that it has 1 (just
  like Broker1 does).
 
  The result of this is that the producer works, but only sends data to one
  Broker.
 
  Has anyone seen something like this before? I'm stumped. Thanks.
 
  # zk information follows:
  [zk: localhost:2181(CONNECTED) 5] ls /kafka/brokers/ids
  [1, 0]
 
  [zk: localhost:2181(CONNECTED) 6] get /kafka/brokers/ids/0
  10.10.71.113-1365733477001:10.10.71.113:9092
 
  [zk: localhost:2181(CONNECTED) 7] get /kafka/brokers/ids/1
  10.10.150.16-1369236663861:10.10.150.16:9092
 
  [zk: localhost:2181(CONNECTED) 10] ls /kafka/brokers/topics/test
  [1, 0]
 
  # this is the most confusing one
  [zk: localhost:2181(CONNECTED) 11] get /kafka/brokers/topics/test/0
  1
 
  [zk: localhost:2181(CONNECTED) 12] get /kafka/brokers/topics/test/1
  1
 
  # kafka producer ZK information DEBUG log, as you can see it finds 0
  partitions on Broker0:
  Broker Topic Path = /brokers/topics
  DEBUG [2013-06-04 23:14:30,689] kafka.producer.ZKBrokerPartitionInfo:
  Broker ids and # of partitions on each for topic: test =
 ArrayBuffer((0,0),
  (1,1))
  DEBUG [2013-06-04 23:14:30,690] kafka.producer.ZKBrokerPartitionInfo:
  Sorted list of broker ids and partition ids on each for topic: test =
  TreeSet(1-0)
  DEBUG [2013-06-04 23:14:30,819]
  kafka.producer.ZKBrokerPartitionInfo$BrokerTopicsListener:
  [BrokerTopicsListener] Creating broker topics listener to watch the
  following paths -
  /broker/topics, /broker/topics/topic, /broker/ids
  DEBUG [2013-06-04 23:14:30,823]
  kafka.producer.ZKBrokerPartitionInfo$BrokerTopicsListener:
  [BrokerTopicsListener] Initialized this broker topics listener with
 initial
  mapping of broker id to partition id per topic with Map(test -
  TreeSet(1-0))
  DEBUG [2013-06-04 23:14:30,904] kafka.producer.ZKBrokerPartitionInfo:
  Registering listener on path: /brokers/topics/test
 



Re: Using Kafka for data messages

2013-06-13 Thread Josh Foure
Hi Mahendra, I think that is where it gets a little tricky.  It would 
work something like this:

1.  Web sends a login event for user user123 to topic GUEST_EVENT.
2.  All of the systems consume those messages and publish their data messages to 
topic GUEST_DATA.user123.
3.  The Recommendation system gets all of the data from GUEST_DATA.user123, 
processes it, and then publishes back to the same topic GUEST_DATA.user123.
4.  The Web consumes the messages from that same topic (there is a different 
topic for every user that logged in), GUEST_DATA.user123, and when it finds the 
recommendation messages it pushes them to the browser (note it will need to 
read all the other data messages and discard them while looking for the 
recommendation messages).  I have a concern that the Web will be flooded with a 
ton of messages that it will promptly drop, but I don't want to create a new 
“response” or “recommendation” topic because then I feel like I am tightly 
coupling the message to the functionality, and in the future different systems 
may want to consume those messages as well.

Does that make sense?
Josh
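
A sketch of the web tier's side of step 4, assuming the same 0.8-era high-level
consumer API; the RECOMMENDATION: prefix is a made-up convention here for
telling recommendation messages apart from the other data messages on the
per-user topic:

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class WebRecommendationListener {
    public static void main(String[] args) throws Exception {
        String topic = "GUEST_DATA.user123";                  // one topic per logged-in user

        Properties props = new Properties();
        props.put("zookeeper.connect", "zk1:2181");           // placeholder
        props.put("group.id", "web-frontend");
        ConsumerConnector connector =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                connector.createMessageStreams(Collections.singletonMap(topic, 1));
        for (MessageAndMetadata<byte[], byte[]> mm : streams.get(topic).get(0)) {
            String message = new String(mm.message(), "UTF-8");
            // Hypothetical convention: recommendation messages carry a type marker;
            // everything else on the topic is discarded by the web tier.
            if (message.startsWith("RECOMMENDATION:")) {
                pushToBrowser(message);
            }
        }
    }

    private static void pushToBrowser(String recommendation) {
        System.out.println("push: " + recommendation);        // stand-in for the real push
    }
}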







 From: Mahendra M mahendr...@gmail.com
To: users@kafka.apache.org; Josh Foure user...@yahoo.com 
Sent: Thursday, June 13, 2013 12:56 PM
Subject: Re: Using Kafka for data messages
 

Hi Josh,

The idea looks very interesting. I just had one doubt.

1. A user logs in. His login id is sent on a topic
2. Other systems (consumers on this topic) consumer this message and
publish their results to another topic

This will be happening without any particular order for hundreds of users.

Now the site being displayed to the user.. How will you fetch only messages
for that user from the queue?

Regards,
Mahendra



On Thu, Jun 13, 2013 at 8:51 PM, Josh Foure user...@yahoo.com wrote:


 Hi all, my team is proposing a novel
 way of using Kafka and I am hoping someone can help do a sanity check on
 this:

 1.  When a user logs
 into our website, we will create a “logged in” event message in Kafka
 containing the user id.
 2.  30+ systems
 (consumers each in their own consumer groups) will consume this event and
 lookup data about this user id.  They
 will then publish all of this data back out into Kafka as a series of data
 messages.  One message may include the user’s name,
 another the user’s address, another the user’s last 10 searches, another
 their
 last 10 orders, etc.  The plan is that a
 single “logged in” event may trigger hundreds if not thousands of
 additional data
 messages.
 3.  Another system,
 the “Product Recommendation” system, will have consumed the original
 “logged in”
 message and will also consume a subset of the data messages (realistically
 I
 think it would need to consume all of the data messages but would discard
 the
 ones it doesn’t need).  As the Product
 Recommendation consumes the data messages, it will process recommended
 products
 and publish out recommendation messages (that get more and more specific
 as it
 has consumed more and more data messages).
 4.  The original
 website will consume the recommendation messages and show the
 recommendations to
 the user as it gets them.

 You don’t see many systems implemented this way but since
 Kafka has such a higher throughput than your typical MOM, this approach
 seems
 innovative.

 The benefits are:

 1.  If we start
 collecting more information about the users, we can simply start publishing
 that in new data messages and consumers can start processing those messages
 whenever they want.  If we were doing
 this in a more traditional SOA approach the schemas would need to change
 every time
 we added a field but with this approach we can just create new messages
 without
 touching existing ones.
 2.  We are looking to
 make our systems smaller so if we end up with more, smaller systems that
 each
 publish a small number of events, it becomes easier to make changes and
 test
 the changes.  If we were doing this in a
 more traditional SOA approach we would need to retest each consumer every
 time
 we changed our bigger SOA services.

 The downside appears to be:

 1.  We may be
 publishing a large amount of data that never gets used but that everyone
 needs
 to consume to see if they need it before discarding it.
 2.  The Product Recommendation
 system may need to wait until it consumes a number of messages and keep
 track
 of all the data internally before it can start processing.
 3.  While we may be
 able to keep the messages somewhat small, the fact that they contain data
 will
 mean they will be bigger than your tradition EDA messages.
 4.  It seems like we
 can do a lot of this using SOA (we already have an ESB than can do
 transformations to address consumers expecting an older version of the
 data).

 Any insight is appreciated.
 Thanks,
 Josh




-- 
Mahendra

http://twitter.com/mahendra

Re: Using Kafka for data messages

2013-06-13 Thread Timothy Chen
Also, since you're going to be creating a topic per user, the number of
concurrent users will be a concern: Kafka doesn't like massive
numbers of topics.

Tim


On Thu, Jun 13, 2013 at 10:47 AM, Josh Foure user...@yahoo.com wrote:

 Hi Mahendra, I think that is where it gets a little tricky.  I think it
 would work something like this:

 1.  Web sends login event for user user123 to topic GUEST_EVENT.
 2.  All of the systems consume those messages and publish the data
 messages to topic GUEST_DATA.user123.
 3.  The Recommendation system gets all of the data from
 GUEST_DATA.user123, processes and then publishes back to the same topic
 GUEST_DATA.user123.
 4.  The Web consumes the messages from the same topic (there is a
 different topic for every user that logged in) GUEST_DATA.user123 and
 when it finds the recommendation messages it pushes that to the browser
 (note it will need to read all the other data messages and discard those
 when looking for the recommendation messages).  I have a concern that the
 Web will be flooded with a ton of messages that it will promptly drop but I
 don't want to create a new response or recommendation topic because
 then I feel like I am tightly coupling the message to the functionality and
 in the future different systems may want to consume those messages as well.

 Does that make sense?
 Josh






 
  From: Mahendra M mahendr...@gmail.com
 To: users@kafka.apache.org; Josh Foure user...@yahoo.com
 Sent: Thursday, June 13, 2013 12:56 PM
 Subject: Re: Using Kafka for data messages


 Hi Josh,

 The idea looks very interesting. I just had one doubt.

 1. A user logs in. His login id is sent on a topic
 2. Other systems (consumers on this topic) consumer this message and
 publish their results to another topic

 This will be happening without any particular order for hundreds of users.

 Now the site being displayed to the user.. How will you fetch only messages
 for that user from the queue?

 Regards,
 Mahendra



 On Thu, Jun 13, 2013 at 8:51 PM, Josh Foure user...@yahoo.com wrote:

 
  Hi all, my team is proposing a novel
  way of using Kafka and I am hoping someone can help do a sanity check on
  this:
 
  1.  When a user logs
  into our website, we will create a “logged in” event message in Kafka
  containing the user id.
  2.  30+ systems
  (consumers each in their own consumer groups) will consume this event and
  lookup data about this user id.  They
  will then publish all of this data back out into Kafka as a series of
 data
  messages.  One message may include the user’s name,
  another the user’s address, another the user’s last 10 searches, another
  their
  last 10 orders, etc.  The plan is that a
  single “logged in” event may trigger hundreds if not thousands of
  additional data
  messages.
  3.  Another system,
  the “Product Recommendation” system, will have consumed the original
  “logged in”
  message and will also consume a subset of the data messages
 (realistically
  I
  think it would need to consume all of the data messages but would discard
  the
  ones it doesn’t need).  As the Product
  Recommendation consumes the data messages, it will process recommended
  products
  and publish out recommendation messages (that get more and more specific
  as it
  has consumed more and more data messages).
  4.  The original
  website will consume the recommendation messages and show the
  recommendations to
  the user as it gets them.
 
  You don’t see many systems implemented this way but since
  Kafka has such a higher throughput than your typical MOM, this approach
  seems
  innovative.
 
  The benefits are:
 
  1.  If we start
  collecting more information about the users, we can simply start
 publishing
  that in new data messages and consumers can start processing those
 messages
  whenever they want.  If we were doing
  this in a more traditional SOA approach the schemas would need to change
  every time
  we added a field but with this approach we can just create new messages
  without
  touching existing ones.
  2.  We are looking to
  make our systems smaller so if we end up with more, smaller systems that
  each
  publish a small number of events, it becomes easier to make changes and
  test
  the changes.  If we were doing this in a
  more traditional SOA approach we would need to retest each consumer every
  time
  we changed our bigger SOA services.
 
  The downside appears to be:
 
  1.  We may be
  publishing a large amount of data that never gets used but that everyone
  needs
  to consume to see if they need it before discarding it.
  2.  The Product Recommendation
  system may need to wait until it consumes a number of messages and keep
  track
  of all the data internally before it can start processing.
  3.  While we may be
  able to keep the messages somewhat small, the fact that they contain data
  will
  mean they will be bigger than your tradition EDA messages.
  4.  It seems like we
  

Re: Using Kafka for data messages

2013-06-13 Thread Josh Foure
Ah yes, I had read that Kafka likes under 1,000 topics but I wasn't sure if 
that was really a limitation.  In principle I wouldn't mind having all guest 
events placed on the GUEST_DATA queue, but I thought that by having more 
topics I could minimize having consumers read messages only to discard them.  
My thought had been that if I have 20 Web JVMs and at any given time 1,000 
people logged in per JVM, each JVM would only need to consume the messages 
from 1,000 topics.  If instead there is a single topic, each JVM will be 
consuming from the same topic (and be in a different consumer group), but 19 
out of 20 messages will be for guests that are not even logged into that JVM.  
Since Kafka doesn't have message selectors or anything like that, I was hoping 
to use topics to help segregate the traffic.  I don't want to use 1 topic per 
Web JVM because in the future other consumers may be interested in that same 
data, and the services that put the data in Kafka shouldn't have to look up 
what JVM that user is logged into (or get that from another message and keep 
track of it).  Any thoughts on how to work around this?  I know there are 
topic partitions, but those seem more like a way to distribute the workload in 
terms of storing the messages, and not for the message selection scenario I am 
describing, if I understood correctly.
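
One possible workaround (a sketch only, not something anyone in the thread has
committed to): keep a single GUEST_DATA topic, give each Web JVM its own
consumer group, and do the "message selection" in application code against the
set of users that JVM is actually hosting. The 0.8 high-level consumer Java API
and the "userId|messageType|body" payload convention are assumptions here:

    import java.util.Collections;
    import java.util.Properties;
    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;

    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.javaapi.consumer.ConsumerConnector;

    public class PerJvmGuestDataFilter {

        // User ids with an active session on *this* Web JVM, maintained by login/logout code.
        private static final Set<String> usersLoggedInHere =
                Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());

        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("zookeeper.connect", "zk1:2181");        // assumption
            props.put("group.id", "web-jvm-" + System.getProperty("web.jvm.id", "1")); // one group per JVM

            ConsumerConnector connector =
                    Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
            ConsumerIterator<byte[], byte[]> it = connector
                    .createMessageStreams(Collections.singletonMap("GUEST_DATA", 1))
                    .get("GUEST_DATA").get(0).iterator();

            while (it.hasNext()) {
                String payload = new String(it.next().message());
                // Hypothetical payload convention: "userId|messageType|body".
                String userId = payload.substring(0, payload.indexOf('|'));
                if (usersLoggedInHere.contains(userId)) {
                    dispatchToSession(userId, payload);
                }
                // Messages for users hosted elsewhere are dropped here -- the "19 out of 20" cost.
            }
        }

        private static void dispatchToSession(String userId, String payload) {
            System.out.println(userId + " <- " + payload);     // stand-in for the real dispatch
        }
    }

The trade-off is exactly the one described above: every JVM still reads every
guest message, but Kafka no longer has to carry tens of thousands of topics.
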





 From: Timothy Chen tnac...@gmail.com
To: users@kafka.apache.org; Josh Foure user...@yahoo.com 
Sent: Thursday, June 13, 2013 2:13 PM
Subject: Re: Using Kafka for data messages
 

Also since you're going to be creating a topic per user, the number of
concurrent users will also be a concern to Kafka as it doesn't like massive
amounts of topics.

Tim


On Thu, Jun 13, 2013 at 10:47 AM, Josh Foure user...@yahoo.com wrote:

 Hi Mahendra, I think that is where it gets a little tricky.  I think it
 would work something like this:

 1.  Web sends login event for user user123 to topic GUEST_EVENT.
 2.  All of the systems consume those messages and publish the data
 messages to topic GUEST_DATA.user123.
 3.  The Recommendation system gets all of the data from
 GUEST_DATA.user123, processes and then publishes back to the same topic
 GUEST_DATA.user123.
 4.  The Web consumes the messages from the same topic (there is a
 different topic for every user that logged in) GUEST_DATA.user123 and
 when it finds the recommendation messages it pushes that to the browser
 (note it will need to read all the other data messages and discard those
 when looking for the recommendation messages).  I have a concern that the
 Web will be flooded with a ton of messages that it will promptly drop but I
 don't want to create a new response or recommendation topic because
 then I feel like I am tightly coupling the message to the functionality and
 in the future different systems may want to consume those messages as well.

 Does that make sense?
 Josh






 
  From: Mahendra M mahendr...@gmail.com
 To: users@kafka.apache.org; Josh Foure user...@yahoo.com
 Sent: Thursday, June 13, 2013 12:56 PM
 Subject: Re: Using Kafka for data messages


 Hi Josh,

 The idea looks very interesting. I just had one doubt.

 1. A user logs in. His login id is sent on a topic
 2. Other systems (consumers on this topic) consumer this message and
 publish their results to another topic

 This will be happening without any particular order for hundreds of users.

 Now the site being displayed to the user.. How will you fetch only messages
 for that user from the queue?

 Regards,
 Mahendra



 On Thu, Jun 13, 2013 at 8:51 PM, Josh Foure user...@yahoo.com wrote:

 
  Hi all, my team is proposing a novel
  way of using Kafka and I am hoping someone can help do a sanity check on
  this:
 
  1.  When a user logs
  into our website, we will create a “logged in” event message in Kafka
  containing the user id.
  2.  30+ systems
  (consumers each in their own consumer groups) will consume this event and
  lookup data about this user id.  They
  will then publish all of this data back out into Kafka as a series of
 data
  messages.  One message may include the user’s name,
  another the user’s address, another the user’s last 10 searches, another
  their
  last 10 orders, etc.  The plan is that a
  single “logged in” event may trigger hundreds if not thousands of
  additional data
  messages.
  3.  Another system,
  the “Product Recommendation” system, will have consumed the original
  “logged in”
  message and will also consume a subset of the data messages
 (realistically
  I
  think it would need to consume all of the data messages but would discard
  the
  ones it doesn’t need).  As the Product
  Recommendation consumes the data messages, it will process recommended
  products
  and publish out recommendation messages (that get more and more specific
  as it
  has consumed more and more data messages).
  4.  The original
  website will consume the recommendation 

Re: Producer only finding partition on 1 of 2 Brokers, even though ZK shows 1 partition exists on both Brokers?

2013-06-13 Thread Brett Hoerner
You know what, it's likely this is all because I'm running a bad fork of
Kafka 0.7.2 for Scala 2.10 (on the producers/consumers), since that's the
Scala version we've standardized on.

Behavior on Scala 2.9.2 with the official Kafka 0.7.2 release seems much more
normal -- I'm working on downgrading all our clients and I'll report back.

Sorry for the spam.


On Thu, Jun 13, 2013 at 12:31 PM, Brett Hoerner br...@bretthoerner.comwrote:

 As an update, this continues to affect us.

 First I'd like to note ways in which my issues seems different than
 KAFKA-278,

 * I did not add a new broker or a new topic, this topic has been in use on
 two existing brokers for months
 * The topic definitely exists on both brokers. The topic/data directory
 exists on both, and as noted above both brokers even show it in ZK

 That said, I went ahead and did the work around from the ticket, which
 in my case basically means restart the brokers (because the topic/data
 directory already exists).

 After bouncing both brokers I *did* get data to both brokers for a while.
 I'm not yet sure if this only lasts until I have to restart my *producers*
 (as I've had to update them a bit lately), but that is my current guess.

 When I start a producer now (both brokers up, data looks exactly like ZK
 in original post), I get output like this:

 2013-06-13_15:46:11.64496 Broker Topic Path = /brokers/topics
 2013-06-13_15:46:11.0 15:46:11.999 [run-main] INFO
 kafka.producer.ProducerPool - Creating async producer for broker id = 1 at
 10.10.150.16:9092
 2013-06-13_15:46:12.00109 15:46:12.001 [run-main] INFO
 kafka.producer.ProducerPool - Creating async producer for broker id = 0 at
 10.10.71.113:9092
 2013-06-13_15:46:16.77956 15:46:16.779 [ProducerSendThread-1375847990]
 INFO  kafka.producer.SyncProducer - Connected to 10.10.150.16:9092 for
 producing

 The last line repeats as the SyncProducer does its periodic reconnect
 thing, but note that it is *always* broker 1 at 10.10.150.16:9092, even
 though it seems like broker 0 is seen.

 Thanks for your help!



 On Tue, Jun 4, 2013 at 6:24 PM, Neha Narkhede neha.narkh...@gmail.comwrote:

 You are probably hitting https://issues.apache.org/jira/browse/KAFKA-278.
 Can you please try the workaround mentioned in the JIRA description?

 Thanks,
 Neha


 On Tue, Jun 4, 2013 at 4:21 PM, Brett Hoerner br...@bretthoerner.com
 wrote:

  (version 0.7.2)
 
  For some reason, my producers are only picking up the partition on 1 of
 my
  2 brokers. I've been digging through the code, and I don't see any
 issues
  given the state of my ZK nodes. The producer never seems to locate a
  partition on Broker0, even though ZK clearly states that it has 1 (just
  like Broker1 does).
 
  The result of this is that the producer works, but only sends data to
 one
  Broker.
 
  Has anyone seen something like this before? I'm stumped. Thanks.
 
  # zk information follows:
  [zk: localhost:2181(CONNECTED) 5] ls /kafka/brokers/ids
  [1, 0]
 
  [zk: localhost:2181(CONNECTED) 6] get /kafka/brokers/ids/0
  10.10.71.113-1365733477001:10.10.71.113:9092
 
  [zk: localhost:2181(CONNECTED) 7] get /kafka/brokers/ids/1
  10.10.150.16-1369236663861:10.10.150.16:9092
 
  [zk: localhost:2181(CONNECTED) 10] ls /kafka/brokers/topics/test
  [1, 0]
 
  # this is the most confusing one
  [zk: localhost:2181(CONNECTED) 11] get /kafka/brokers/topics/test/0
  1
 
  [zk: localhost:2181(CONNECTED) 12] get /kafka/brokers/topics/test/1
  1
 
  # kafka producer ZK information DEBUG log, as you can see it finds 0
  partitions on Broker0:
  Broker Topic Path = /brokers/topics
  DEBUG [2013-06-04 23:14:30,689] kafka.producer.ZKBrokerPartitionInfo:
  Broker ids and # of partitions on each for topic: test =
 ArrayBuffer((0,0),
  (1,1))
  DEBUG [2013-06-04 23:14:30,690] kafka.producer.ZKBrokerPartitionInfo:
  Sorted list of broker ids and partition ids on each for topic: test =
  TreeSet(1-0)
  DEBUG [2013-06-04 23:14:30,819]
  kafka.producer.ZKBrokerPartitionInfo$BrokerTopicsListener:
  [BrokerTopicsListener] Creating broker topics listener to watch the
  following paths -
  /broker/topics, /broker/topics/topic, /broker/ids
  DEBUG [2013-06-04 23:14:30,823]
  kafka.producer.ZKBrokerPartitionInfo$BrokerTopicsListener:
  [BrokerTopicsListener] Initialized this broker topics listener with
 initial
  mapping of broker id to partition id per topic with Map(test -
  TreeSet(1-0))
  DEBUG [2013-06-04 23:14:30,904] kafka.producer.ZKBrokerPartitionInfo:
  Registering listener on path: /brokers/topics/test
 





Re: Using Kafka for data messages

2013-06-13 Thread Taylor Gautier
Spot on. This was one of the areas that we had to work around.  Remember
that there is a 1:1 relationship between topics and directories, and most file
systems don't like tens of thousands of directories.  We found in practice
that 60k per machine was a practical limit, using (I believe) ext3.
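
If the per-machine directory limit is the binding constraint, one way to keep a
topic-per-user flavor without an unbounded topic count is to hash each user id
into a fixed set of bucket topics. This is just a sketch under assumptions --
the bucket count and the GUEST_DATA prefix are made up, and nothing in the
thread prescribes this:

    public class GuestTopicRouter {

        // Assumption: a bucket count chosen to stay comfortably under the per-machine topic limit.
        private static final int BUCKETS = 1024;

        /** Maps a user id onto one of a bounded set of bucket topics, deterministically. */
        public static String topicFor(String userId) {
            int bucket = (userId.hashCode() & 0x7fffffff) % BUCKETS;
            return "GUEST_DATA." + bucket;
        }

        public static void main(String[] args) {
            // Producers and consumers derive the topic the same way, so all traffic for a given
            // user lands on one of at most BUCKETS topics; consumers still filter by user id.
            System.out.println(topicFor("user123"));
        }
    }
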


On Thursday, June 13, 2013, Timothy Chen wrote:

 Also since you're going to be creating a topic per user, the number of
 concurrent users will also be a concern to Kafka as it doesn't like massive
 amounts of topics.

 Tim


 On Thu, Jun 13, 2013 at 10:47 AM, Josh Foure user...@yahoo.com wrote:

  Hi Mahendra, I think that is where it gets a little tricky.  I think it
  would work something like this:
 
  1.  Web sends login event for user user123 to topic GUEST_EVENT.
  2.  All of the systems consume those messages and publish the data
  messages to topic GUEST_DATA.user123.
  3.  The Recommendation system gets all of the data from
  GUEST_DATA.user123, processes and then publishes back to the same topic
  GUEST_DATA.user123.
  4.  The Web consumes the messages from the same topic (there is a
  different topic for every user that logged in) GUEST_DATA.user123 and
  when it finds the recommendation messages it pushes that to the browser
  (note it will need to read all the other data messages and discard those
  when looking for the recommendation messages).  I have a concern that the
  Web will be flooded with a ton of messages that it will promptly drop
 but I
  don't want to create a new response or recommendation topic because
  then I feel like I am tightly coupling the message to the functionality
 and
  in the future different systems may want to consume those messages as
 well.
 
  Does that make sense?
  Josh
 
 
 
 
 
 
  
   From: Mahendra M mahendr...@gmail.com
  To: users@kafka.apache.org; Josh Foure user...@yahoo.com
  Sent: Thursday, June 13, 2013 12:56 PM
  Subject: Re: Using Kafka for data messages
 
 
  Hi Josh,
 
  The idea looks very interesting. I just had one doubt.
 
  1. A user logs in. His login id is sent on a topic
  2. Other systems (consumers on this topic) consumer this message and
  publish their results to another topic
 
  This will be happening without any particular order for hundreds of
 users.
 
  Now the site being displayed to the user.. How will you fetch only
 messages
  for that user from the queue?
 
  Regards,
  Mahendra
 
 
 
  On Thu, Jun 13, 2013 at 8:51 PM, Josh Foure user...@yahoo.com wrote:
 
  
   Hi all, my team is proposing a novel
   way of using Kafka and I am hoping someone can help do a sanity check
 on
   this:
  
   1.  When a user logs
   into our website, we will create a “logged in” event message in Kafka
   containing the user id.
   2.  30+ systems
   (consumers each in their own consumer groups) will consume this event
 and
   lookup data about this user id.  They
   will then publish all of this data back out into Kafka as a series of
  data
   messages.  One message may include the user’s name,
   another the user’s address, another the user’s last 10 searches,
 another
   their
   last 10 orders, etc.  The plan is that a
   single “logged in” event may trigger hundreds if not thousands of
   additional data
   messages.
   3.  Another system,
   the “Product Recommendation” system, will have consumed the original
   “logged in”
   message and will also consume a subset of the data messages
  (realistically
   I
   think it would need to consume all of the data messages but would
 discard
   the
   ones it doesn’t need).  As the Product
   Recommendation consumes the data messages, it will process recommended
   products
   and publish out recommendation messages (that get more and more
 specific
   as it
   has consumed more and more data messages).
   4.  The original
   website will consume the recommendation messages and show the
   recommendations to
   the user as it gets them.
  
   You don’t see many systems implemented this way but since
  


Stall high-level 0.72 ConsumerConnector until all balanced? Avoid message dupes?

2013-06-13 Thread Philip O'Toole
Hello -- is it possible for our code to stall a ConsumerConnector from
doing any consuming for, say, 30 seconds, until we can be sure that
all other ConsumerConnectors have rebalanced?

It seems that the first ConsumerConnector to come up is prefetching
some data, and we end up with duplicate messages. We looked at the
code for the high-level consumer
(core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala)
and it looks like it empties some queues after a rebalance, but we
still see duplicate messages.

I'm sure this question has been asked before :-) but this is our first
time really working with the high-level consumer, and this caught us
by surprise. When there is *no* data in Kafka and we wait until everything
balances before sending data in, everything works fine; but if there is
some data already sitting in the brokers, we seem to get dupes, even when
each thread sleeps for many seconds after creating the
ConsumerConnector.
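
One way to at least hold back our own draining until every local
ConsumerConnector exists, plus a settle window, is a simple latch. This is a
sketch against the 0.8 high-level consumer Java API (the 0.7.2 class and
property names differ slightly), with made-up thread count, group, and topic;
note it only delays when our code reads the streams -- the connector itself may
already be pre-fetching into its internal queues:

    import java.util.Collections;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;

    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;

    public class StalledConsumers {

        public static void main(String[] args) {
            final int threads = 4;                                 // hypothetical thread count
            final CountDownLatch allCreated = new CountDownLatch(threads);

            for (int i = 0; i < threads; i++) {
                new Thread(new Runnable() {
                    public void run() {
                        Properties props = new Properties();
                        props.put("zookeeper.connect", "zk1:2181"); // assumption
                        props.put("group.id", "my-group");          // "groupid" in 0.7

                        ConsumerConnector connector =
                                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
                        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                                connector.createMessageStreams(Collections.singletonMap("my-topic", 1));
                        KafkaStream<byte[], byte[]> stream = streams.get("my-topic").get(0);

                        allCreated.countDown();                     // this connector now exists
                        try {
                            allCreated.await();                     // wait for the other connectors
                            TimeUnit.SECONDS.sleep(30);             // extra settle time for rebalances
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            return;
                        }

                        // Only now start draining; the connector may have pre-fetched already.
                        ConsumerIterator<byte[], byte[]> it = stream.iterator();
                        while (it.hasNext()) {
                            process(it.next().message());
                        }
                    }
                }).start();
            }
        }

        private static void process(byte[] message) {
            // stand-in for real message handling
        }
    }
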

Are we missing something?

Thanks,

Philip


Re: Stall high-level 0.72 ConsumerConnector until all balanced? Avoid message dupes?

2013-06-13 Thread Philip O'Toole
Just to be clear, I'm not asking that we solve duplicate messages on
crash before commit to ZooKeeper, just an apparent problem where, if
Kafka has some data and we start our ConsumerConnectors, we get dupe
data since some Consumers come up before others.

Any help?

Philip

On Thu, Jun 13, 2013 at 7:34 PM, Philip O'Toole phi...@loggly.com wrote:
 Hello -- is it possible for our code to stall a ConsumerConnector from
 doing any consuming for, say, 30 seconds, until we can be sure that
 all other ConsumeConnectors are rebalanced?

 It seems that the first ConsumerConnector to come up is prefetching
 some data, and we end up with duplicate messages. We looked at the
 code for the high-level consumer
 (core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala)
 and it looks like it empties some queues after a rebalance, but we
 still see duplicate messages.

 I'm sure this question has been asked before :-) but this is our first
 time really working with the high-level consumer, and this caught us
 by surprise. When there is *no* data in Kafka, wait until everything
 balances and then send data in everything works fine, but if there is
 some data sitting in the brokers, we seems to get dupes, even when
 each thread sleeps for many seconds after creating the
 ConsumerConnector.

 Are we missing something?

 Thanks,

 Philip


Re: Kafka 0.8 Maven and IntelliJ

2013-06-13 Thread Jun Rao
Thanks. Which version of IntelliJ are you using?

Jun


On Thu, Jun 13, 2013 at 10:20 AM, Dragos Manolescu 
dragos.manole...@servicenow.com wrote:

 Hmm, I've just pulled 0.8.0-beta1-candidate1, removed .idea* from my
 top-level directory, executed gen-idea, and then opened and built the
 project in IntelliJ w/o problems.

 I noticed that the build uses an old version of the sbt-idea plugin:

 addSbtPlugin("com.github.mpeltonen" % "sbt-idea" % "1.2.0")

 The latest release is 1.4.0, perhaps upgrading would help?


 -Dragos



 On 6/12/13 9:03 PM, Jun Rao jun...@gmail.com wrote:

 Dragos,
 
 After the sbt upgrade 3-4 months ago, some of us are struggling to get the
 Kafka code cleanly loaded to Intellij after doing ./sbt gen-idea. Were
 you able to do that successfully?
 
 Thanks,
 
 Jun
 
 
 On Wed, Jun 12, 2013 at 10:45 AM, Dragos Manolescu 
 dragos.manole...@servicenow.com wrote:
 
  For IntelliJ I've always used the gen-idea sbt plugin:
  https://github.com/mpeltonen/sbt-idea
 
  -Dragos
 
 
  On 6/11/13 10:41 PM, Jason Rosenberg j...@squareup.com wrote:
 
  Try the one under core/targets?
  
  
  On Tue, Jun 11, 2013 at 3:34 PM, Florin Trofin ftro...@adobe.com
 wrote:
  
   I downloaded the latest 0.8 snapshot and I want to build using Maven:
  
   ./sbt make-pom
  
   Generates a bunch of pom.xml files but when I try to open one of
 them in
   IntelliJ they are not recognized. Do I need to do any other step?
 Which
   pom do I need to open?
  
   Thanks!
  
   Florin
  
  
 
 




Re: Stall high-level 0.72 ConsumerConnector until all balanced? Avoid message dupes?

2013-06-13 Thread Jun Rao
Are your messages compressed in batches? If so, some dups are expected
during rebalance. In 0.8, such dups are eliminated. Other than that,
rebalance shouldn't cause dups, since we commit consumed offsets to ZK
before doing a rebalance.
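
For context, batch compression is a producer-side setting. A minimal sketch of
where it is configured, against the 0.8 producer Java API (the 0.7 property
names differ; broker list and topic are made up):

    import java.util.Properties;

    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    public class CompressedProducerExample {

        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("metadata.broker.list", "broker1:9092,broker2:9092"); // assumption
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            // "gzip" turns on batch compression; "none" disables it. With compression the broker
            // stores a whole batch as one unit, which is the source of the rebalance dups
            // described above for 0.7; 0.8 tracks logical per-message offsets instead.
            props.put("compression.codec", "gzip");

            Producer<String, String> producer =
                    new Producer<String, String>(new ProducerConfig(props));
            producer.send(new KeyedMessage<String, String>("my-topic", "hello"));
            producer.close();
        }
    }
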

Thanks,

Jun


On Thu, Jun 13, 2013 at 7:34 PM, Philip O'Toole phi...@loggly.com wrote:

 Hello -- is it possible for our code to stall a ConsumerConnector from
 doing any consuming for, say, 30 seconds, until we can be sure that
 all other ConsumeConnectors are rebalanced?

 It seems that the first ConsumerConnector to come up is prefetching
 some data, and we end up with duplicate messages. We looked at the
 code for the high-level consumer
 (core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala)
 and it looks like it empties some queues after a rebalance, but we
 still see duplicate messages.

 I'm sure this question has been asked before :-) but this is our first
 time really working with the high-level consumer, and this caught us
 by surprise. When there is *no* data in Kafka, wait until everything
 balances and then send data in everything works fine, but if there is
 some data sitting in the brokers, we seems to get dupes, even when
 each thread sleeps for many seconds after creating the
 ConsumerConnector.

 Are we missing something?

 Thanks,

 Philip



Re: Stall high-level 0.72 ConsumerConnector until all balanced? Avoid message dupes?

2013-06-13 Thread Philip O'Toole
Jun -- thanks. We're using 0.7.2.

No, the messages are not compressed, and since we do appear to be
seeing dupes in our tests, that suggests our own code is buggy.

Thanks,

Philip

On Thu, Jun 13, 2013 at 9:15 PM, Jun Rao jun...@gmail.com wrote:
 Are you messages compressed in batches? If so, some dups are expected
 during rebalance. In 0.8, such dups are eliminated. Other than that,
 rebalance shouldn't cause dups since we commit consumed offsets to ZK
 before doing a rebalance.

 Thanks,

 Jun


 On Thu, Jun 13, 2013 at 7:34 PM, Philip O'Toole phi...@loggly.com wrote:

 Hello -- is it possible for our code to stall a ConsumerConnector from
 doing any consuming for, say, 30 seconds, until we can be sure that
 all other ConsumeConnectors are rebalanced?

 It seems that the first ConsumerConnector to come up is prefetching
 some data, and we end up with duplicate messages. We looked at the
 code for the high-level consumer
 (core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala)
 and it looks like it empties some queues after a rebalance, but we
 still see duplicate messages.

 I'm sure this question has been asked before :-) but this is our first
 time really working with the high-level consumer, and this caught us
 by surprise. When there is *no* data in Kafka, wait until everything
 balances and then send data in everything works fine, but if there is
 some data sitting in the brokers, we seems to get dupes, even when
 each thread sleeps for many seconds after creating the
 ConsumerConnector.

 Are we missing something?

 Thanks,

 Philip