Re: custom kafka consumer - strangeness

2014-01-09 Thread Chris Curtin
If you look at the example simple consumer:
https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example

You'll see:

  if (currentOffset < readOffset) {
      System.out.println("Found an old offset: " + currentOffset +
              " Expecting: " + readOffset);
      continue;
  }

and a comment in the 'Reading the Data' part:

Also note that we are explicitly checking that the offset being read is not
less than the offset that we requested. This is needed since if Kafka is
compressing the messages, the fetch request will return an entire
compressed block even if the requested offset isn't the beginning of the
compressed block. Thus a message we saw previously may be returned again.

This is probably what is happening to you.
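
For reference, a minimal sketch of that guard (modeled on the wiki's
SimpleConsumer example; the class and method names here are just for
illustration):

import kafka.javaapi.FetchResponse;
import kafka.message.MessageAndOffset;

public class DuplicateSkipExample {

    // Returns the offset to request on the next fetch, ignoring any messages the
    // broker re-sent because they sit inside a compressed block we already read.
    static long consume(FetchResponse fetchResponse, String topic, int partition,
                        long readOffset) {
        for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(topic, partition)) {
            long currentOffset = messageAndOffset.offset();
            if (currentOffset < readOffset) {
                continue; // already seen on a previous fetch - skip it
            }
            // ... handle messageAndOffset.message() here ...
            readOffset = messageAndOffset.nextOffset();
        }
        return readOffset;
    }
}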

Chris


On Thu, Jan 9, 2014 at 4:00 PM, Gerrit Jansen van Vuuren 
gerrit...@gmail.com wrote:

 Hi,

 I'm writing a custom consumer for kafka 0.8.
 Everything works except for the following:

 a. connect, send fetch, read all results
 b. send fetch
 c. send fetch
 d. send fetch
 e. via the console publisher, publish 2 messages
 f. send fetch :corr-id 1
 g. read 2 messages published :offsets [10 11] :corr-id 1
 h. send fetch :corr-id 2
 i. read 2 messages published :offsets [10 11] :corr-id 2
 j.  send fetch ...

 The problem is I get the messages sent twice as a response to two separate
 fetch requests. The correlation id is distinct so it cannot be that I read
 the response twice. The offsets of the 2 messages are the same so they
 are duplicates, and it's not the producer sending the messages twice.

 Note: the same connection is kept open the whole time, and I send, block,
 receive, then send again. After the first 2 messages are read, the offsets
 are incremented and the next fetch asks Kafka for messages from the new
 offsets.

 any ideas of why kafka would be sending the messages again on the second
 fetch request?

 Regards,
  Gerrit



Re: Special Bay Area HUG: Tajo and Samza

2013-10-18 Thread Chris Curtin
Hi Jay,

Do they record these meetups?

Thanks,

Chris


On Thu, Oct 17, 2013 at 5:03 PM, Jay Kreps jay.kr...@gmail.com wrote:

 FYI.

 -- Forwarded message --
 From: Jakob Homan jgho...@gmail.com
 Date: Thu, Oct 17, 2013 at 11:08 AM
 Subject: Special Bay Area HUG: Tajo and Samza
 To: d...@samza.incubator.apache.org


 Hey everybody-
Join us at LinkedIn Nov. 5 for a special HUG dedicated to two new
 awesome Incubator projects, Tajo, a low-latency SQL query engine atop YARN
 and Samza.

 http://www.meetup.com/hadoop/events/146077932/

 -Jakob



Re: High Level Consumer error handling and clean exit

2013-07-10 Thread Chris Curtin
Thanks Ian.

Is your consumer multi-threaded? If so can you share how you coordinated
each of the threads so you knew it was 'okay' to commit across all the
threads? I'm stuck on how to do this without really complicating the
consumer.

Thanks,

Chris


On Tue, Jul 9, 2013 at 5:51 PM, Ian Friedman i...@flurry.com wrote:

 Hey Chris,

 The way I handled this in my application using the High Level Consumer was
 to turn off auto-commit and commit manually after finishing a batch of
 messages (obviously you could do it after every message, but for my
 purposes it was better to have batches)
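
 A rough sketch of that pattern (assuming the 0.8 high-level consumer; the
 property values, topic name and batch size are only illustrative):

 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
 import kafka.consumer.Consumer;
 import kafka.consumer.ConsumerConfig;
 import kafka.consumer.KafkaStream;
 import kafka.javaapi.consumer.ConsumerConnector;
 import kafka.message.MessageAndMetadata;

 public class BatchCommitExample {
     public static void main(String[] args) {
         Properties props = new Properties();
         props.put("zookeeper.connect", "localhost:2181");
         props.put("group.id", "my-group");
         props.put("auto.commit.enable", "false");      // turn auto-commit off

         ConsumerConnector consumer =
                 Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

         Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
         topicCountMap.put("my-topic", 1);
         KafkaStream<byte[], byte[]> stream =
                 consumer.createMessageStreams(topicCountMap).get("my-topic").get(0);

         int processed = 0;
         for (MessageAndMetadata<byte[], byte[]> msg : stream) {
             System.out.println(new String(msg.message())); // stand-in for real processing
             if (++processed % 100 == 0) {
                 consumer.commitOffsets();                  // commit after each batch of 100
             }
         }
     }
 }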

 --
 Ian Friedman





High Level Consumer error handling and clean exit

2013-07-09 Thread Chris Curtin
Hi,

I'm working through a production-level High Level Consumer app and have a
couple of error/shutdown questions to understand how the offset storage is
handled.

Test case - simulate an error writing to destination application, for
example a database, offset is 'lost'

Scenario
- write 500 messages for each topic/partition
- use the example High Level Consumer code I wrote for the Wiki
- Change the code so that every 10th read from the 'hasNext()'
ConsumerIterator breaks out of the loop and returns from the thread,
simulating a hard error. I write the offset to System.out to see what was
provided
- startup again and look to see what offset was first emitted for a
partition

Issue: Kafka treats the offset for the message read that caused me to break
out of the loop as processed (as expected), but I really failed. How do I
tell Kafka that I didn't really consume that offset?

Here is the example code in the 'business logic':

public void run() {
    ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
    int counter = 0;
    while (it.hasNext()) {
        MessageAndMetadata<byte[], byte[]> msg = it.next();
        if (counter == 10) {
            System.out.println("Stopping Thread " + m_threadNumber + ": Partition: " + msg.partition() +
                    ": Offset: " + msg.offset() + " : " + new String(msg.message()));
            break;
        }
        System.out.println("Thread " + m_threadNumber + ": Partition: " + msg.partition() +
                ": Offset: " + msg.offset() + " : " + new String(msg.message()));
        counter++;
    }

    System.out.println("Shutting down Thread: " + m_threadNumber);
}

I understand that handling 'hard' errors like JVM crashes, kill -9 etc. may
leave the offsets in ZooKeeper incorrect, but I'm trying to understand what
happens in a clean shutdown where Kafka and the Consumer are behaving
correctly but I can't process what I read.

This also feels like I'm blurring SimpleConsumer theory into this, but
except for the exception/shutdown case High Level Consumer does everything
I want.


Thanks,

Chris


Re: High Level Consumer error handling and clean exit

2013-07-09 Thread Chris Curtin
Hi Philip,

Correct, I don't want to explicitly control the offset committing. The
ConsumerConnector handles that well enough except for when I want to
shut down and NOT have Kafka think I consumed that last message for a
stream. This isn't the crash case; it is a case where the logic consuming
the message detects an error and wants to cleanly exit until that issue
can be resolved, but not lose the message it was trying to process when the
problem is resolved.

My understanding is that the commitOffsets() call is across all threads,
not just for the stream my thread is reading from. So knowing it is okay to
call this requires coordination across all my threads, which makes a High
Level Consumer a lot harder to write correctly.

Thinking about what I'd like to happen is: my code hands the message back
to the KafkaStream (or whatever level knows about the consumed offsets) and
says
- set the next start offset for this topic/partition to this message in
ZooKeeper
- cleanly shutdown the stream from the broker(s)
- don't force a rebalance on the consumer since something is wrong with
processing of the data in the message, not the message.
- If I try to use the stream again I should get an exception
- I don't think I would want this to cause a complete shutdown of the
ConsumerConnector, in case other threads are still processing. If all
threads have the same issue they will all fail soon enough and do the same
logic. But if only one thread fails, our Operations teams will need to
resolve the issue then do a clean restart to recover.

I think this logic would only happen when the downstream system was having
issues, since the iterator would be drained correctly when the 'shutdown'
call to ConsumerConnector is made.
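
For what it's worth, here is a rough sketch of what I mean by setting the next
start offset (this assumes the conventional ZooKeeper layout of
/consumers/<group>/offsets/<topic>/<partition> that the high level consumer
uses; the helper itself is hypothetical, not an existing Kafka API):

import org.I0Itec.zkclient.ZkClient;

public class RewindOffsetExample {

    // Hypothetical helper: point the consumer group back at the message we failed to
    // process, so the next clean start re-reads it.
    // Note: Kafka writes these nodes with its own ZKStringSerializer, so a plain
    // ZkClient would need a matching string serializer configured.
    static void setNextStartOffset(ZkClient zk, String group, String topic,
                                   int partition, long offset) {
        String path = "/consumers/" + group + "/offsets/" + topic + "/" + partition;
        if (!zk.exists(path)) {
            zk.createPersistent(path, true); // create parent nodes if needed
        }
        zk.writeData(path, Long.toString(offset));
    }
}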

Thanks,

Chris



On Tue, Jul 9, 2013 at 11:21 AM, Philip O'Toole phi...@loggly.com wrote:

 It seems like you're not explicitly controlling the offsets. Is that
 correct?

 If so, the moment you pull a message from the stream, the client framework
 considers it processed. So if your app subsequently crashes before the
 message is fully processed, and auto-commit updates the offsets in
 Zookeeper, you will drop that message.

 The solution to this is to call commitOffsets() explicitly.

 Philip

 On Tue, Jul 9, 2013 at 11:16 AM, Chris Curtin curtin.ch...@gmail.com
 wrote:

  Hi,
 
  I'm working through a production-level High Level Consumer app and have a
  couple of error/shutdown questions to understand how the offset storage
 is
  handled.
 
  Test case - simulate an error writing to destination application, for
  example a database, offset is 'lost'
 
  Scenario
  - write 500 messages for each topic/partition
  - use the example High Level Consumer code I wrote for the Wiki
  - Change the code so that every 10th read from the 'hasNext()'
  ConsumerIterator breaks out of the loop and returns from the thread,
  simulating a hard error. I write the offset to System.out to see what was
  provided
  - startup again and look to see what offset was first emitted for a
  partition
 
  Issue: Kafka treats the offset for the message read that caused me to
 break
  out of the loop as processed (as expected), but I really failed. How do I
  tell Kafka that I didn't really consume that offset?
 
  Here is the example code in the 'business logic':
 
  public void run() {
      ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
      int counter = 0;
      while (it.hasNext()) {
          MessageAndMetadata<byte[], byte[]> msg = it.next();
          if (counter == 10) {
              System.out.println("Stopping Thread " + m_threadNumber + ": Partition: " + msg.partition() +
                      ": Offset: " + msg.offset() + " : " + new String(msg.message()));
              break;
          }
          System.out.println("Thread " + m_threadNumber + ": Partition: " + msg.partition() +
                  ": Offset: " + msg.offset() + " : " + new String(msg.message()));
          counter++;
      }

      System.out.println("Shutting down Thread: " + m_threadNumber);
  }
 
  I understand that handling 'hard' errors like JVM crashes, kill -9 etc.
 may
  leave the offsets in ZooKeeper incorrect, but I'm trying to understand
 what
  happens in a clean shutdown where Kafka and the Consumer are behaving
  correctly but I can't process what I read.
 
  This also feels like I'm blurring SimpleConsumer theory into this, but
  except for the exception/shutdown case High Level Consumer does
 everything
  I want.
 
 
  Thanks,
 
  Chris
 



Re: High Level Consumer error handling and clean exit

2013-07-09 Thread Chris Curtin
Thanks. I know I can write a SimpleConsumer to do this, but it feels like
the High Level consumer is _so_ close to being robust enough to handle
what I'd think people want to do in most applications. I'm going to submit
an enhancement request.

I'm trying to understand the level of data loss in this situation, so I
looked deeper into the KafkaStream logic: it looks like a KafkaStream
includes a BlockingQueue for transferring the messages to my code from
Kafka. If I call shutdown() when I detect the problem, are the messages
already in the BlockingQueue considered 'read' by Kafka, or does the
shutdown peek into the Queue to see what is still there before updating
ZooKeeper?

My concern is if that queue is not empty I'll be losing more than the one
message that led to the failure.

I'm also curious how others are handling this situation. Do you assume the
message that is causing problems is lost or somehow know to go get it
later? I'd think others would have this problem too.

Thanks,

Chris



On Tue, Jul 9, 2013 at 3:23 PM, Philip O'Toole phi...@loggly.com wrote:

 OK.

 It sounds like you're requesting functionality that the high-level consumer
 simply doesn't have. As I am sure you know, there is no API call that
 supports handing back a message.

 I might be missing something, but if you need this kind of control, I think
 you need to code your application differently. You could try creating a
 ConsumerConnection per partition (your clients will then need to know the
 number of partitions out there). That way commitOffsets() will actually
 only apply to that partition. Auto-commit the same way. It might give you
 the level of control you need.

 Philip

 On Tue, Jul 9, 2013 at 2:22 PM, Chris Curtin curtin.ch...@gmail.com
 wrote:

  Hi Philip,
 
  Correct, I don't want to explicitly control the offset committing. The
  ConsumerConnector handles that well enough except for when I want to
  shutdown and NOT have Kafka think I consumed that last message for a
  stream. This isn't the crash case, it is a case where the logic consuming
  the message detects an error and wants to cleanly exit until that issue
  can be resolved, but not lose the message it was trying to process when
 the
  problem is resolved.
 
  My understanding is that the commitOffsets() call is across all threads,
  not just for the stream my thread is reading from. So knowing it is okay
 to
  call this requires coordination across all my threads, which makes a High
  Level Consumer a lot harder to write correctly.
 
  Thinking about what I'd like to happen is: my code hands the message back
  to the KafkaStream (or whatever level knows about the consumed offsets)
 and
  says
  - set the next start offset for this topic/partition to this message in
  ZooKeeper
  - cleanly shutdown the stream from the broker(s)
  - don't force a rebalance on the consumer since something is wrong with
  processing of the data in the message, not the message.
  - If I try to use the stream again I should get an exception
  - I don't think I would want this to cause a complete shutdown of the
  ConsumerConnector, in case other threads are still processing. If all
  threads have the same issue they will all fail soon enough and do the
 same
  logic. But if only one thread fails, our Operations teams will need to
  resolve the issue then do a clean restart to recover.
 
  I think this logic would only happen when the down stream system was
 having
  issues since the iterator would be drained correctly when the 'shutdown'
  call to ConsumerConnector is made.
 
  Thanks,
 
  Chris
 
 
 
  On Tue, Jul 9, 2013 at 11:21 AM, Philip O'Toole phi...@loggly.com
 wrote:
 
   It seems like you're not explicitly controlling the offsets. Is that
   correct?
  
   If so, the moment you pull a message from the stream, the client
  framework
   considers it processed. So if your app subsequently crashes before the
   message is fully processed, and auto-commit updates the offsets in
   Zookeeper, you will drop that message.
  
    The solution to this is to call commitOffsets() explicitly.
  
   Philip
  
   On Tue, Jul 9, 2013 at 11:16 AM, Chris Curtin curtin.ch...@gmail.com
   wrote:
  
Hi,
   
I'm working through a production-level High Level Consumer app and
  have a
couple of error/shutdown questions to understand how the offset
 storage
   is
handled.
   
Test case - simulate an error writing to destination application, for
example a database, offset is 'lost'
   
Scenario
- write 500 messages for each topic/partition
- use the example High Level Consumer code I wrote for the Wiki
- Change the code so that every 10th read from the 'hasNext()'
ConsumerIterator breaks out of the loop and returns from the thread,
simulating a hard error. I write the offset to System.out to see what
  was
provided
- startup again and look to see what offset was first emitted for a
partition
   
Issue: Kafka treats the offset

Re: High Level Consumer error handling and clean exit

2013-07-09 Thread Chris Curtin
Enhancement submitted: https://issues.apache.org/jira/browse/KAFKA-966



On Tue, Jul 9, 2013 at 3:53 PM, Chris Curtin curtin.ch...@gmail.com wrote:

 Thanks. I know I can write a SimpleConsumer to do this, but it feels like
 the High Level consumer is _so_ close to being robust enough to handle
 what I'd think people want to do in most applications. I'm going to submit
 an enhancement request.

 I'm trying to understand the level of data loss in this situation, so I
 looked deeper into the KafkaStream logic: it looks like a KafkaStream
 includes a BlockingQueue for transferring the messages to my code from
 Kafka. If I call shutdown() when I detect the problem, are the messages
 already in the BlockingQueue considered 'read' by Kafka, or does the
 shutdown peek into the Queue to see what is still there before updating
 ZooKeeper?

 My concern is if that queue is not empty I'll be losing more than the one
 message that led to the failure.

 I'm also curious how others are handling this situation. Do you assume the
 message that is causing problems is lost or somehow know to go get it
 later? I'd think others would have this problem too.

 Thanks,

 Chris



 On Tue, Jul 9, 2013 at 3:23 PM, Philip O'Toole phi...@loggly.com wrote:

 OK.

 It sounds like you're requesting functionality that the high-level
 consumer
 simply doesn't have. As I am sure you know, there is no API call that
 supports handing back a message.

 I might be missing something, but if you need this kind of control, I
 think
 you need to code your application differently. You could try creating a
 ConsumerConnection per partition (your clients will then need to know the
 number of partitions out there). That way commitOffsets() will actually
 only apply to that partition. Auto-commit the same way. It might give you
 the level of control you need.

 Philip

 On Tue, Jul 9, 2013 at 2:22 PM, Chris Curtin curtin.ch...@gmail.com
 wrote:

  Hi Philip,
 
  Correct, I don't want to explicitly control the offset committing. The
  ConsumerConnector handles that well enough except for when I want to
  shutdown and NOT have Kafka think I consumed that last message for a
  stream. This isn't the crash case, it is a case where the logic
 consuming
  the message detects an error and wants to cleanly exit until that issue
  can be resolved, but not lose the message it was trying to process when
 the
  problem is resolved.
 
  My understanding is that the commitOffsets() call is across all threads,
  not just for the stream my thread is reading from. So knowing it is
 okay to
  call this requires coordination across all my threads, which makes a
 High
  Level Consumer a lot harder to write correctly.
 
  Thinking about what I'd like to happen is: my code hands the message
 back
  to the KafkaStream (or whatever level knows about the consumed offsets)
 and
  says
  - set the next start offset for this topic/partition to this message in
  ZooKeeper
  - cleanly shutdown the stream from the broker(s)
  - don't force a rebalance on the consumer since something is wrong with
  processing of the data in the message, not the message.
  - If I try to use the stream again I should get an exception
  - I don't think I would want this to cause a complete shutdown of the
  ConsumerConnector, in case other threads are still processing. If all
  threads have the same issue they will all fail soon enough and do the
 same
  logic. But if only one thread fails, our Operations teams will need to
  resolve the issue then do a clean restart to recover.
 
  I think this logic would only happen when the down stream system was
 having
  issues since the iterator would be drained correctly when the 'shutdown'
  call to ConsumerConnector is made.
 
  Thanks,
 
  Chris
 
 
 
  On Tue, Jul 9, 2013 at 11:21 AM, Philip O'Toole phi...@loggly.com
 wrote:
 
   It seems like you're not explicitly controlling the offsets. Is that
   correct?
  
   If so, the moment you pull a message from the stream, the client
  framework
   considers it processed. So if your app subsequently crashes before the
   message is fully processed, and auto-commit updates the offsets in
   Zookeeper, you will drop that message.
  
    The solution to this is to call commitOffsets() explicitly.
  
   Philip
  
   On Tue, Jul 9, 2013 at 11:16 AM, Chris Curtin curtin.ch...@gmail.com
   wrote:
  
Hi,
   
I'm working through a production-level High Level Consumer app and
  have a
couple of error/shutdown questions to understand how the offset
 storage
   is
handled.
   
Test case - simulate an error writing to destination application,
 for
example a database, offset is 'lost'
   
Scenario
- write 500 messages for each topic/partition
- use the example High Level Consumer code I wrote for the Wiki
- Change the code so that every 10th read from the 'hasNext()'
ConsumerIterator breaks out of the loop and returns from the thread,
simulating a hard error. I write the offset

Re: one consumerConnector or many?

2013-05-29 Thread Chris Curtin
I'd look at a variation of #2. Can your messages be grouped into a 'class
(for lack of a better term)' that are consumed together? For example a
'class' of 'auditing events' or 'sensor events'. The idea would then be to
have a topic per 'class'.

A couple of benefits to this:
- you can size the consumer resources for each 'class' by its value. So the
'audit' topic may only get a 2-thread consumer while the 'sensor' class
gets a 10-thread consumer (see the sketch after this list).
- you can stop processing a 'class' of messages if you need to without
taking all the consumers offline (assuming you have different processors
or a way to alter your number of threads per topic while running).
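
A minimal sketch of what I mean (the topic names and thread counts are just
examples, using the high level consumer's topicCountMap):

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class PerClassThreadsExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");  // illustrative values
        props.put("group.id", "class-consumers");
        ConsumerConnector consumer =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        // Each 'class' of messages gets its own topic and its own thread count.
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put("audit", 2);    // low-value class: 2 streams
        topicCountMap.put("sensor", 10);  // high-value class: 10 streams

        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                consumer.createMessageStreams(topicCountMap);
        // hand streams.get("audit") and streams.get("sensor") to their own thread pools
    }
}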

Since it sounds like you may be frequently adding new message types, this
approach also allows you to decide whether you want to shut down only part of
your processing to add the new code to handle the message.

Finally, why the concern about socket use? A well configured Windows or
Linux machine can have thousands of open sockets without problems. Since
0.8.0 only connects to the Broker that has the topic/partition, you end up
with 1 socket per topic/partition per consumer.

Hope this helps,

Chris


On Wed, May 29, 2013 at 9:13 AM, Rob Withers reefed...@gmail.com wrote:

 In thinking about the design of consumption, we have in mind a generic
 consumer server which would consume from more than one message type.  The
 handling of each type of message would be different.  I suppose we could
 have upwards of say 50 different message types, eventually, maybe 100+
 different types.  Which of the following designs would be best and why
 would
 the other options be bad?



 1)  Have all message types go through one topic and use a dispatcher
 pattern to select the correct handler.  Use one consumerConnector.

 2)  Use a different topic for each message type, but still use one
 consumerConnector and a dispatcher pattern.

 3)  Use a different topic for each message type and have a separate
 consumerConnector for each topic.



 I am struggling with whether my assumptions are correct.  It seems that a
 single connector for a topic would establish one socket to each broker, as
 rebalancing assigns various partitions to that thread.  Option 2 would pull
 messages from more than one topic through a single socket to a particular
 broker, is it so?  Would option 3 be reasonable, establishing upwards of
 100
 sockets per broker?



 I am guestimating that option 2 is the right way forward, to bound socket
 use, and we'll need to figure out a way to parameterize stream consumption
 with the right handlers for a particular msg type.  If we add a topic, do
 you think we should create a new connector or restart the original
 connector
 with the new topic in the map?



 Thanks,

 rob




Re: one consumerConnector or many?

2013-05-29 Thread Chris Curtin
That's a good question about # of sockets when a single consumer is
connecting. I'll let someone from LinkedIn comment if each consumer has a
socket per topic/partition or if it is per Broker, since I'm not familiar
with that part of the code.

On Wed, May 29, 2013 at 9:53 AM, Withers, Robert robert.with...@dish.com wrote:

 Thanks for the info.  Are you saying that even with a single connector,
 with say 3 topics and 3 threads per topic and 3 brokers with 3 partitions
 for all 3 topics on all 3 brokers, that a consumer box would have 9 sockets
 open?  What if there are 6 partitions per topic, would that be 18 open
 sockets?

 I have read somewhere that a high partition number, per topic, is
 desirable, to scale out the consumers and to be prepared to dynamically
 scale out consumption during a traffic spike.  Is it so?  100 topics, with
 16 brokers and 200 partitions per topic with 1 consumer connector (just
 hypothetically so) would be 1600 sockets or 2 sockets?

 For sure these boxes have plenty of ports.  I am just thinking through
 possible failures and what flexibility we have in configuration of
 producers/consumers to topics.  Really the question is best practices in
 this area.  A producer server handling 100+ msg types could also connect
 quite a bit.  So, perhaps it is best to restrict producer and consumer
 servers to process a restricted class of types.  Certainly if the
 producer is also hosting a web server, but perhaps not as dire on the
 consumer side.

 thanks,
 rob
 
 From: Chris Curtin [curtin.ch...@gmail.com]
 Sent: Wednesday, May 29, 2013 7:36 AM
 To: users
 Subject: Re: one consumerConnector or many?

  I'd look at a variation of #2. Can your messages be grouped into a 'class
  (for lack of a better term)' that are consumed together? For example a
  'class' of 'auditing events' or 'sensor events'. The idea would then be to
  have a topic per 'class'.

 A couple of benefits to this:
  - you can size the consumer resources for each 'class' by its value. So the
  'audit' topic may only get a 2-thread consumer while the 'sensor' class
  gets a 10-thread consumer.
  - you can stop processing a 'class' of messages if you need to without
  taking all the consumers offline (assuming you have different processors
  or a way to alter your number of threads per topic while running).

 Since it sounds like you may be frequently adding new message types this
 approach also allows you to decide if you want to shutdown only a part of
 your processing to add the new code to handle the message.

 Finally, why the concern about socket use? A well configured Windows or
 Linux machine can have thousands of open sockets without problems. Since
 0.8.0 only connects to the Broker with the topic/partition you end up with
 1 socket per topic/partition and consumer.

 Hope this helps,

 Chris


 On Wed, May 29, 2013 at 9:13 AM, Rob Withers reefed...@gmail.com wrote:

  In thinking about the design of consumption, we have in mind a generic
  consumer server which would consume from more than one message type.  The
  handling of each type of message would be different.  I suppose we could
  have upwards of say 50 different message types, eventually, maybe 100+
  different types.  Which of the following designs would be best and why
  would
  the other options be bad?
 
 
 
  1)  Have all message types go through one topic and use a dispatcher
  pattern to select the correct handler.  Use one consumerConnector.
 
  2)  Use a different topic for each message type, but still use one
  consumerConnector and a dispatcher pattern.
 
  3)  Use a different topic for each message type and have a separate
  consumerConnector for each topic.
 
 
 
  I am struggling with whether my assumptions are correct.  It seems that a
  single connector for a topic would establish one socket to each broker,
 as
  rebalancing assigns various partitions to that thread.  Option 2 would
 pull
  messages from more than one topic through a single socket to a particular
  broker, is it so?  Would option 3 be reasonable, establishing upwards of
  100
  sockets per broker?
 
 
 
  I am guestimating that option 2 is the right way forward, to bound socket
  use, and we'll need to figure out a way to parameterize stream
 consumption
  with the right handlers for a particular msg type.  If we add a topic, do
  you think we should create a new connector or restart the original
  connector
  with the new topic in the map?
 
 
 
  Thanks,
 
  rob
 
 



Re: Partitioning and scale

2013-05-22 Thread Chris Curtin
Hi Tim,


On Wed, May 22, 2013 at 3:25 PM, Timothy Chen tnac...@gmail.com wrote:

 Hi,

 I'm currently trying to understand how Kafka (0.8) can scale with our usage
 pattern and how to setup the partitioning.

 We want to route the same messages belonging to the same id to the same
 queue, so its consumer will be able to consume all the messages of that id.

 My questions:

  - From my understanding, in Kafka we would need to have a custom
 partitioner that routes the same messages to the same partition right?  I'm
 trying to find examples of writing this partitioner logic, but I can't find
 any. Can someone point me to an example?

 https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example

The partitioner here does a simple mod on the IP address and the # of
partitions. You'd need to define your own logic, but this is a start.
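
For example, a partitioner keyed on your id could look roughly like this
(modeled on the wiki's SimplePartitioner; the class name and hashing choice
are just illustrative):

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

public class IdPartitioner implements Partitioner<String> {

    // 0.8 creates the partitioner reflectively and passes it a VerifiableProperties
    public IdPartitioner(VerifiableProperties props) {
    }

    public int partition(String key, int numPartitions) {
        // All messages with the same id land in the same partition.
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }
}

You'd then set partitioner.class to the fully qualified class name in the
producer properties.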


 - I see that Kafka server.properties allows one to specify the number of
 partitions it supports. However, when we want to scale I wonder if we add #
 of partitions or # of brokers, will the same partitioner start distributing
 the messages to different partitions?
  And if it does, how can that same consumer continue to read off the
 messages of those ids if it was interrupted in the middle?


I'll let someone else answer this.



 - I'd like to create a consumer per partition, and for each one to
 subscribe to the changes of that one. How can this be done in kafka?


Two ways: Simple Consumer or Consumer Groups.

It depends on the level of control you want over which code processes a
specific partition vs. having one assigned to it (and the level of control
over offset management).

https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example

https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example



 Thanks,

 Tim



Re: Is there a limitation on the number of simultaneous consumer connections to the same topic

2013-05-13 Thread Chris Curtin
Yes. However be aware that starting and stopping processes will cause a
rebalance of the consumers, so your code may find itself receiving events
from a different partition suddenly (so don't assume the partition you are
reading isn't going to change!)  Also as things are starting up you may
find a process receives many partitions at first, but as the other
processes are started the partitions get reassigned.

Finally, running more processes than partitions will mean those other
processes are idle.




On Mon, May 13, 2013 at 8:49 AM, Ming Li lm0...@gmail.com wrote:

 Hi Andrea,

  Thanks for your reply~~ Do you mean there is no difference between
  having N threads share the same ConsumerConnector created by
  Consumer.createJavaConsumerConnector,
  and
  having N consumer processes, each with its own ConsumerConnector?

 Best Regards,
 Li Ming



 On Mon, May 13, 2013 at 6:43 PM, Andrea Gazzarini 
 andrea.gazzar...@gmail.com wrote:

  It shouldn't.
  Creating several listener / consumer processes belonging to the same
 group
  means you are working with a point-to-point message channel so incoming
  messages will be delivered only to one consumer.
 
  Maybe I'm wrong but I believe in that scenario there's no difference
 (from
  broker perspective) between threads and processes.
 
  Regards,
  Andrea
 
 
  On 05/13/2013 12:15 PM, Ming Li wrote:
 
  Hi,
 
  Does Kafka have a limitation on the simultaneous connections (created
 with
   Consumer.createJavaConsumerConnector) for the same topic within the
  same
  group?
 
  My scenario is I need to consume a topic from different process (not
  thread), so I need to create lots of high level consumers.
 
 
  Best Regards,
  Li Ming
 
 
 



Re: a few questions from high level consumer documentation.

2013-05-08 Thread Chris Curtin
I'll try to answer some; the Kafka team will need to answer the others:


On Wed, May 8, 2013 at 12:17 PM, Yu, Libo libo...@citi.com wrote:

 Hi,

 I read this link
 https://cwiki.apache.org/KAFKA/consumer-group-example.html
 and have a few questions (if not too many).

 1 When you say the iterator may block, do you mean hasNext() may block?


Yes.



 2 Remember, you can only use a single process per Consumer Group.
 Do you mean we can only use a single process on one node of the
 cluster for a consumer group?
 Or there can be only one process on the whole cluster for a consumer
 group? Please clarify on this.

  Bug. I'll change it. When I wrote this I misunderstood the re-balancing
 step. I missed this reference but fixed the others. Sorry.



 3 Why save offset to zookeeper? Is it easier to save it to a local file?

 4 When client exits/crashes or leader for a partition is changed,
 duplicate messages may be replayed. To help avoid this (replayed duplicate
 messages), make sure you provide a clean way for your client to exit
 instead of assuming it can be 'kill -9'd.

 a.   For client exit, if the client is receiving data at the time, how
 to do a clean exit? How can client tell consumer to write offset to
  zookeeper before exiting?


If you call the shutdown() method on the Consumer it will cleanly stop,
releasing any blocked iterators. In the example it goes to sleep for a few
seconds then cleanly shuts down.
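
Roughly the pattern from the example (the sleep interval is arbitrary;
'consumer' and 'executor' are the fields from the wiki code):

try {
    Thread.sleep(10000);   // let the threads consume for a while
} catch (InterruptedException ie) {
}
consumer.shutdown();       // releases blocked hasNext() calls; with auto-commit on,
                           // the final offsets are written to ZooKeeper
executor.shutdown();       // stop the thread pool running the stream readers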




 b.  For client crash, what can client do to avoid duplicate messages
 when restarted? What I can think of is to read last message from log file
 and ignore the first few received duplicate messages until receiving the
 last read message. But is it possible for client to read log file directly?


If you can't tolerate the possibility of duplicates you need to look at the
Simple Consumer example. There you control the offset storage.




 c.   For the change of the partition leader, is there anything that
 clients can do to avoid duplicates?

 Thanks.



 Libo




Re: Kafka wiki Documentation conventions - looking for feedback

2013-05-01 Thread Chris Curtin
I've tested my examples with the new (4/30) release and they work, so I've
updated the documentation.

Thanks,

Chris


On Mon, Apr 29, 2013 at 6:18 PM, Jun Rao jun...@gmail.com wrote:

 Thanks. I also updated your producer example to reflect a recent config
  change (broker.list => metadata.broker.list).

 Jun


 On Mon, Apr 29, 2013 at 11:19 AM, Chris Curtin curtin.ch...@gmail.com
 wrote:

  Thanks, I missed that the addition of consumers can cause a re-balance.
  Thought it was only on Leader changes.
 
  I've updated the wording in the example.
 
  I'll pull down the beta and test my application then change the names on
  the properties.
 
  Thanks,
 
  Chris
 
 
  On Mon, Apr 29, 2013 at 11:58 AM, Jun Rao jun...@gmail.com wrote:
 
   Basically, every time a consumer joins a group, every consumer in the
   groups gets a ZK notification and each of them tries to own a subset of
  the
   total number of partitions. A given partition is only assigned to one
 of
   the consumers in the same group. Once the ownership is determined, each
   consumer consumes messages coming from its partitions and manages the
   offset of those partitions. Since at any given point of time, a
 partition
   is only owned by one consumer, there won't be conflicts on updating the
   offsets. More details are described in the consumer rebalancing
  algorithm
   section of http://kafka.apache.org/07/design.html
  
   Thanks,
  
   Jun
  
  
   On Mon, Apr 29, 2013 at 8:16 AM, Chris Curtin curtin.ch...@gmail.com
   wrote:
  
Jun, can you explain this a little better? I thought when using
  Consumer
Groups that on startup Kafka connects to ZooKeeper and finds the last
   read
offset for every partition in the topic being requested for the
 group.
   That
is then the starting point for the consumer threads.
   
If a second process starts while the first one is running with the
 same
Consumer Group, won't the second one read the last offsets consumed
 by
   the
already running process and start processing from there? Then as the
   first
process syncs consumed offsets, won't the 2nd process's next update
overwrite them?
   
Thanks,
   
Chris
   
   
   
   
On Mon, Apr 29, 2013 at 11:03 AM, Jun Rao jun...@gmail.com wrote:
   
 Chris,

 Thanks for the writeup. Looks great overall. A couple of comments.

 1. At the beginning, it sounds like that one can't run multiple
   processes
 of consumers in the same group. This is actually not true. We can
   create
 multiple instances of consumers for the same group in the same JVM
 or
 different JVMs. The consumers will auto-balance among themselves.

 2. We have changed the name of some config properties.
 auto.commit.interval.ms is correct. However, zk.connect,
 zk.session.timeout.ms and zk.sync.time.ms are changed to
 zookeeper.connect,
 zookeeper.session.timeout.ms, and zookeeper.sync.time.ms,
   respectively.

 I will add a link to your wiki in our website.

 Thanks again.

 Jun


 On Mon, Apr 29, 2013 at 5:54 AM, Chris Curtin 
  curtin.ch...@gmail.com
 wrote:

  Hi Jun,
 
  I finished and published it this morning:
 
 
   
  https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
 
  One question: when documenting the ConsumerConfig parameters I
   couldn't
  find a description for the 'auto.commit.interval.ms' setting. I
   found
 one
  for 'autocommit.interval.ms' (no '.' between auto and commit) in
  the
  Google
  Cache only. Which spelling is it? Also is my description of it
   correct?
 
  I'll take a look at custom encoders later this week. Today and
   Tuesday
 are
  going to be pretty busy.
 
  Please let me know if there are changes needed to the High Level
Consumer
  page.
 
  Thanks,
 
  Chris
 
 
  On Mon, Apr 29, 2013 at 12:50 AM, Jun Rao jun...@gmail.com
  wrote:
 
   Chris,
  
   Any update of the high level consumer example?
  
   Also, in the Producer example, it would be useful to describe
 how
   to
  write
   a customized encoder. One subtle thing is that the encoder
 needs
  a
    constructor that takes a single VerifiableProperties
 argument (
   https://issues.apache.org/jira/browse/KAFKA-869).
  
   Thanks,
  
   Jun
  
  
  
  
 

   
  
 



Re: Kafka wiki Documentation conventions - looking for feedback

2013-05-01 Thread Chris Curtin
Hi Jun

I've added #1 and #2.

I'll need to think about where to put #3, maybe even adding a 'tips and
tricks' section?

I've not had to do any encoder/decoders. Can anyone else offer some example
code I can incorporate into an example?
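
In the meantime, here is roughly what I understand one to look like (untested;
the class name and payload handling are just for illustration, and note the
VerifiableProperties constructor from KAFKA-869):

import kafka.serializer.Encoder;
import kafka.utils.VerifiableProperties;

public class UpperCaseEncoder implements Encoder<String> {

    // 0.8 creates encoders reflectively, passing a VerifiableProperties (KAFKA-869)
    public UpperCaseEncoder(VerifiableProperties props) {
    }

    public byte[] toBytes(String message) {
        return message.toUpperCase().getBytes();
    }
}

You'd then point serializer.class (or key.serializer.class) at the fully
qualified class name in the producer properties.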

Thanks,

Chris


On Wed, May 1, 2013 at 11:45 AM, Jun Rao jun...@gmail.com wrote:

 Chris,

 Thanks. This is very helpful. I linked your wiki pages to our website. A
 few more comments:

 1. Producer: The details of the meaning of request.required.acks are
 described in http://kafka.apache.org/08/configuration.html. It would be
 great if you can add a link to the description in your wiki.

 2. High level consumer: Could you add the proper way of stopping the
 consumer? One just needs to call consumer.shutdown(). After this is called,
 hasNext() call in the Kafka stream iterator will return false.

 3. SimpleConsumer: We have the following api that returns the offset of the
 last message exposed to the consumer. The difference btw high watermark and
 the offset of the last consumed message tells you how many messages the
 consumer is behind the broker.
   highWatermark(topic: String, partition: Int)
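
 A quick sketch of using that to report lag (this assumes the
 highWatermark(topic, partition) call above is available on the FetchResponse
 you get back; fetchResponse, a_topic, a_partition and readOffset are the
 variables from the wiki's SimpleConsumer example):

 // How far behind the broker this consumer is for one partition.
 long highWatermark = fetchResponse.highWatermark(a_topic, a_partition);
 long messagesBehind = highWatermark - readOffset;
 System.out.println("Consumer is " + messagesBehind + " messages behind the broker");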

 Finally, it would be great if you can extend the wiki with customized
 encoder (Producer) and decoder (Consumer) at some point.
 Thanks,

 Jun


 On Wed, May 1, 2013 at 6:44 AM, Chris Curtin curtin.ch...@gmail.com
 wrote:

  I've tested my examples with the new (4/30) release and they work, so
 I've
  updated the documentation.
 
  Thanks,
 
  Chris
 
 
  On Mon, Apr 29, 2013 at 6:18 PM, Jun Rao jun...@gmail.com wrote:
 
   Thanks. I also updated your producer example to reflect a recent config
    change (broker.list => metadata.broker.list).
  
   Jun
  
  
   On Mon, Apr 29, 2013 at 11:19 AM, Chris Curtin curtin.ch...@gmail.com
   wrote:
  
Thanks, I missed that the addition of consumers can cause a
 re-balance.
Thought it was only on Leader changes.
   
I've updated the wording in the example.
   
I'll pull down the beta and test my application then change the names
  on
the properties.
   
Thanks,
   
Chris
   
   
On Mon, Apr 29, 2013 at 11:58 AM, Jun Rao jun...@gmail.com wrote:
   
 Basically, every time a consumer joins a group, every consumer in
 the
 groups gets a ZK notification and each of them tries to own a
 subset
  of
the
 total number of partitions. A given partition is only assigned to
 one
   of
 the consumers in the same group. Once the ownership is determined,
  each
 consumer consumes messages coming from its partitions and manages
 the
 offset of those partitions. Since at any given point of time, a
   partition
 is only owned by one consumer, there won't be conflicts on updating
  the
 offsets. More details are described in the consumer rebalancing
algorithm
 section of http://kafka.apache.org/07/design.html

 Thanks,

 Jun


 On Mon, Apr 29, 2013 at 8:16 AM, Chris Curtin 
  curtin.ch...@gmail.com
 wrote:

  Jun, can you explain this a little better? I thought when using
Consumer
  Groups that on startup Kafka connects to ZooKeeper and finds the
  last
 read
  offset for every partition in the topic being requested for the
   group.
 That
  is then the starting point for the consumer threads.
 
  If a second process starts while the first one is running with
 the
   same
  Consumer Group, won't the second one read the last offsets
 consumed
   by
 the
  already running process and start processing from there? Then as
  the
 first
  process syncs consumed offsets, won't the 2nd process's next
 update
  overwrite them?
 
  Thanks,
 
  Chris
 
 
 
 
  On Mon, Apr 29, 2013 at 11:03 AM, Jun Rao jun...@gmail.com
  wrote:
 
   Chris,
  
   Thanks for the writeup. Looks great overall. A couple of
  comments.
  
   1. At the beginning, it sounds like that one can't run multiple
 processes
   of consumers in the same group. This is actually not true. We
 can
 create
   multiple instances of consumers for the same group in the same
  JVM
   or
   different JVMs. The consumers will auto-balance among
 themselves.
  
   2. We have changed the name of some config properties.
   auto.commit.interval.ms is correct. However, zk.connect,
   zk.session.timeout.ms and zk.sync.time.ms are changed to
   zookeeper.connect,
   zookeeper.session.timeout.ms, and zookeeper.sync.time.ms,
 respectively.
  
   I will add a link to your wiki in our website.
  
   Thanks again.
  
   Jun
  
  
   On Mon, Apr 29, 2013 at 5:54 AM, Chris Curtin 
curtin.ch...@gmail.com
   wrote:
  
Hi Jun,
   
I finished and published it this morning:
   
   
 
   
  https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example

Yammer Metrics not included in 0.8.0?

2013-04-30 Thread Chris Curtin
Hi,

I pulled 0.8.0 head at 3 pm eastern 4/30, did the "sbt update; sbt package;
sbt assembly-package-dependency" and my code won't compile.

It is missing the Yammer Metrics libraries.

Last pull was about 45 days ago and they were stored in core\lib\metrics-*

Do I now need to pull them myself?

Also looks like zkclient is also not being included.

Thanks,

Chris


Re: kafka 0.8 beta release status

2013-04-29 Thread Chris Curtin
Just added the High Level Consumer example.


On Mon, Apr 29, 2013 at 1:52 AM, Jun Rao jun...@gmail.com wrote:

 We have updated the 0.8 documentation in our website (
 http://kafka.apache.org/index.html). Please review the docs. We have the
 following blockers for the 0.8 beta release:

 additional docs:
 * examples of using the 0.8 high level consumer api
 * description of additional 0.8 tools

 KAFKA-885 (sbt package builds two kafka jars)

 It would be great if people can help on the blockers.

 Thanks,

 Jun



Re: Kafka wiki Documentation conventions - looking for feedback

2013-04-29 Thread Chris Curtin
Jun, can you explain this a little better? I thought when using Consumer
Groups that on startup Kafka connects to ZooKeeper and finds the last read
offset for every partition in the topic being requested for the group. That
is then the starting point for the consumer threads.

If a second process starts while the first one is running with the same
Consumer Group, won't the second one read the last offsets consumed by the
already running process and start processing from there? Then as the first
process syncs consumed offsets, won't the 2nd process's next update
overwrite them?

Thanks,

Chris




On Mon, Apr 29, 2013 at 11:03 AM, Jun Rao jun...@gmail.com wrote:

 Chris,

 Thanks for the writeup. Looks great overall. A couple of comments.

 1. At the beginning, it sounds like that one can't run multiple processes
 of consumers in the same group. This is actually not true. We can create
 multiple instances of consumers for the same group in the same JVM or
 different JVMs. The consumers will auto-balance among themselves.

 2. We have changed the name of some config properties.
 auto.commit.interval.ms is correct. However, zk.connect,
 zk.session.timeout.ms and zk.sync.time.ms are changed to
 zookeeper.connect,
 zookeeper.session.timeout.ms, and zookeeper.sync.time.ms, respectively.

 I will add a link to your wiki in our website.

 Thanks again.

 Jun


 On Mon, Apr 29, 2013 at 5:54 AM, Chris Curtin curtin.ch...@gmail.com
 wrote:

  Hi Jun,
 
  I finished and published it this morning:
 
  https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
 
  One question: when documenting the ConsumerConfig parameters I couldn't
  find a description for the 'auto.commit.interval.ms' setting. I found
 one
  for 'autocommit.interval.ms' (no '.' between auto and commit) in the
  Google
  Cache only. Which spelling is it? Also is my description of it correct?
 
  I'll take a look at custom encoders later this week. Today and Tuesday
 are
  going to be pretty busy.
 
  Please let me know if there are changes needed to the High Level Consumer
  page.
 
  Thanks,
 
  Chris
 
 
  On Mon, Apr 29, 2013 at 12:50 AM, Jun Rao jun...@gmail.com wrote:
 
   Chris,
  
   Any update of the high level consumer example?
  
   Also, in the Producer example, it would be useful to describe how to
  write
   a customized encoder. One subtle thing is that the encoder needs a
    constructor that takes a single VerifiableProperties argument (
   https://issues.apache.org/jira/browse/KAFKA-869).
  
   Thanks,
  
   Jun
  
  
  
  
 



Re: Kafka wiki Documentation conventions - looking for feedback

2013-04-29 Thread Chris Curtin
Thanks, I missed that the addition of consumers can cause a re-balance.
Thought it was only on Leader changes.

I've updated the wording in the example.

I'll pull down the beta and test my application then change the names on
the properties.

Thanks,

Chris


On Mon, Apr 29, 2013 at 11:58 AM, Jun Rao jun...@gmail.com wrote:

 Basically, every time a consumer joins a group, every consumer in the
 groups gets a ZK notification and each of them tries to own a subset of the
 total number of partitions. A given partition is only assigned to one of
 the consumers in the same group. Once the ownership is determined, each
 consumer consumes messages coming from its partitions and manages the
 offset of those partitions. Since at any given point of time, a partition
 is only owned by one consumer, there won't be conflicts on updating the
 offsets. More details are described in the consumer rebalancing algorithm
 section of http://kafka.apache.org/07/design.html

 Thanks,

 Jun


 On Mon, Apr 29, 2013 at 8:16 AM, Chris Curtin curtin.ch...@gmail.com
 wrote:

  Jun, can you explain this a little better? I thought when using Consumer
  Groups that on startup Kafka connects to ZooKeeper and finds the last
 read
  offset for every partition in the topic being requested for the group.
 That
  is then the starting point for the consumer threads.
 
  If a second process starts while the first one is running with the same
  Consumer Group, won't the second one read the last offsets consumed by
 the
  already running process and start processing from there? Then as the
 first
  process syncs consumed offsets, won't the 2nd process's next update
  overwrite them?
 
  Thanks,
 
  Chris
 
 
 
 
  On Mon, Apr 29, 2013 at 11:03 AM, Jun Rao jun...@gmail.com wrote:
 
   Chris,
  
   Thanks for the writeup. Looks great overall. A couple of comments.
  
   1. At the beginning, it sounds like that one can't run multiple
 processes
   of consumers in the same group. This is actually not true. We can
 create
   multiple instances of consumers for the same group in the same JVM or
   different JVMs. The consumers will auto-balance among themselves.
  
   2. We have changed the name of some config properties.
   auto.commit.interval.ms is correct. However, zk.connect,
   zk.session.timeout.ms and zk.sync.time.ms are changed to
   zookeeper.connect,
   zookeeper.session.timeout.ms, and zookeeper.sync.time.ms,
 respectively.
  
   I will add a link to your wiki in our website.
  
   Thanks again.
  
   Jun
  
  
   On Mon, Apr 29, 2013 at 5:54 AM, Chris Curtin curtin.ch...@gmail.com
   wrote:
  
Hi Jun,
   
I finished and published it this morning:
   
   
  https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
   
One question: when documenting the ConsumerConfig parameters I
 couldn't
find a description for the 'auto.commit.interval.ms' setting. I
 found
   one
for 'autocommit.interval.ms' (no '.' between auto and commit) in the
Google
Cache only. Which spelling is it? Also is my description of it
 correct?
   
I'll take a look at custom encoders later this week. Today and
 Tuesday
   are
going to be pretty busy.
   
Please let me know if there are changes needed to the High Level
  Consumer
page.
   
Thanks,
   
Chris
   
   
On Mon, Apr 29, 2013 at 12:50 AM, Jun Rao jun...@gmail.com wrote:
   
 Chris,

 Any update of the high level consumer example?

 Also, in the Producer example, it would be useful to describe how
 to
write
 a customized encoder. One subtle thing is that the encoder needs a
  constructor that takes a single VerifiableProperties argument (
 https://issues.apache.org/jira/browse/KAFKA-869).

 Thanks,

 Jun




   
  
 



Re: one producer and 2 consumers

2013-04-26 Thread Chris Curtin
In a nutshell: High Level uses Consumer Groups to handle the tracking of
message offset consumption. SimpleConsumer leaves it all up to you.

The 0.7.x quick start shows examples of both:

http://kafka.apache.org/quickstart.html




On Fri, Apr 26, 2013 at 12:32 PM, Oleg Ruchovets oruchov...@gmail.com wrote:

  By the way, what does high level consumer mean? Are there other types of
  consumers?

 Thanks
 Oleg.






Re: LeaderNotAvailable Exception

2013-04-24 Thread Chris Curtin
Did you create the topic without a # of partitions then try to
delete/recreate it? I've had that happen to me before. Try shutting down
everything (including zookeeper) and restarting.


On Tue, Apr 23, 2013 at 9:08 PM, Jun Rao jun...@gmail.com wrote:

 Does this happen on every message that you type in producer console?

 Thanks,

 Jun


 On Tue, Apr 23, 2013 at 4:15 PM, Yin Yin yin@outlook.com wrote:

  I tried to run the kafka 0.8 version as instructed in Quick Start. The
  kafka server shows the following message when I launch the producer.
  ERROR
  Error while fetching metadata for partition [test,0]
  (kafka.admin.AdminUtils$)
  kafka.common.LeaderNotAvailableException: No leader exists for partition
 0
  at kafka.admin.AdminUtils$$anonfun$3.apply(AdminUtils.scala:219)
  at kafka.admin.AdminUtils$$anonfun$3.apply(AdminUtils.scala:201)
  at
 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
  at
 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
  at
 
 scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
  at scala.collection.immutable.List.foreach(List.scala:45)
  at
  scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
  at scala.collection.immutable.List.map(List.scala:45)
  at
 
 kafka.admin.AdminUtils$.kafka$admin$AdminUtils$$fetchTopicMetadataFromZk(AdminUtils.scala:201)
  at
  kafka.admin.AdminUtils$.fetchTopicMetadataFromZk(AdminUtils.scala:190)
  at
 
 kafka.server.KafkaApis$$anonfun$handleTopicMetadataRequest$1.apply(KafkaApis.scala:479)
  at
 
 kafka.server.KafkaApis$$anonfun$handleTopicMetadataRequest$1.apply(KafkaApis.scala:465)
  at scala.collection.immutable.Set$Set1.foreach(Set.scala:81)
  at
  kafka.server.KafkaApis.handleTopicMetadataRequest(KafkaApis.scala:464)
  at kafka.server.KafkaApis.handle(KafkaApis.scala:67)
  at
  kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:41)
   at java.lang.Thread.run(Unknown Source)

   Then, when I type a message in the producer console, the server console pops out:
   WARN [KafkaApi-0] Leader not local for topic test partition 0 on broker 0
   (kafka.server.KafkaApis)

   The consumer console also didn't get any message. Any help is appreciated. Thanks
 



Re: Kafka 0.8 cluster setup?

2013-04-23 Thread Chris Curtin
I followed these instructions to get the first 'play' cluster going:

https://cwiki.apache.org/KAFKA/kafka-08-quick-start.html

Instead of running the 3 brokers on the same machine, I ran one on each
machine.

Note that you will need to do a little bit of zookeeper setup to get a
cluster running. I think the main one was setting the 'myid' file in the
zookeeper install directory properly on each machine (namely, give each
machine a unique # in that file).

Thanks,

Chris


On Mon, Apr 22, 2013 at 5:32 PM, Neha Narkhede neha.narkh...@gmail.com wrote:

 Hi Jason,

 We are in the process of updating the documentation. Hoping to finish
 it by this week. Stay tuned.

 Thanks,
 Neha

 On Mon, Apr 22, 2013 at 2:12 PM, Jason Huang jason.hu...@icare.com
 wrote:
  Hello,
 
  We've been playing around with kafka 0.8 for a few months now and decided
  to install kafka on a small cluster for further testing. I tried to
 search
  online but couldn't find any setup documentation for a kafka 0.8 cluster.
 
  Does anyone know if such documents exist? If they don't exist, what could
  be our best guide to start with?
 
  thanks!
 
  Jason



Re: Kafka 0.8 cluster setup?

2013-04-23 Thread Chris Curtin
Beat me to it ;)

Only caveat is I wouldn't use /tmp for this: if you're running tmpwatch,
the 'myid' file will get removed unexpectedly since it never seems to be
changed or accessed again. That was fun to find :)

We use /var/zookeeper for our storage.

Thanks,

Chris


On Tue, Apr 23, 2013 at 10:30 AM, Jason Huang jason.hu...@icare.com wrote:

 Thanks Eric - this helps quite a bit.

 I will play around with it.

 Jason

 On Tue, Apr 23, 2013 at 10:21 AM, Eric Sites eric.si...@threattrack.com
 wrote:

  Jason,
 
  You need to modify the ZooKeeper config and add the following:
 
  dataDir=/tmp/zookeeper
 
  initLimit=50
  syncLimit=2
 
  server.1=kafka001.domain.com:2888:3888
  server.2=kafka002.domain.com:2888:3888
 
  server.3=kafka003.domain.com:2888:3888
 
 
 
   # Make sure you open those 2 ports on each of the servers in your
   firewall

   The .1 after server is the id of the zookeeper node, which should be stored
   in a file
   named /tmp/zookeeper/myid

   echo 1 > /tmp/zookeeper/myid
 
 
  Cheers,
  Eric Sites
 
 
  On 4/23/13 9:50 AM, Jason Huang jason.hu...@icare.com wrote:
 
  Thanks Chris and Neha.
  
  Chris - I've been through the link you mentioned before. However, that
   appears to be using one instance of zookeeper, which makes whichever
   server
   runs zookeeper the single point of failure?
  
  Jason
  
  On Tue, Apr 23, 2013 at 8:28 AM, Chris Curtin
  curtin.ch...@gmail.comwrote:
  
    I followed these instructions to get the first 'play' cluster going:
  
   https://cwiki.apache.org/KAFKA/kafka-08-quick-start.html
  
    Instead of running the 3 brokers on the same machine, I ran one on each
    machine.
  
   Note that you will need to do a little bit of zookeeper setup to get a
   cluster running, I think the main one was setting the 'myid' in the
   zookeeper install directory properly on each machine (namely give each
   machine a unique # in that file).
  
   Thanks,
  
   Chris
  
  
   On Mon, Apr 22, 2013 at 5:32 PM, Neha Narkhede 
 neha.narkh...@gmail.com
   wrote:
  
Hi Jason,
   
We are in the process of updating the documentation. Hoping to
 finish
it by this week. Stay tuned.
   
Thanks,
Neha
   
On Mon, Apr 22, 2013 at 2:12 PM, Jason Huang jason.hu...@icare.com
 
wrote:
 Hello,

 We've been playing around with kafka 0.8 for a few months now and
   decided
 to install kafka on a small cluster for further testing. I tried
 to
search
 online but couldn't find any setup documentation for a kafka 0.8
   cluster.

 Does anyone know if such documents exist? If they don't exist,
 what
   could
 be our best guide to start with?

 thanks!

 Jason
   
  
 
 



Re: Securing Kafka

2013-04-23 Thread Chris Curtin
Also keep in mind that anything done at the transport (SSL for example)
layer won't solve your 'at rest' problems.

All messages are written to disk, so unless the broker does some encryption
logic you haven't solved the data visibility issues.

I also think this should be a producer/consumer problem, not a Broker
problem. Keep the Brokers as fast as possible (thus NIO/kernel space
activities etc.) and push the cost to the producers and consumers.
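
As a rough illustration of the payload-level approach (the algorithm, key
handling and strings below are placeholders, not a recommendation):

import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;

public class EncryptedPayloadExample {
    public static void main(String[] args) throws Exception {
        // In practice the key (or public cert) would be distributed to producers
        // out of band; this just generates one locally for the sketch.
        KeyGenerator keyGen = KeyGenerator.getInstance("AES");
        keyGen.init(128);
        SecretKey key = keyGen.generateKey();

        Cipher cipher = Cipher.getInstance("AES");
        cipher.init(Cipher.ENCRYPT_MODE, key);
        byte[] cipherText = cipher.doFinal("sensitive payload".getBytes("UTF-8"));

        // cipherText is what you hand to the producer (e.g. in a
        // KeyedMessage<String, byte[]>), so the broker only ever stores ciphertext.
        // Consumers holding the key decrypt after reading.
        System.out.println("Encrypted " + cipherText.length + " bytes");
    }
}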

Chris


On Tue, Apr 23, 2013 at 2:02 PM, Jason Rosenberg j...@squareup.com wrote:

 Yes,

 I think encryption at the message level is a workable solution, as long as
 you don't care about exposing the meta data that goes with it (e.g. topic
 names, kafka broker/zk server locations, etc.).

 Jason


 On Tue, Apr 23, 2013 at 10:02 AM, Fergal Somers
 fergal.som...@workday.comwrote:

  Hi
 
  We are planning to use Kafka, but like others on this list we have a need
  to be able to secure communication. The approaches people have suggested
 on
  this list are:
 
 - Encrypt the messages at the producer (e.g
 
 
 http://search-hadoop.com/m/1AfXKcZIk52/message+encryptionsubj=Re+Secure+communication
 )
 - Add SSL to Kafka protocol -
 
 
 http://mail-archives.apache.org/mod_mbox/kafka-users/201304.mbox/ajax/%3CCAA%2BBczQ_dMXUTNndSu4d%2B6aRo%3DSLiFa4iGMu_78OWKub_CTScw%40mail.gmail.com%3E
 
  Adding SSL support to Kafka probably means adding SSLEngine support to
  the nio socket handling (
  https://groups.google.com/forum/#!msg/kafka-dev/ZmJrB_plu1I/_9cmGlLCSVEJ
 ).
  I don't think there are any immediate plans to provide this, but it's
  potentially something that Kafka would support in the future?
 
  In theory this is something we could look at, but we would need to go
  further. We also need to separate producers from consumers. The aim would
  be to ensure that a Kafka producer couldn't also act as a consumer.
  Essentially producers can write to Kafka, but not read.
 
  From looking at the Kafka source, achieving producer/consumer separation
  looks to me like it would be quite a change to the Kafka server (0.7). So
  are there any plans in the (near) future in this area (producer /
 consumer
  separation) ?
 
  Message encryption (at the application layer) would allow us to achieve
  both aims of securing communication and separating consumers from
  producers. Producers get the public cert (so they can encrypt messages as
  they place them on the bus). Only consumers get the private cert - so
 only
  they can decrypt messages consumed. This seems like something we can do
  ourselves - I just wanted to sanity check the approach with this group.
 
  Cheers,
 
  Fergal.
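
A rough sketch of that application-layer approach (nothing Kafka provides out of
the box): encrypt the payload with the consumers' public key before handing it to
the producer, so the brokers only ever see ciphertext, in flight and at rest. Class
names here are illustrative, a real implementation would wrap a symmetric key
(hybrid encryption) rather than RSA-encrypt whole payloads, and as Jason notes
above the metadata (topic names, broker locations) stays visible:

import java.security.KeyFactory;
import java.security.PublicKey;
import java.security.spec.X509EncodedKeySpec;
import java.util.Properties;

import javax.crypto.Cipher;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class EncryptingProducer {

    private final PublicKey consumerPublicKey;
    private final Producer<String, byte[]> producer;

    public EncryptingProducer(byte[] x509EncodedPublicKey, String brokerList) throws Exception {
        // Producers only ever hold the public half; consumers keep the private key.
        this.consumerPublicKey = KeyFactory.getInstance("RSA")
                .generatePublic(new X509EncodedKeySpec(x509EncodedPublicKey));

        Properties props = new Properties();
        props.put("metadata.broker.list", brokerList);
        props.put("serializer.class", "kafka.serializer.DefaultEncoder"); // pass byte[] through untouched
        this.producer = new Producer<String, byte[]>(new ProducerConfig(props));
    }

    public void send(String topic, byte[] plaintext) throws Exception {
        // RSA is only usable for small payloads; real code would RSA-wrap a per-message AES key.
        Cipher cipher = Cipher.getInstance("RSA");
        cipher.init(Cipher.ENCRYPT_MODE, consumerPublicKey);
        byte[] ciphertext = cipher.doFinal(plaintext);
        // The broker stores and forwards ciphertext only; topic names remain visible.
        producer.send(new KeyedMessage<String, byte[]>(topic, ciphertext));
    }
}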
 



Re: Kafka wiki Documentation conventions - looking for feedback

2013-04-22 Thread Chris Curtin
Hi Jun,

#1 and #2 are done, thanks for the code-review!

I'll work on getting a High Level consumer example this week. I don't have
one readily usable (we quickly found the lack of control over offsets
didn't meet our needs) but I can put something together.

Congratulations on getting closer to Beta!

Chris


On Mon, Apr 22, 2013 at 12:26 PM, Jun Rao jun...@gmail.com wrote:

 Chris,

 Thanks for the wiki. We are getting close to releasing 0.8.0 beta and your
 writeup is very helpful. The following are some comments for the 0.8
 Producer wiki.

 1. The following sentence is inaccurate. The producer will do random
 assignment as long as the key in KeyedMessage is null. If a key is not
 null, it will use the default partitioner if partitioner.class is not set.
 "By default if you don't include a partitioner.class Kafka will randomly
 assign the message to a partition."

 2. In the following sentence, the first type is the key and the second type
 is the message.
 "The first is the type of the message, the second the type of the Partition
 key."

 3. Could you explain the key.serializer.class property too?

 In addition to the 0.8 SimpleConsumer wiki, could you write up one for the
 0.8 high level consumer?

 Thanks,

 Jun
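
To make points 1-3 concrete, a hedged sketch of how those properties fit together
in the 0.8 producer; the broker list and topic are illustrative, and a custom
implementation of kafka.producer.Partitioner would replace the default shown for
partitioner.class:

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class ProducerConfigExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "broker1:9092,broker2:9092");
        // Serializer for the message payload (the second KeyedMessage type parameter).
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        // Serializer for the partition key (the first KeyedMessage type parameter).
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");
        // Optional: a custom kafka.producer.Partitioner implementation goes here.
        // When it is not set, non-null keys go through the default partitioner.
        props.put("partitioner.class", "kafka.producer.DefaultPartitioner");

        Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));

        // KeyedMessage<K, V>: K is the type of the partition key, V the type of the message.
        producer.send(new KeyedMessage<String, String>("my_topic", "some-key", "a keyed message"));
        // A null key (no key supplied) means the message is assigned to a random partition.
        producer.send(new KeyedMessage<String, String>("my_topic", "a message with no key"));
        producer.close();
    }
}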



 On Fri, Mar 29, 2013 at 8:28 AM, Chris Curtin curtin.ch...@gmail.com
 wrote:

  Hi,
 
  I've added an example program for using a SimpleConsumer for 0.8.0. Turns
  out to be a little more complicated once you add Broker failover. I'm not
  100% thrilled with how I detect and recover, so if someone has a better
 way
  of doing this please let me (and this list) know.
 
 
 
 https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
 
  Thanks,
 
  Chris
 
 
  On Mon, Mar 25, 2013 at 8:02 AM, Chris Curtin curtin.ch...@gmail.com
  wrote:
 
  
   Hi David,
  
   Thanks for the feedback. I've seen the example before and after in
   different books/articles and it doesn't matter to me.
  
   Anyone else want to help define a style guide or is there one I didn't
  see
   already?
  
   Thanks,
  
   Chris
  
  
   On Thu, Mar 21, 2013 at 7:46 PM, David Arthur mum...@gmail.com
 wrote:
  
   This looks great! A few comments
  
   * I think it would be useful to start with a complete example (ready
 to
   copy/paste) and then break it down bit by bit
   * Some of the formatting is funky (gratuitous newlines), also I think
 2
   spaces looks nicer than 4
   * In the text, it might be useful to embolden or italicize class names
  
   Also, maybe we should move this to a separate thread?
  
  
  
 



Re: Got exception executing Kafka Producer.

2013-04-04 Thread Chris Curtin
You need to reference the version of Yammer shipping with Kafka. It is
under \core\lib\metrics-*


On Thu, Apr 4, 2013 at 11:41 AM, Oleg Ruchovets oruchov...@gmail.com wrote:

 I am executing a simple code like this:

 import java.util.Properties;

 import kafka.javaapi.producer.Producer;
 import kafka.producer.KeyedMessage;
 import kafka.producer.ProducerConfig;

 public class FirstKafkaTester {
     public Producer<Integer, String> initProducer() {
         Properties props = new Properties();
         props.put("zk.connect", "127.0.0.1:2181");
         props.put("broker.list", "localhost:9092");
         props.put("serializer.class", "kafka.serializer.StringEncoder");
         return new Producer<Integer, String>(new ProducerConfig(props));
     }

     public void executeProducer(String topic) {
         int messageNo = 1;
         Producer<Integer, String> producer = initProducer();
         String messageStr = new String("Message_" + messageNo);
         producer.send(new KeyedMessage<Integer, String>(topic, messageStr));
     }

     public static void main(String[] args) {
         FirstKafkaTester firstKafkaTester = new FirstKafkaTester();
         firstKafkaTester.executeProducer("topic_1");
     }
 }
 I use Kafka 0.8. I manually put it into my local Maven repository and added a
 dependency on yammer:

 <dependency>
   <groupId>com.yammer.metrics</groupId>
   <artifactId>metrics-core</artifactId>
   <version>2.2.0</version>
 </dependency>

 I got this exception:

 2013-04-04 11:09:56,909 WARN  async.DefaultEventHandler
 (Logging.scala:warn(89)) - Failed to send producer request with correlation
 id 2 to broker 3 with data for partitions [topic_1,0]
 java.lang.NoSuchMethodError: com.yammer.metrics.core.TimerContext.stop()J
  at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:36)
  at kafka.producer.SyncProducer.send(SyncProducer.scala:100)
  at

 kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:243)
  at

 kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$1.apply(DefaultEventHandler.scala:105)
  at

 kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$1.apply(DefaultEventHandler.scala:99)
  at
 scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
  at
 scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
  at scala.collection.Iterator$class.foreach(Iterator.scala:631)
  at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
  at
 scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
  at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
  at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
  at

 kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:99)
  at

 kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
  at kafka.producer.Producer.send(Producer.scala:74)
  at kafka.javaapi.producer.Producer.send(Producer.scala:32)
  at kafka_eval.FirstKafkaTester.executeProducer(FirstKafkaTester.java:26)
  at kafka_eval.FirstKafkaTester.main(FirstKafkaTester.java:34)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at

 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
  at

 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
  at java.lang.reflect.Method.invoke(Method.java:597)
  at com.intellij.rt.execution.application.AppMain.main(AppMain.java:120)

 Is there something wrong with what I am doing?
 What should I do to resolve the issue?

 Thanks
 Oleg.



Re: Slides from my March 2013 Atlanta Java User's Group presentation about Kafka

2013-04-01 Thread Chris Curtin
Now with Video:

http://vimeo.com/63040812

(I did notice that I misspoke about reading from replicas, sorry).


On Wed, Mar 20, 2013 at 8:11 AM, Chris Curtin curtin.ch...@gmail.com wrote:

 Hi,

 It went really well last night. Lots of good questions. Here are the
 slides, and hopefully the video will be up in a few days:

 http://www.slideshare.net/chriscurtin/ajug-march-2013-kafka

 Thanks,

 Chris



Re: Kafka wiki Documentation conventions - looking for feedback

2013-03-29 Thread Chris Curtin
Hi,

I've added an example program for using a SimpleConsumer for 0.8.0. Turns
out to be a little more complicated once you add Broker failover. I'm not
100% thrilled with how I detect and recover, so if someone has a better way
of doing this please let me (and this list) know.

https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example

Thanks,

Chris


On Mon, Mar 25, 2013 at 8:02 AM, Chris Curtin curtin.ch...@gmail.com wrote:


 Hi David,

 Thanks for the feedback. I've seen the example before and after in
 different books/articles and it doesn't matter to me.

 Anyone else want to help define a style guide or is there one I didn't see
 already?

 Thanks,

 Chris


 On Thu, Mar 21, 2013 at 7:46 PM, David Arthur mum...@gmail.com wrote:

 This looks great! A few comments

 * I think it would be useful to start with a complete example (ready to
 copy/paste) and then break it down bit by bit
 * Some of the formatting is funky (gratuitous newlines), also I think 2
 spaces looks nicer than 4
 * In the text, it might be useful to embolden or italicize class names

 Also, maybe we should move this to a separate thread?





Re: Anyone working on a Kafka book?

2013-03-25 Thread Chris Curtin
Thanks for finding those. Looks like a copy and paste issue. I've updated
the document.

Thanks,

Chris


On Sat, Mar 23, 2013 at 11:27 AM, Jonathan Hodges hodg...@gmail.com wrote:

 Many thanks for contributing!  The docs are very helpful.  I found a couple
 small possible typos.  The partitioning code example looks like it repeats
 at the bottom with duplicate import and class definition statements.  Also
 the create topic command-line appears to have an extra '-' for the
 partition option.  I can edit these, but before doing so I wanted to check
 and make sure I wasn't mistaken.

 -Jonathan







Kafka wiki Documentation conventions - looking for feedback

2013-03-25 Thread Chris Curtin
Hi David,

Thanks for the feedback. I've seen the example before and after in
different books/articles and it doesn't matter to me.

Anyone else want to help define a style guide or is there one I didn't see
already?

Thanks,

Chris


On Thu, Mar 21, 2013 at 7:46 PM, David Arthur mum...@gmail.com wrote:

 This looks great! A few comments

 * I think it would be useful to start with a complete example (ready to
 copy/paste) and then break it down bit by bit
 * Some of the formatting is funky (gratuitous newlines), also I think 2
 spaces looks nicer than 4
 * In the text, it might be useful to embolden or italicize class names

 Also, maybe we should move this to a separate thread?



Re: Anyone working on a Kafka book?

2013-03-20 Thread Chris Curtin
Okay, how do we do this logistically? I've taken the Producer code that I
wrote for testing purposes and wrote a description around it. How do I get
it to you guys?

Simple Consumer is going to take a little longer since my test Consumers
are non-trivial and I'll need to simplify them.

Thanks,

Chris


On Tue, Mar 19, 2013 at 5:28 PM, Neha Narkhede neha.narkh...@gmail.com wrote:

 That's a great idea, Chris! How about picking the quickstart document? That
 is the most important information that users moving to 0.8 will need.

 Thanks,
 Neha






Re: Anyone working on a Kafka book?

2013-03-19 Thread Chris Curtin
Hi Jun,

I've been thinking for a while about how to contribute to the project and
thought that working on some documentation for the website might be a good
way. Do you have an outline of what you'd like the site to look like that I
(AND OTHERS hint, hint) could pick a topic, write the article and submit
for you guys to review?


Thanks,

Chris


On Tue, Mar 19, 2013 at 12:20 PM, Jun Rao jun...@gmail.com wrote:

 Hi, David,

 At LinkedIn, committers are too busy to write a Kafka book right now. I
 think this is a good idea to pursue. So, if you want to do it, we'd be
 happy to help. The only request I have is that, while writing the book,
 you use the opportunity to also help us improve the documentation on
 the site.

 Thanks,

 Jun

 On Tue, Mar 19, 2013 at 6:34 AM, David Arthur mum...@gmail.com wrote:

  I was approached by a publisher the other day to do a book on Kafka -
  something I've actually thought about pursuing. Before I say yes (or
  consider saying yes), I wanted to make sure no one else was working on a
  book. No sense in producing competing texts at this point.
 
  So, anyone working on a Kafka book? Self published or otherwise?
 
  -David
 
 
 



Re: 0.8 behavior change: consumer re-receives last batch of messages in a topic?

2013-03-13 Thread Chris Curtin
Hi,

I noticed the same thing. In 0.8.0 the offset passed to the fetch is where
you want to start, not where you left off. So the last offset read from the
previous batch is truly the 'last offset' so you need to save it and ask
for it +1. Otherwise you keep asking for that last offset, which is valid
so it keeps returning.

Be careful with the +1 logic. Don't keep adding 1 if you don't get
anything. It should always be 'last offset read +1'

I think this happened with the change from file-byte offsets to offset as a
message #.

Chris
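
A hedged sketch of that "last offset read + 1" handling in a 0.8 fetch loop;
names are illustrative and error/leader handling is omitted (the full pattern is
in the SimpleConsumer example on the wiki):

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

public class OffsetTrackingSketch {

    // Error handling, leader lookup and a stop condition are omitted to keep the
    // offset bookkeeping visible.
    public static void readLoop(SimpleConsumer consumer, String clientName,
                                String topic, int partition, long startingOffset) {
        long readOffset = startingOffset;
        while (true) {
            FetchRequest req = new FetchRequestBuilder()
                    .clientId(clientName)
                    .addFetch(topic, partition, readOffset, 100000)
                    .build();
            FetchResponse fetchResponse = consumer.fetch(req);

            for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(topic, partition)) {
                long currentOffset = messageAndOffset.offset();
                // A fetch can hand back messages we already saw (e.g. compressed blocks),
                // so skip anything below the offset we asked for.
                if (currentOffset < readOffset) {
                    continue;
                }
                // ... process messageAndOffset.message() here ...

                // nextOffset() is "last offset read + 1"; only advance it when a message
                // was actually read, never blindly.
                readOffset = messageAndOffset.nextOffset();
            }
        }
    }
}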


On Wed, Mar 13, 2013 at 2:49 PM, Hargett, Phil 
phil.harg...@mirror-image.com wrote:

 I have 2 consumers in our scenario, reading from different brokers. Each
 broker is running standalone, although each have their own dedicated
 zookeeper instance for bookkeeping.

 After switching from 0.7.2, I noticed that both consumers exhibited high
 CPU usage. I am not yet exploiting any zookeeper knowledge in my consumer
 code; I am just making calls to the SimpleConsumer in the java API, passing
 the host and port of my broker.

 In 0.7.2, I kept the last offset from messages received via a fetch, and
 used that as the offset passed into the fetch method when receiving the
 next message set.

 With 0.8, I had to add a check to drop fetched messages when the message's
 offset was less than my own offset, based on the last message I saw. If I
 didn't make that change, it seemed like the last 200 or so messages in my
 topic  (probably matches a magic batch size configured somewhere in all of
 this code) were continually refetched.

 In this scenario, my topic was no longer accumulating messages, as I had
 turned off the producer, so I was expecting the fetches to eventually
 either block, return an empty message set, or fail (not sure of semantics
 of fetch). Continually receiving the last batch of messages at the end of
 the topic was not a semantic I expected.

 Is this an intended change in behavior—or do I need to write better
 consumer code?

 Guidance, please.

 :)


SimpleConsumer error conditions and handling

2013-03-05 Thread Chris Curtin
Hi,

0.8.0 HEAD from 3/4/2013.

As I think through building a robust SimpleConsumer I ran some failure
tests today and want to make sure I understand what is going on.

FYI I know that I should be doing a metadata lookup to find the leader, but
I wanted to see what happens if things are going well and the leader
changes between requests or I've cached the leader and try to connect
without the cost of a leader lookup.

First test: connect to a Broker that is a 'copy' of the topic/partition but
not leader. Get an error '5' which maps to
'ErrorMapping.LeaderNotAvailableCode'.

Why didn't I get ErrorMapping.NotLeaderForPartitionCode or something else
to tell me I'm not talking to the Leader? 'not available' implies something
is wrong with replication. But connecting to the leader Broker everything
works fine.

Second test: connect to a Broker that isn't the leader or a copy and I get
error 3, unknown topic or partition. Makes sense.

Third test: connect to the leader and while reading data, shutdown the
leader Broker via command line: I get some IOExceptions then Connection
Refused on the reconnect. (Note that the Connect Refused is the exception
raised, IOException was written to logs but not raised to my code.)

Not sure of the best way to recover from this in code without assuming the
worst every time. Could there be some notice from Kafka that the connection
to the leader was closed due to a shutdown vs. getting Connection Refused
errors so I can respond differently? Something like 'Broker has closed
connection due to shutdown'. So I know to sleep for a second before going
through the leader lookup logic again? Or ideally have Kafka know it was a
clean shutdown and automatically transition to the new leader.

Knowing it was a clean shutdown would also allow me to treat the clean
shutdown as a normal occurrence vs. an exception when something goes wrong.

Thanks,

Chris
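
For reference, a rough sketch of one way to handle all three cases uniformly
today: check the fetch response's error code, treat a dropped connection the
same as an error, back off, and re-run the leader lookup. findNewLeader() is a
placeholder for re-issuing a TopicMetadataRequest to the surviving brokers; it
is not a Kafka API:

import kafka.api.FetchRequestBuilder;
import kafka.common.ErrorMapping;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.consumer.SimpleConsumer;

public class FetchFailureSketch {

    // Returns the response on success, or null if the caller should retry against
    // a freshly looked-up leader.
    public static FetchResponse fetchOnce(SimpleConsumer consumer, String clientName,
                                          String topic, int partition, long readOffset)
            throws InterruptedException {
        FetchResponse response = null;
        try {
            response = consumer.fetch(new FetchRequestBuilder()
                    .clientId(clientName)
                    .addFetch(topic, partition, readOffset, 100000)
                    .build());
        } catch (Exception e) {
            // A clean broker shutdown and a crash both surface here today as
            // IOException / Connection Refused, so they have to be treated the same.
        }

        if (response != null && !response.hasError()) {
            return response;
        }
        if (response != null
                && response.errorCode(topic, partition) == ErrorMapping.OffsetOutOfRangeCode()) {
            throw new IllegalStateException("offset " + readOffset + " is not valid");
        }
        // Error 5 (LeaderNotAvailableCode), error 3 (unknown topic/partition when talking
        // to the wrong broker), or a dropped connection: back off and re-find the leader
        // before retrying.
        consumer.close();
        Thread.sleep(1000);
        findNewLeader(topic, partition);
        return null;
    }

    private static void findNewLeader(String topic, int partition) {
        // ... issue a TopicMetadataRequest to each remaining broker and pick the new leader ...
    }
}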


Re: 0.8.0 HEAD 3/4/2013 performance jump?

2013-03-05 Thread Chris Curtin
Great points Joe.

What about something being written to INFO at startup about the replication
level being used?

Chris
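
For anyone reading this later, the opt-in under discussion in the quoted thread
below is a single producer property; a minimal sketch with the 0.8 property
names (the broker list and serializer values are illustrative):

import java.util.Properties;

import kafka.producer.ProducerConfig;

public class AckConfigExample {
    public static ProducerConfig configWithLeaderAck(String brokerList) {
        Properties props = new Properties();
        props.put("metadata.broker.list", brokerList);
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        // 0  = don't wait for the leader (fast, but a send can be silently lost on failure)
        // 1  = wait for the leader to acknowledge the write
        // -1 = wait for all in-sync copies to acknowledge the write
        props.put("request.required.acks", "1");
        return new ProducerConfig(props);
    }
}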


On Tue, Mar 5, 2013 at 9:36 AM, Joe Stein crypt...@gmail.com wrote:

 Hi Chris, setting the ack default to 1 would mean folks would have to have
 a replica set up and configured; otherwise, starting a server from scratch
 from a download would mean an error message to the user. I hear your risk
 of not replicating, though perhaps such a use case could be solved through
 auto discovery or some other feature/contribution for 0.9.

 I would be -1 on changing the default right now because new folks coming in
 on a build, either new or migrating, may simply leave because they got an
 error when just doing git clone, ./sbt package and running (fewer steps in
 0.8). There are already expectations on 0.8; we should try to let things
 settle too.

 Lastly, folks, when they run and go live, will often have a chef, cfengine,
 puppet, etc. script for configuration.

 Perhaps through some more operation documentation, comments and general
 communications to the community we can reduce risk.

 /*
 Joe Stein
 http://www.linkedin.com/in/charmalloc
 Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop
 */

 On Tue, Mar 5, 2013 at 8:30 AM, Chris Curtin curtin.ch...@gmail.com
 wrote:

  Hi Jun,
 
  I wasn't explicitly setting the ack anywhere.
 
  Am I reading the code correctly that in SyncProducerConfig.scala the
  DefaultRequiredAcks is 0? Thus not waiting on the leader?
 
  Setting: props.put("request.required.acks", "1"); causes the writes to go
  back to the performance I was seeing before yesterday.
 
  Are you guys open to changing the default to be 1? The MongoDB
 Java-driver
  guys made a similar default change at the end of last year because many
  people didn't understand the risk that the default value of no-ack was
  putting them in until they had a node failure. So they default to 'safe'
  and let you decide what your risk level is vs. assuming you can lose
 data.
 
  Thanks,
 
  Chris
 
 
 
  On Tue, Mar 5, 2013 at 1:00 AM, Jun Rao jun...@gmail.com wrote:
 
   Chris,
  
   On the producer side, are you using ack=0? Earlier, ack=0 is the same
 as
   ack=1, which means that the producer has to wait for the message to be
   received by the leader. More recently, we did the actual implementation
  of
   ack=0, which means the producer doesn't wait for the message to reach
 the
   leader and therefore it is much faster.
  
   Thanks,
  
   Jun
  
   On Mon, Mar 4, 2013 at 12:01 PM, Chris Curtin curtin.ch...@gmail.com
   wrote:
  
Hi,
   
I'm definitely not complaining, but after upgrading to HEAD today my
producers are running much, much faster.
   
Don't have any measurements, but last release I was able to tab
 windows
   to
stop a Broker before I could generate 500 partitioned messages. Now
 it
completes before I can get the Broker shutdown!
   
Anything in particular you guys fixed?
   
(I did remove all the files on disk per the email thread last week
 and
reset the ZooKeeper meta, but that shouldn't matter right?)
   
Very impressive!
   
Thanks,
   
Chris
   
  
 



Re: Copy availability when broker goes down?

2013-03-04 Thread Chris Curtin
I'll grab HEAD in a few minutes and see if the changes.

Issues submitted:

https://issues.apache.org/jira/browse/KAFKA-783

https://issues.apache.org/jira/browse/KAFKA-782

Thanks,

Chris


On Mon, Mar 4, 2013 at 1:15 PM, Jun Rao jun...@gmail.com wrote:

 Chris,

 As Neha said, the 1st copy of a partition is the preferred replica and we
 try to spread them evenly across the brokers. When a broker is restarted,
 we don't automatically move the leader back to the preferred replica
 though. You will have to run a command line
 tool PreferredReplicaLeaderElectionCommand to balance the leaders again.

 Also, I recommend that you try the latest code in 0.8. A bunch of issues
 have been fixed since Jan. You will have to wipe out all your ZK and Kafka
 data first though.

 Thanks,

 Jun

 On Mon, Mar 4, 2013 at 8:32 AM, Chris Curtin curtin.ch...@gmail.com
 wrote:

  Hi,
 
  (Hmm, take 2. Apache's spam filter doesn't like the word to describe the
  copy of the data. 'R - E -P -L -I -C -A' so it blocked it from sending!
  Using 'copy' below to mean that concept)
 
  I’m running 0.8.0 with HEAD from end of January (not the merge you guys
 did
  last night).
 
  I’m testing how the producer responds to loss of brokers, what errors are
  produced etc. and noticed some strange things as I shutdown servers in my
  cluster.
 
  Setup:
  4 node cluster
  1 topic, 3 copies in the set
  10 partitions numbered 0-9
 
  State of the cluster is determined using TopicMetadataRequest.
 
  When I start with a full cluster (2nd column is the partition id, next is
  leader, then the copy set and ISR):
 
  Java: 0:vrd03.atlnp1 R:[  vrd03.atlnp1 vrd04.atlnp1 vrd01.atlnp1] I:[
  vrd03.atlnp1 vrd04.atlnp1 vrd01.atlnp1]
  Java: 1:vrd04.atlnp1 R:[  vrd04.atlnp1 vrd01.atlnp1 vrd02.atlnp1] I:[
  vrd04.atlnp1 vrd01.atlnp1 vrd02.atlnp1]
  Java: 2:vrd03.atlnp1 R:[  vrd01.atlnp1 vrd02.atlnp1 vrd03.atlnp1] I:[
  vrd03.atlnp1 vrd01.atlnp1 vrd02.atlnp1]
  Java: 3:vrd03.atlnp1 R:[  vrd02.atlnp1 vrd03.atlnp1 vrd04.atlnp1] I:[
  vrd03.atlnp1 vrd04.atlnp1 vrd02.atlnp1]
  Java: 4:vrd03.atlnp1 R:[  vrd03.atlnp1 vrd01.atlnp1 vrd02.atlnp1] I:[
  vrd03.atlnp1 vrd01.atlnp1 vrd02.atlnp1]
  Java: 5:vrd03.atlnp1 R:[  vrd04.atlnp1 vrd02.atlnp1 vrd03.atlnp1] I:[
  vrd03.atlnp1 vrd04.atlnp1 vrd02.atlnp1]
  Java: 6:vrd03.atlnp1 R:[  vrd01.atlnp1 vrd03.atlnp1 vrd04.atlnp1] I:[
  vrd03.atlnp1 vrd04.atlnp1 vrd01.atlnp1]
  Java: 7:vrd04.atlnp1 R:[  vrd02.atlnp1 vrd04.atlnp1 vrd01.atlnp1] I:[
  vrd04.atlnp1 vrd01.atlnp1 vrd02.atlnp1]
  Java: 8:vrd03.atlnp1 R:[  vrd03.atlnp1 vrd02.atlnp1 vrd04.atlnp1] I:[
  vrd03.atlnp1 vrd04.atlnp1 vrd02.atlnp1]
  Java: 9:vrd03.atlnp1 R:[  vrd04.atlnp1 vrd03.atlnp1 vrd01.atlnp1] I:[
  vrd03.atlnp1 vrd04.atlnp1 vrd01.atlnp1]
 
  When I stop vrd01, which isn’t leader on any:
 
  Java: 0:vrd03.atlnp1 R:[ ] I:[]
  Java: 1:vrd04.atlnp1 R:[ ] I:[]
  Java: 2:vrd03.atlnp1 R:[ ] I:[]
  Java: 3:vrd03.atlnp1 R:[  vrd02.atlnp1 vrd03.atlnp1 vrd04.atlnp1] I:[
  vrd03.atlnp1 vrd04.atlnp1 vrd02.atlnp1]
  Java: 4:vrd03.atlnp1 R:[ ] I:[]
  Java: 5:vrd03.atlnp1 R:[  vrd04.atlnp1 vrd02.atlnp1 vrd03.atlnp1] I:[
  vrd03.atlnp1 vrd04.atlnp1 vrd02.atlnp1]
  Java: 6:vrd03.atlnp1 R:[ ] I:[]
  Java: 7:vrd04.atlnp1 R:[ ] I:[]
  Java: 8:vrd03.atlnp1 R:[  vrd03.atlnp1 vrd02.atlnp1 vrd04.atlnp1] I:[
  vrd03.atlnp1 vrd04.atlnp1 vrd02.atlnp1]
  Java: 9:vrd03.atlnp1 R:[ ] I:[]
 
  Does this mean that none of the partitions that used to have a copy on
  vrd01 are updating ANY of the copies?
 
  I ran another test, again starting with a full cluster and all partitions
  had a full set of copies. When I stop the broker which was leader for 9
 of
  the 10 partitions, the leaders were all elected on one machine instead of
  the set of 3. Should the leaders have been better spread out? Also the
  copies weren’t fully populated either.
 
  Last test: started with a full cluster, showing all copies available.
  Stopped a broker that was not a leader for any partition. Noticed that
 the
  partitions where the stopped machine was in the copy set didn’t show any
  copies like above. Let the cluster sit for 30 minutes and didn’t see any
  new copies being brought on line. How should the cluster handle a machine
  that is down for an extended period of time?
 
  I don’t have a new machine I could add to the cluster, but what happens
  when I do? Will it not be used until a new topic is added or how does it
  become a valid option for a copy or eventually the leader?
 
  Thanks,
 
  Chris
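
For reference, a hedged sketch of how a dump like the one above can be produced
with a TopicMetadataRequest; the host, port and topic name are illustrative:

import java.util.Collections;

import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.TopicMetadataResponse;
import kafka.javaapi.consumer.SimpleConsumer;

public class ClusterStateDump {
    public static void main(String[] args) {
        SimpleConsumer consumer = new SimpleConsumer("vrd01.atlnp1", 9092, 100000, 64 * 1024, "stateDump");
        try {
            TopicMetadataRequest request =
                    new TopicMetadataRequest(Collections.singletonList("my_topic"));
            TopicMetadataResponse response = consumer.send(request);

            // Print partition id, leader, replica set and ISR, one partition per line.
            for (TopicMetadata topicMetadata : response.topicsMetadata()) {
                for (PartitionMetadata part : topicMetadata.partitionsMetadata()) {
                    System.out.println(part.partitionId()
                            + ":" + (part.leader() == null ? "none" : part.leader().host())
                            + " R:" + part.replicas()
                            + " I:" + part.isr());
                }
            }
        } finally {
            consumer.close();
        }
    }
}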
 



Re: Consumer questions: 0.8.0 vs. 0.7.2

2012-12-03 Thread Chris Curtin
Hi,

I was able to implement my own lookup code but have a few concerns about
this long term:
- the Broker class is marked as 'private' in the Scala code. IntelliJ gives
me an error about using it, but the runtime lets me use it and get the
host/port out.
- I have to know a lot about the structure of some internal classes for
this to work, so changes in the implementation would cause my logic to
break.

I did a quick JIRA search and didn't see a request for a java-api for
finding the primary; is that already on the roadmap, or should I submit
an enhancement request?

Thanks,

Chris


On Wed, Nov 28, 2012 at 2:08 PM, Neha Narkhede neha.narkh...@gmail.com wrote:

 snip

 Also, there are 2 ways to send the topic metadata request. One way is
 how SimpleConsumerShell.scala uses ClientUtils.fetchTopicMetadata().
 Another way is by using the send() API on SyncProducer.

 Thanks,
 Neha