I have to pass in a Map from topic -> # of streams, according to the
scaladoc at http://incubator.apache.org/kafka/api-docs/0.6/.
Is this the same as the # of partitions, or something different? For example, let's
say that I have 10 partitions for a topic. What partitions will the
following code fetch from?
val consumerConnector = Consumer.create(new ConsumerConfig(props))
val topicMessageStreams =
  consumerConnector.createMessageStreams(Predef.Map(topic -> 1))
val kafkaStream = topicMessageStreams(topic)(0)

// ... then, inside the consuming thread:
override def run = try {
  for (message <- kafkaStream) {
    // process message
  }
}
Is there any way I can get the code above to read from all 10 partitions?
Or do I really have to create 10 separate threads for reading from 10
partitions at once?
Does each KafkaMessageStream need its own thread, or can they be shared
somehow?
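For concreteness, here is roughly the multi-stream, thread-per-stream version I imagine I would have to write instead (numStreams, the Executors pool, and processMessage are just my placeholder names; props and topic are as above). Is this the intended usage?

import java.util.concurrent.Executors
import kafka.consumer.{Consumer, ConsumerConfig}

val numStreams = 10  // does this have to equal the # of partitions?
val consumerConnector = Consumer.create(new ConsumerConfig(props))
val streams = consumerConnector.createMessageStreams(Predef.Map(topic -> numStreams))(topic)
val pool = Executors.newFixedThreadPool(numStreams)
for (stream <- streams) {
  pool.submit(new Runnable {
    // each stream gets a dedicated thread that just iterates over it
    override def run = for (message <- stream) processMessage(message)
  })
}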
Let's say I want to be able to attach additional consumers dynamically to
read from the same topic, such that the consumers get the messages
round-robin.
It seems we have the following constraints:
- The # of partitions must be >= the max # of consumers I would want to
attach, since partitions cannot be divided amongst multiple consumers
- Each consumer must be able to consume up to ceil(# partitions / minimum #
of consumers) partitions or streams (I am still confused about which). If the
consumers don't grab all of the streams/partitions, then some partitions will
not be allocated and no messages will be read from them (see the quick
arithmetic sketch after this list).
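To make that second point concrete, here is the back-of-the-envelope arithmetic I am assuming (my own numbers, not from the docs):

// Assuming partitions are spread as evenly as possible across consumers:
val numPartitions = 10
val minConsumers  = 3
val maxPerConsumer = math.ceil(numPartitions.toDouble / minConsumers).toInt  // = 4
// So each consumer would need to ask for up to 4 streams (topic -> 4),
// otherwise some partitions would be left unconsumed. Is that right?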
Thanks for clarifying my confusion.
--
Evan Chan
Senior Software Engineer
[email protected] | (650) 996-4600
www.ooyala.com | blog: http://www.ooyala.com/blog | @ooyala: http://www.twitter.com/ooyala