Re: Horizontally Scaling Kafka Consumers

2015-04-29 Thread David Corley
If the 100 partitions are all for the same topic, you can have up to 100
consumers working as part of a single consumer group for that topic.
You cannot have more consumers than there are partitions within a given
consumer group.

On 29 April 2015 at 08:41, Nimi Wariboko Jr n...@channelmeter.com wrote:

 Hi,

 I was wondering what options there are for horizontally scaling kafka
 consumers? Basically if I have 100 partitions and 10 consumers, and want to
 temporarily scale up to 50 consumers, what options do I have?

 So far I've thought of just simply tracking consumer membership somehow
 (either through Raft or zookeeper's znodes) on the consumers.



Re: Horizontally Scaling Kafka Consumers

2015-04-29 Thread Stevo Slavić
Please correct me if I'm wrong, but I think it is really not a hard constraint
that one cannot have more consumers (from the same group) than partitions on a
single topic - the surplus consumers will not be assigned any partition to
consume, but they can be there, and as soon as one active consumer from the
same group goes offline (its connection to ZK is dropped), the consumers in the
group will be rebalanced so that one passively waiting consumer becomes active.

Kind regards,
Stevo Slavic.

On Wed, Apr 29, 2015 at 2:25 PM, David Corley davidcor...@gmail.com wrote:

 If the 100 partitions are all for the same topic, you can have up to 100
 consumers working as part of a single consumer group for that topic.
 You cannot have more consumers than there are partitions within a given
 consumer group.

 On 29 April 2015 at 08:41, Nimi Wariboko Jr n...@channelmeter.com wrote:

  Hi,
 
  I was wondering what options there are for horizontally scaling kafka
  consumers? Basically if I have 100 partitions and 10 consumers, and want
 to
  temporarily scale up to 50 consumers, what options do I have?
 
  So far I've thought of just simply tracking consumer membership somehow
  (either through Raft or zookeeper's znodes) on the consumers.
 



Re: Horizontally Scaling Kafka Consumers

2015-04-29 Thread David Corley
You're right Stevo, I should re-phrase to say that there can be no more
_active_ consumers than there are partitions (within a single consumer
group).
I'm guessing that's what Nimi is alluding to asking, but perhaps he can
elaborate on whether he's using consumer groups and/or whether the 100
partitions are all for a single topic, or multiple topics.

On 29 April 2015 at 13:38, Stevo Slavić ssla...@gmail.com wrote:

 Please correct me if wrong, but I think it is really not hard constraint
 that one cannot have more consumers (from same group) than partitions on
 single topic - all the surplus consumers will not be assigned to consume
 any partition, but they can be there and as soon as one active consumer
 from same group goes offline (its connection to ZK is dropped), consumers
 from the group will be rebalanced so one passively waiting consumer will
 become active.

 Kind regards,
 Stevo Slavic.

 On Wed, Apr 29, 2015 at 2:25 PM, David Corley davidcor...@gmail.com
 wrote:

  If the 100 partitions are all for the same topic, you can have up to 100
  consumers working as part of a single consumer group for that topic.
  You cannot have more consumers than there are partitions within a given
  consumer group.
 
  On 29 April 2015 at 08:41, Nimi Wariboko Jr n...@channelmeter.com
 wrote:
 
   Hi,
  
   I was wondering what options there are for horizontally scaling kafka
   consumers? Basically if I have 100 partitions and 10 consumers, and
 want
  to
   temporarily scale up to 50 consumers, what options do I have?
  
   So far I've thought of just simply tracking consumer membership somehow
   (either through Raft or zookeeper's znodes) on the consumers.
  
 



Horizontally Scaling Kafka Consumers

2015-04-29 Thread Nimi Wariboko Jr
Hi,

I was wondering what options there are for horizontally scaling kafka
consumers? Basically if I have 100 partitions and 10 consumers, and want to
temporarily scale up to 50 consumers, what options do I have?

So far I've thought of just simply tracking consumer membership somehow
(either through Raft or zookeeper's znodes) on the consumers.


Re: Why fetching meta-data for topic is done three times?

2015-04-29 Thread Madhukar Bharti
Hi Zakee,

message.send.max.retries is 1
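
For reference, this is roughly how those knobs look on the old (scala) producer
side - a sketch only, the broker list is a placeholder and the values are
illustrative:

import java.util.Properties;
import kafka.producer.ProducerConfig;

public class ProducerTimeoutConfig {
    static ProducerConfig build(String brokerList) {
        Properties props = new Properties();
        props.put("metadata.broker.list", brokerList);
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        // A single attempt, so an unreachable broker fails fast instead of
        // multiplying the per-request timeout by the retry count.
        props.put("message.send.max.retries", "1");
        // How long to back off before a retry (moot with a single attempt).
        props.put("retry.backoff.ms", "100");
        // Per-request timeout; with one attempt the worst case stays close to this.
        props.put("request.timeout.ms", "1000");
        return new ProducerConfig(props);
    }
}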

Regards,
Madhukar

On Tue, Apr 28, 2015 at 6:17 PM, Madhukar Bharti bhartimadhu...@gmail.com
wrote:

 Hi Zakee,

 Thanks for your reply.

 message.send.max.retries
 3

 retry.backoff.ms
 100

 topic.metadata.refresh.interval.ms
 600*1000

 This is my properties.

 Regards,
 Madhukar

 On Tue, Apr 28, 2015 at 3:26 AM, Zakee kzak...@netzero.net wrote:

 What values do you have for below properties? Or are these set to
 defaults?

 message.send.max.retries
 retry.backoff.ms
 topic.metadata.refresh.interval.ms

 Thanks
 Zakee



  On Apr 23, 2015, at 11:48 PM, Madhukar Bharti bhartimadhu...@gmail.com
 wrote:
 
  Hi All,
 
   After going through the code, I found that when the producer starts it does
   three things:
  
   1. Sends a metadata request
   2. Sends the message to the broker (fetching the broker list)
   3. If the number of messages still to be produced is greater than 0, tries
   again to refresh metadata for the outstanding produce requests.
 
   Each request takes the configured timeout before moving on to the next, and
   finally, once all are done, it throws an Exception (if step 3 also fails).
  
   The problem here is that if we set the timeout to 1 sec, it takes 3 sec to
   throw an exception, so the user request hangs for 3 sec, which is
   comparatively high for a response time; and if all threads are blocked in
   producer send, the whole application is blocked for 3 sec. So we want to
   reduce the overall time to throw the Exception to 1 sec.
  
   What is a possible way to do this?
 
  Thanks
  Madhukar
 
  On Thu, Apr 16, 2015 at 8:10 PM, Madhukar Bharti 
 bhartimadhu...@gmail.com
  wrote:
 
  Hi All,
 
   I came across a problem: if we use a broker IP which is not reachable or is
   out of the network, then it takes more than the configured time
   (request.timeout.ms). After checking the log I got to know that it is trying
   to fetch the topic metadata three times, changing the correlation id each
   time. Due to this, even though I keep request.timeout.ms=1000, it takes 3 sec
   to throw an Exception. I am using Kafka 0.8.1.1 with the patch
 
 https://issues.apache.org/jira/secure/attachment/12678547/kafka-1733-add-connectTimeoutMs.patch
 
 
   I have attached the log. Please check this and clarify why it is behaving
   like this - whether it is by design or whether I have to set some other
   property as well.
 
 
 
  Regards
  Madhukar
 
 
 
  






Re: zookeeper restart fatal error

2015-04-29 Thread David Corley
Unfortunately, it sounds like a ZooKeeper data corruption issue on the node in
question:
https://issues.apache.org/jira/browse/ZOOKEEPER-1546

The fix from the Jira is to clean out the Zookeeper data on the affected
node (if that's possible)

On 28 April 2015 at 16:59, Emley, Andrew andrew.em...@eon.com wrote:

 Hi

 I have had ZK and Kafka (2_8.0-0.8.1) set up and running nicely for a week or
 so. I decided to stop the ZK and Kafka brokers and restart them, but since
 stopping ZK I can't start it again! It gives me this fatal exception that
 is related to one of my test topics, multinode1partition4reptopic.

 Can anyone give me any pointers on how to resolve?

 Many thanks
 andy


 [2015-04-28 16:31:05,282] ERROR Failed to increment parent cversion for:
 /consumers/console-consumer-22432/offsets/multinode1partition4reptopic
 (org.apache.zookeeper.server.persistence.FileTxnSnapLog)
 org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode =
 NoNode for
 /consumers/console-consumer-22432/offsets/multinode1partition4reptopic
 at
 org.apache.zookeeper.server.DataTree.incrementCversion(DataTree.java:1218)
 at
 org.apache.zookeeper.server.persistence.FileTxnSnapLog.processTransaction(FileTxnSnapLog.java:222)
 at
 org.apache.zookeeper.server.persistence.FileTxnSnapLog.restore(FileTxnSnapLog.java:150)
 at
 org.apache.zookeeper.server.ZKDatabase.loadDataBase(ZKDatabase.java:222)
 at
 org.apache.zookeeper.server.ZooKeeperServer.loadData(ZooKeeperServer.java:239)
 at
 org.apache.zookeeper.server.ZooKeeperServer.startdata(ZooKeeperServer.java:366)
 at
 org.apache.zookeeper.server.NIOServerCnxn$Factory.startup(NIOServerCnxn.java:160)
 at
 org.apache.zookeeper.server.ZooKeeperServerMain.runFromConfig(ZooKeeperServerMain.java:110)
 at
 org.apache.zookeeper.server.ZooKeeperServerMain.initializeAndRun(ZooKeeperServerMain.java:85)
 at
 org.apache.zookeeper.server.ZooKeeperServerMain.main(ZooKeeperServerMain.java:51)
 at
 org.apache.zookeeper.server.quorum.QuorumPeerMain.initializeAndRun(QuorumPeerMain.java:108)
 at
 org.apache.zookeeper.server.quorum.QuorumPeerMain.main(QuorumPeerMain.java:76)
 [2015-04-28 16:31:05,289] FATAL Unexpected exception, exiting abnormally
 (org.apache.zookeeper.server.ZooKeeperServerMain)
 java.io.IOException: Failed to process transaction type: 2 error:
 KeeperErrorCode = NoNode for
 /consumers/console-consumer-22432/offsets/multinode1partition4reptopic
 at
 org.apache.zookeeper.server.persistence.FileTxnSnapLog.restore(FileTxnSnapLog.java:152)
 at
 org.apache.zookeeper.server.ZKDatabase.loadDataBase(ZKDatabase.java:222)
 at
 org.apache.zookeeper.server.ZooKeeperServer.loadData(ZooKeeperServer.java:239)
 at
 org.apache.zookeeper.server.ZooKeeperServer.startdata(ZooKeeperServer.java:366)
 at
 org.apache.zookeeper.server.NIOServerCnxn$Factory.startup(NIOServerCnxn.java:160)
 at
 org.apache.zookeeper.server.ZooKeeperServerMain.runFromConfig(ZooKeeperServerMain.java:110)
 at
 org.apache.zookeeper.server.ZooKeeperServerMain.initializeAndRun(ZooKeeperServerMain.java:85)
 at
 org.apache.zookeeper.server.ZooKeeperServerMain.main(ZooKeeperServerMain.java:51)
 at
 org.apache.zookeeper.server.quorum.QuorumPeerMain.initializeAndRun(QuorumPeerMain.java:108)
 at
 org.apache.zookeeper.server.quorum.QuorumPeerMain.main(QuorumPeerMain.java:76)







Re: Kafka client - 0.9

2015-04-29 Thread Gwen Shapira
In the current high-level consumer, you can still manually control when you
commit offsets (see this blog for details:
http://ingest.tips/2014/10/12/kafka-high-level-consumer-frequently-missing-pieces/
)

While you can't explicitly roll back a commit, you can simply avoid
committing when you have an exception (it's a tiny bit more complex than
that - because the iterator stores a buffer and a location in the buffer...
so you need to maintain your own collection of events to retry; this is
also explained in the blog above).
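
A rough sketch of that pattern against the current high-level consumer
(auto.commit.enable=false; process() and the retry buffer are illustrative):

import java.util.ArrayList;
import java.util.List;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class CommitOnSuccessLoop {
    // consumer must have been created with auto.commit.enable=false
    static void consume(ConsumerConnector consumer, KafkaStream<byte[], byte[]> stream) {
        List<byte[]> retryBuffer = new ArrayList<byte[]>(); // your own collection of events to retry
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext()) {
            byte[] payload = it.next().message();
            try {
                process(payload);             // application-specific processing
                consumer.commitOffsets();     // commit only after successful processing
            } catch (Exception e) {
                retryBuffer.add(payload);     // skip the commit and keep the event for a retry
            }
        }
    }

    static void process(byte[] payload) { /* application-specific */ }
}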

Hope this helps.

Gwen

On Wed, Apr 29, 2015 at 9:50 AM, Bharath Srinivasan bharath...@gmail.com
wrote:

 Any pointers on this feature?

 Thanks.

 On Thu, Apr 23, 2015 at 9:57 PM, Bharath Srinivasan bharath...@gmail.com
 wrote:

  Thanks Gwen.
 
  I'm specifically looking for the consumer rewrite API (
  org.apache.kafka.clients.consumer.KafkaConsumer). Based on the wiki, this
  feature is available only in 0.9.
 
   The specific use case is that I wanted to use the high-level consumer but
   with the ability to roll back the offset in case of any exceptions. Based on
   the documentation, the current high-level consumer API does not seem to
   support this, at least not in a straightforward fashion.
 
  Appreciate any alternate solutions.
 
  On Thu, Apr 23, 2015 at 8:08 PM, Gwen Shapira gshap...@cloudera.com
  wrote:
 
   We don't normally plan dates for releases; when we are done with the
   features we want in the next release and are happy with the quality, we
   release. Many Apache communities are like that.
 
  If you need firmer roadmaps and specific release dates, there are few
  vendors selling Kafka distributions and support.
 
  Are there any specific features you are waiting for?
 
  Gwen
 
  On Thu, Apr 23, 2015 at 2:25 PM, Bharath Srinivasan
  bharath...@gmail.com wrote:
   Hi,
  
   I'm looking for the 0.9 client release plan.
  
  
 
 https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design
  
   Is there a planned date for this release?
  
   Thanks,
   Bharath
 
 
 



Re: MultiThreaded HLConsumer Exits before events are all consumed

2015-04-29 Thread christopher palm
Commenting out the example shutdown did not seem to make a difference; I added
the print statement below to highlight the fact.

The other threads still shut down, and only one thread lives on; eventually
that one dies after a few minutes as well.

Could this be because the producer's default partitioner isn't balancing data
across all partitions?

Thanks,
Chris

Thread 0: 2015-04-29
12:55:54.292|3|13|Normal|-74.1892627|41.33900999753

Last Shutdown via example.shutDown called!

15/04/29 13:09:38 INFO consumer.ZookeeperConsumerConnector:,
ZKConsumerConnector shutting down

15/04/29 13:09:38 INFO utils.KafkaScheduler: Forcing shutdown of Kafka
scheduler

15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager:
[ConsumerFetcherManager-1430330968420] Stopping leader finder thread

15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager$LeaderFinderThread:
-leader-finder-thread], Shutting down

15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager$LeaderFinderThread:
-leader-finder-thread], Stopped

15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager$LeaderFinderThread:
-leader-finder-thread], Shutdown completed

15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager:
[ConsumerFetcherManager-1430330968420] Stopping all fetchers

15/04/29 13:09:38 INFO consumer.ConsumerFetcherThread:
[ConsumerFetcherThread-consumergroup], Shutting down

15/04/29 13:09:38 INFO consumer.ConsumerFetcherThread:
[ConsumerFetcherThread-], Stopped

15/04/29 13:09:38 INFO consumer.ConsumerFetcherThread:
[ConsumerFetcherThread-], Shutdown completed

15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager:
[ConsumerFetcherManager-] All connections stopped

15/04/29 13:09:38 INFO zkclient.ZkEventThread: Terminate ZkClient event
thread.

Shutting down Thread: 2

Shutting down Thread: 1

Shutting down Thread: 3

15/04/29 13:09:38 INFO consumer.ZookeeperConsumerConnector:
[consumergroup], ZKConsumerConnector shut down completed

Thread 0: 2015-04-29 12:55:55.302|4|14|Unsafe tail
distance|-73.99021500035|40.6636611

15/04/29 13:09:39 INFO consumer.ZookeeperConsumerConnector:
[consumergroup], stopping watcher executor thread for consumer consumergroup

Thread 0: 2015-04-29
12:55:56.313|1|11|Normal|-79.74165300042|42.1304580009

On Wed, Apr 29, 2015 at 10:11 AM, tao xiao xiaotao...@gmail.com wrote:

 example.shutdown(); in ConsumerGroupExample closes all consumer connections
 to Kafka. Remove this line and the consumer threads will run forever.

 On Wed, Apr 29, 2015 at 9:42 PM, christopher palm cpa...@gmail.com
 wrote:

  Hi All,
 
  I am trying to get a multi threaded HL consumer working against a 2
 broker
  Kafka cluster with a 4 partition 2 replica  topic.
 
  The consumer code is set to run with 4 threads, one for each partition.
 
  The producer code uses the default partitioner and loops indefinitely
  feeding events into the topic.(I excluded the while loop in the paste
  below)
 
   What I see is the threads eventually all exit, even though the producer
 is
  still sending events into the topic.
 
  My understanding is that the consumer thread per partition is the correct
  setup.
 
   Any ideas why this code doesn't continue to consume events as they are
  pushed to the topic?
 
  I suspect I am configuring something wrong here, but am not sure what.
 
  Thanks,
 
  Chris
 
 
   *Topic Configuration*
 
  Topic:kafkatopicRep4Part4 PartitionCount:4 ReplicationFactor:2 Configs:
 
  Topic: kafkatopicRep4Part4 Partition: 0 Leader: 1 Replicas: 2,1 Isr: 1,2
 
  Topic: kafkatopicRep4Part4 Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
 
  Topic: kafkatopicRep4Part4 Partition: 2 Leader: 1 Replicas: 2,1 Isr: 1,2
 
   Topic: kafkatopicRep4Part4 Partition: 3 Leader: 1 Replicas: 1,2 Isr: 1,2
 
  *Producer Code:*
 
   Properties props = new Properties();
 
  props.put(metadata.broker.list, args[0]);
 
  props.put(zk.connect, args[1]);
 
  props.put(serializer.class, kafka.serializer.StringEncoder);
 
  props.put(request.required.acks, 1);
 
  String TOPIC = args[2];
 
  ProducerConfig config = new ProducerConfig(props);
 
  ProducerString, String producer = new ProducerString, String(
  config);
 
  finalEvent = new Timestamp(new Date().getTime()) + |
 
  + truckIds[0] + | + driverIds[0] + | +
  events[random
  .nextInt(evtCnt)]
 
  + | + getLatLong(arrayroute17[i]);
 
  try {
 
  KeyedMessageString, String data = new
  KeyedMessageString, String(TOPIC, finalEvent);
 
  LOG.info(Sending Messge #:  + routeName[0] + :  + i
 +,
  msg: + finalEvent);
 
  producer.send(data);
 
  Thread.sleep(1000);
 
  } catch (Exception e) {
 
  e.printStackTrace();
 
  }
 
 
  *Consumer Code:*
 
  public class ConsumerTest implements Runnable {
 
 private KafkaStream m_stream;
 
 private int m_threadNumber;
 
 public 

Consuming keyed messages with null value

2015-04-29 Thread Warren Henning
I have an application producing Avro-encoded keyed messages (Martin
Kleppmann's new Bottled Water project).

It encodes a delete as a keyed message with an id as a key and a null
payload. I have log compaction turned on.

The Avro console consumer correctly displays this as null in my terminal,
but when I try to consume it using the high-level consumer in Java, the
message is never consumed. Subsequent non-null messages that were produced
after that null also aren't consumed.

Do I need to do something in order to have the iterator's hasNext() method
(my code is pretty much exactly what appears in
https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example )
treat a null value as a valid message to consume? Or am I
misunderstanding what's going on and need to do something
different?
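
(For reference, the defensive check I'd add in the iterator loop would look
roughly like the sketch below - this assumes the 0.8.x high-level consumer
with the default byte[] decoder, and handleDelete/handleUpsert are made-up
handlers; whether the tombstone ever reaches the iterator at all is exactly
what I'm unsure about.)

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.message.MessageAndMetadata;

public class TombstoneAwareLoop {
    static void consume(KafkaStream<byte[], byte[]> stream) {
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext()) {
            MessageAndMetadata<byte[], byte[]> record = it.next();
            byte[] value = record.message();       // null payload = delete/tombstone
            if (value == null) {
                handleDelete(record.key());        // made-up delete handler
            } else {
                handleUpsert(record.key(), value); // made-up upsert handler
            }
        }
    }

    static void handleDelete(byte[] key) { /* application-specific */ }
    static void handleUpsert(byte[] key, byte[] value) { /* application-specific */ }
}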

Thanks!


Horizontally Scaling Kafka Consumers

2015-04-29 Thread Nimi Wariboko Jr
Hi,

I was wondering what options there are/what other people are doing for
horizontally scaling kafka consumers? Basically if I have 100 partitions
and 10 consumers, and want to temporarily scale up to 50 consumers, what
can I do?

So far I've thought of just simply tracking consumer membership somehow
(either through zookeeper's ephemeral nodes or maybe using gossip) on the
consumers to achieve consensus on who consumes what. Another option would
be having a router, possibly using something like nsq (I understand that
they are similar pieces of software, but what we are going for is a
persistent, distributed, sharded queue, which is why I'm looking into
Kafka).
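
For the ZooKeeper route, the membership part can be as simple as each consumer
registering an ephemeral (sequential) znode under a shared path and everyone
watching the children - a rough sketch with the plain ZooKeeper client, where
the connect string and paths are made up and the parent path is assumed to
already exist:

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class MembershipSketch {
    public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper("localhost:2181", 6000, new Watcher() {
            public void process(WatchedEvent event) { /* connection state events */ }
        });
        // The ephemeral node disappears automatically if this consumer's session
        // dies, which is what lets the survivors notice and rebalance.
        zk.create("/my-app/members/consumer-", new byte[0],
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        // Watch the member list; recompute partition assignment whenever it changes.
        zk.getChildren("/my-app/members", new Watcher() {
            public void process(WatchedEvent event) { /* recompute who consumes what */ }
        });
        Thread.sleep(Long.MAX_VALUE); // keep the session alive for the demo
    }
}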


Re: Kafka 0.8.2 consumer offset checker throwing kafka.common.NotCoordinatorForConsumerException

2015-04-29 Thread Kartheek Karra
Update here: we resolved this by deleting the kafka-data directory on that
host (which had file inconsistencies from an 'fsck' run last week in the
kafka-data dir) and restarting Kafka. Note we also never reimaged this host
(that was another host, which we got confused over).

Thanks,
Kartheek

On Tue, Apr 28, 2015 at 6:36 PM, Kartheek Karra kka...@salesforce.com
wrote:

 We recently upgraded kafka in our production environment cluster of 5
 brokers from 0.8.0 to 0.8.2. Since then the consumerOffsetChecker script is
 unable to fetch offset due to
 kafka.common.NotCoordinatorForConsumerException.
 Note I'm able to run the 'consumerOffsetChecker' from an older version
 0.8.0 successfully without any exceptions against the same upgraded
 cluster. Also we haven't migrated to kafka for offsets and are still using
 the default zookeeper. Another piece of information here is that kafka is
 always picking the same host as coordinator for fetching offset and we had
 to reimage that host just after the upgrade. Haven't been able to reproduce
 this in our test environments yet.
 Any clue what might be wrong here ? Let me know if more details are needed
 anywhere.


 Thanks,
 Kartheek



Kafka offset using kafka topic - not consuming messages

2015-04-29 Thread Gomathivinayagam Muthuvinayagam
I am using Kafka 0.8.2 with Kafka-based storage for offsets.
Whenever I restart a consumer (high-level consumer API), it does not consume
the messages that were posted while the consumer was down.

I am using the following consumer properties

Properties props = new Properties();

props.put("zookeeper.connect", zooKeeper);

props.put("group.id", consumerName);

props.put("zookeeper.session.timeout.ms", "6000");

props.put("zookeeper.sync.time.ms", "200");

props.put("auto.commit.enable", "false");

props.put("offsets.storage", "kafka");

props.put("dual.commit.enabled", "false");

props.put("auto.offset.reset", "largest");


My offset manager is here:
https://gist.github.com/gomathi/0d63e29385017577ce3a. I am not sure why the
consumer is behaving weirdly. Please share any thoughts if you have any.



Thanks & Regards,


Re: Kafka offset using kafka topic - not consuming messages

2015-04-29 Thread Jiangjie Qin
OK, so you turned off automatic offset commits (auto.commit.enable=false), and
set auto.offset.reset to largest.

That means when you consume,
1. If you did not commit offsets manually, no offsets will be committed to
Kafka.
2. If you do not have an offset stored in Kafka, you will start from the
log end and ignore the existing messages in the topic.

Another thing you want to check is that are you using the group Id all the
time?

Jiangjie (Becket) Qin

On 4/29/15, 3:17 PM, Gomathivinayagam Muthuvinayagam
sankarm...@gmail.com wrote:

I am using Kafka 0.8.2 and I am using Kafka based storage for offset.
Whenever I restart a consumer (high level consumer api) it is not
consuming
messages whichever were posted when the consumer was down.

I am using the following consumer properties

Properties props = new Properties();

props.put(zookeeper.connect, zooKeeper);

props.put(group.id, consumerName);

props.put(zookeeper.session.timeout.ms, 6000);

props.put(zookeeper.sync.time.ms, 200);

props.put(auto.commit.enable, false);

props.put(offsets.storage, kafka);

props.put(dual.commit.enabled, false);

props.put(auto.offset.reset, largest);


My offset manager is here
https://gist.github.com/gomathi/0d63e29385017577ce3a. I am not sure why
the
consumer is behaving weird. Please share any updates if you have.



Thanks  Regards,



MultiThreaded HLConsumer Exits before events are all consumed

2015-04-29 Thread christopher palm
Hi All,

I am trying to get a multi-threaded HL consumer working against a 2-broker
Kafka cluster with a 4-partition, 2-replica topic.

The consumer code is set to run with 4 threads, one for each partition.

The producer code uses the default partitioner and loops indefinitely,
feeding events into the topic. (I excluded the while loop in the paste below.)

What I see is that the threads eventually all exit, even though the producer is
still sending events into the topic.

My understanding is that one consumer thread per partition is the correct
setup.

Any ideas why this code doesn't continue to consume events as they are
pushed to the topic?

I suspect I am configuring something wrong here, but am not sure what.

Thanks,

Chris


*Topic Configuration*

Topic:kafkatopicRep4Part4 PartitionCount:4 ReplicationFactor:2 Configs:

Topic: kafkatopicRep4Part4 Partition: 0 Leader: 1 Replicas: 2,1 Isr: 1,2

Topic: kafkatopicRep4Part4 Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2

Topic: kafkatopicRep4Part4 Partition: 2 Leader: 1 Replicas: 2,1 Isr: 1,2

 Topic: kafkatopicRep4Part4 Partition: 3 Leader: 1 Replicas: 1,2 Isr: 1,2

*Producer Code:*

 Properties props = new Properties();

props.put(metadata.broker.list, args[0]);

props.put(zk.connect, args[1]);

props.put(serializer.class, kafka.serializer.StringEncoder);

props.put(request.required.acks, 1);

String TOPIC = args[2];

ProducerConfig config = new ProducerConfig(props);

ProducerString, String producer = new ProducerString, String(
config);

finalEvent = new Timestamp(new Date().getTime()) + |

+ truckIds[0] + | + driverIds[0] + | + events[random
.nextInt(evtCnt)]

+ | + getLatLong(arrayroute17[i]);

try {

KeyedMessageString, String data = new
KeyedMessageString, String(TOPIC, finalEvent);

LOG.info(Sending Messge #:  + routeName[0] + :  + i +,
msg: + finalEvent);

producer.send(data);

Thread.sleep(1000);

} catch (Exception e) {

e.printStackTrace();

}


*Consumer Code:*

public class ConsumerTest implements Runnable {

    private KafkaStream m_stream;

    private int m_threadNumber;

    public ConsumerTest(KafkaStream a_stream, int a_threadNumber) {

        m_threadNumber = a_threadNumber;

        m_stream = a_stream;

    }

    public void run() {

        ConsumerIterator<byte[], byte[]> it = m_stream.iterator();

        while (it.hasNext()) {

            System.out.println("Thread " + m_threadNumber + ": "
                    + new String(it.next().message()));

            try {

                Thread.sleep(1000);

            } catch (InterruptedException e) {

                e.printStackTrace();

            }

        }

        System.out.println("Shutting down Thread: " + m_threadNumber);

    }

}

public class ConsumerGroupExample {

    private final ConsumerConnector consumer;

    private final String topic;

    private ExecutorService executor;

    public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic) {

        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
                createConsumerConfig(a_zookeeper, a_groupId));

        this.topic = a_topic;

    }

    public void shutdown() {

        if (consumer != null) consumer.shutdown();

        if (executor != null) executor.shutdown();

        try {

            if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {

                System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly");

            }

        } catch (InterruptedException e) {

            System.out.println("Interrupted during shutdown, exiting uncleanly");

        }

    }

    public void run(int a_numThreads) {

        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();

        topicCountMap.put(topic, new Integer(a_numThreads));

        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
                consumer.createMessageStreams(topicCountMap);

        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

        executor = Executors.newFixedThreadPool(a_numThreads);

        int threadNumber = 0;

        for (final KafkaStream stream : streams) {

            executor.submit(new ConsumerTest(stream, threadNumber));

            threadNumber++;

        }

    }

    private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {

        Properties props = new Properties();

        props.put("zookeeper.connect", a_zookeeper);

        props.put("group.id", a_groupId);

        props.put("zookeeper.session.timeout.ms", "400");

        props.put("zookeeper.sync.time.ms", "200");

        props.put("auto.commit.interval.ms", "1000");

        props.put("consumer.timeout.ms", "-1");

        return new ConsumerConfig(props);

    }

    public static void main(String[] args) {

        String zooKeeper = args[0];


Re: Kafka offset using kafka topic - not consuming messages

2015-04-29 Thread Gomathivinayagam Muthuvinayagam
Thank you, I am using the same groupId all the time.

I printed OffsetsMessageFormatter output for the consumer, and the output
is showing as

[async_force_consumers,force_msgs,9]::OffsetAndMetadata[2,associated
metadata,1430277791077]

But if I restart the consumer, it starts consuming messages from offset 1
for partition 9, even though I have stored the offset as 2. I am not sure
what I am missing here.



Thanks & Regards,



On Wed, Apr 29, 2015 at 5:17 PM, Jiangjie Qin j...@linkedin.com.invalid
wrote:

 OK, so you turned off auto.offset.commit, and set the auto.offset.reset to
 largest.

 That means when you consume,
 1. If you did not commit offsets manually, no offsets will be committed to
 Kafka.
 2. If you do not have an offset stored in Kafka, you will start from the
 log end and ignore the existing messages in the topic.

 Another thing you want to check is that are you using the group Id all the
 time?

 Jiangjie (Becket) Qin

 On 4/29/15, 3:17 PM, Gomathivinayagam Muthuvinayagam
 sankarm...@gmail.com wrote:

 I am using Kafka 0.8.2 and I am using Kafka based storage for offset.
 Whenever I restart a consumer (high level consumer api) it is not
 consuming
 messages whichever were posted when the consumer was down.
 
 I am using the following consumer properties
 
 Properties props = new Properties();
 
 props.put(zookeeper.connect, zooKeeper);
 
 props.put(group.id, consumerName);
 
 props.put(zookeeper.session.timeout.ms, 6000);
 
 props.put(zookeeper.sync.time.ms, 200);
 
 props.put(auto.commit.enable, false);
 
 props.put(offsets.storage, kafka);
 
 props.put(dual.commit.enabled, false);
 
 props.put(auto.offset.reset, largest);
 
 
 My offset manager is here
 https://gist.github.com/gomathi/0d63e29385017577ce3a. I am not sure why
 the
 consumer is behaving weird. Please share any updates if you have.
 
 
 
 Thanks  Regards,




Re: New Producer API - batched sync mode support

2015-04-29 Thread Gwen Shapira
I'm starting to think that the old adage "If two people say you are drunk,
lie down" applies here :)

The current API seems perfectly clear, useful and logical to everyone who wrote
it... but we are getting multiple users asking for the old batch behavior back.
One reason to bring it back is to make upgrades easier - people won't need to
rethink their existing logic if they get an API with the same behavior in
the new producer. The other reason is what Ewen mentioned earlier - if
everyone is going to re-implement Joel's logic anyway, we may as well provide
something for it.

How about getting the old batch send behavior back by adding a new API with:
public void batchSend(List<ProducerRecord<K,V>>)

With this implementation (mixes the old behavior with Joel's snippet):
* send records one by one
* flush
* iterate on futures and get them
* log a detailed message on each error
* throw an exception if any send failed.

It reproduces the old behavior - which apparently everyone really liked -
and I don't think it is overly weird. It is very limited, but anyone who
needs more control over their sends already has plenty of options.
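
Something like this, as a sketch of the proposed behavior on top of the new
producer (it assumes the flush() call discussed in this thread; the class and
method names are illustrative):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class BatchSendSketch {
    // Throws if any record in the batch failed, mirroring the old sync batch send.
    public static <K, V> void batchSend(Producer<K, V> producer, List<ProducerRecord<K, V>> records)
            throws ExecutionException, InterruptedException {
        List<Future<RecordMetadata>> futures = new ArrayList<Future<RecordMetadata>>();
        for (ProducerRecord<K, V> record : records) {
            futures.add(producer.send(record));       // send records one by one
        }
        producer.flush();                             // block until all outstanding sends complete
        ExecutionException firstFailure = null;
        for (Future<RecordMetadata> future : futures) {
            try {
                future.get();                         // surfaces the per-record error, if any
            } catch (ExecutionException e) {
                System.err.println("send failed: " + e.getCause()); // detailed message per error
                if (firstFailure == null) firstFailure = e;
            }
        }
        if (firstFailure != null) throw firstFailure; // fail the batch if any send failed
    }
}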

Thoughts?

Gwen




On Tue, Apr 28, 2015 at 5:29 PM, Jay Kreps jay.kr...@gmail.com wrote:

 Hey guys,

 The locking argument is correct for very small records (< 50 bytes);
 batching will help here because for small records locking becomes the big
 bottleneck. I think these use cases are rare but not unreasonable.

 Overall I'd emphasize that the new producer is way faster at virtually all
 use cases. If there is a use case where that isn't true, let's look at it
 in a data driven way by comparing the old producer to the new producer and
 looking for any areas where things got worse.

 I suspect the "reducing allocations" argument is not a big thing. We do
 a number of small per-message allocations and it didn't seem to have much
 impact. I do think there are a couple of big producer memory optimizations
 we could do by reusing the arrays in the accumulator in the serialization
 of the request but I don't think this is one of them.

 I'd be skeptical of any api that was too weird--i.e. introduces a new way
 of partitioning, gives back errors on a per-partition rather than per
 message basis (given that partitioning is transparent this is really hard
 to think about), etc. Bad apis end up causing a ton of churn and just don't
 end up being a good long term commitment as we change how the underlying
 code works over time (i.e. we hyper optimize for something then have to
 maintain some super weird api as it becomes hyper unoptimized for the
 client over time).

 Roshan--Flush works as you would hope, it blocks on the completion of all
 outstanding requests. Calling get on the future for the request gives you
 the associated error code back. Flush doesn't throw any exceptions because
 waiting for requests to complete doesn't error, the individual requests
 fail or succeed which is always reported with each request.

 Ivan--The batches you send in the scala producer today actually aren't
 truly atomic, they just get sent in a single request.

 One tricky problem to solve when user's do batching is size limits on
 requests. This can be very hard to manage since predicting the serialized
 size of a bunch of java objects is not always obvious. This was repeatedly
 a problem before.

 -Jay

 On Tue, Apr 28, 2015 at 4:51 PM, Ivan Balashov ibalas...@gmail.com
 wrote:

   I must agree with @Roshan – it's hard to imagine anything more intuitive
   and easy to use for atomic batching than the old sync batch API. Also, it's
   fast.
  Coupled with a separate instance of producer per
  broker:port:topic:partition it works very well. I would be glad if it
 finds
  its way into new producer api.
 
   On a side-side-side note, could anyone confirm/deny whether SimpleConsumer's
   fetchSize must be set to at least the batch size in bytes (before or after
   compression), otherwise the client risks not getting any messages?
 



Re: Unclaimed partitions

2015-04-29 Thread Dave Hamilton
Hi, would anyone be able to help me with this issue? Thanks.

- Dave



On Tue, Apr 28, 2015 at 1:32 PM -0700, Dave Hamilton 
dhamil...@nanigans.com wrote:

1. We’re using version 0.8.1.1.
2. No failures in the consumer logs
3. We’re using the ConsumerOffsetChecker to see what partitions are assigned to 
the consumer group and what their offsets are. 8 of the 12 processes have each 
been assigned two partitions and they’re keeping up with the topic. The other 4 
do not get assigned partitions and no consumers in the group are consuming 
those 8 partitions.

Thanks for your help,
Dave



On 4/28/15, 1:40 PM, Aditya Auradkar aaurad...@linkedin.com.INVALID wrote:

Couple of questions:
- What version of the consumer API are you using?
- Are you seeing any rebalance failures in the consumer logs?
- How do you determine that some partitions are unassigned? Just confirming 
that you have partitions that are not being consumed from as opposed to 
consumer threads that aren't assigned any partitions.

Aditya


From: Dave Hamilton [dhamil...@nanigans.com]
Sent: Tuesday, April 28, 2015 10:19 AM
To: users@kafka.apache.org
Subject: Re: Unclaimed partitions

I’m sorry, I forgot to specify that these processes are in the same consumer 
group.

Thanks,
Dave





On 4/28/15, 1:15 PM, Aditya Auradkar aaurad...@linkedin.com.INVALID wrote:

Hi Dave,

The simple consumer doesn't do any state management across consumer 
instances. So I'm not sure how you are assigning partitions in your 
application code. Did you mean to say that you are using the high level 
consumer API?

Thanks,
Aditya


From: Dave Hamilton [dhamil...@nanigans.com]
Sent: Tuesday, April 28, 2015 7:58 AM
To: users@kafka.apache.org
Subject: Unclaimed partitions

Hi, I am trying to consume a 24-partition topic across 12 processes. Each 
process is using the simple consumer API, and each is being assigned two 
consumer threads. I have noticed when starting these processes that sometimes 
some of my processes are not being assigned any partitions, and no rebalance 
seems to ever be triggered, leaving some of the partitions unclaimed.

When I first tried deploying this yesterday, I noticed 8 of the 24 
partitions, for 4 of the consumer processes, went unclaimed. Redeploying 
shortly later corrected this issue. I tried deploying again today, and now I 
see a different set of 4 processes not getting assigned partitions. The 
processes otherwise appear to be running normally, they are currently running 
in production and we are working to get the consumers quietly running before 
enabling them to do any work. I’m not sure if we might be looking at some 
sort of timing issue.

Does anyone know what might be causing the issues we’re observing?

Thanks,
Dave


Re: Kafka 0.8.2 beta - release

2015-04-29 Thread Ewen Cheslack-Postava
It has already been released, including a minor revision to fix some
critical bugs. The latest release is 0.8.2.1. The downloads page has links
and release notes: http://kafka.apache.org/downloads.html

On Wed, Apr 29, 2015 at 10:22 PM, Gomathivinayagam Muthuvinayagam 
sankarm...@gmail.com wrote:

 I see lot of interesting features with Kafka 0.8.2 beta. I am just
 wondering when that will be released. Is there any timeline for that?

 Thanks  Regards,




-- 
Thanks,
Ewen


Re: Horizontally Scaling Kafka Consumers

2015-04-29 Thread Joe Stein
You can do this with the existing Kafka Consumer
https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/consumer/SimpleConsumer.scala#L106
and probably any other Kafka client too (maybe with minor/major rework
to do the offset management).

The new consumer approach is more transparent on Subscribing To Specific
Partitions
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L200-L234
.

Here is a Docker file (** pull request pending **) for wrapping kafka
consumers (doesn't have to be the go client, need to abstract that out some
more after more testing)
https://github.com/stealthly/go_kafka_client/blob/mesos-marathon/consumers/Dockerfile


Also a VM (** pull request pending **) to build container, push to local
docker repository and launch on Apache Mesos
https://github.com/stealthly/go_kafka_client/tree/mesos-marathon/mesos/vagrant
as a working example of how to do it.

All of this could be done without the Docker container and still work on
Mesos ... or even without Mesos and on YARN.

You might also want to check out how Samza integrates with Execution
Frameworks
http://samza.apache.org/learn/documentation/0.9/comparisons/introduction.html
which has a Mesos patch https://issues.apache.org/jira/browse/SAMZA-375 and
built in YARN support.

~ Joe Stein
- - - - - - - - - - - - - - - - -

  http://www.stealth.ly
- - - - - - - - - - - - - - - - -

On Wed, Apr 29, 2015 at 8:56 AM, David Corley davidcor...@gmail.com wrote:

 You're right Stevo, I should re-phrase to say that there can be no more
 _active_ consumers than there are partitions (within a single consumer
 group).
 I'm guessing that's what Nimi is alluding to asking, but perhaps he can
 elaborate on whether he's using consumer groups and/or whether the 100
 partitions are all for a single topic, or multiple topics.

 On 29 April 2015 at 13:38, Stevo Slavić ssla...@gmail.com wrote:

  Please correct me if wrong, but I think it is really not hard constraint
  that one cannot have more consumers (from same group) than partitions on
  single topic - all the surplus consumers will not be assigned to consume
  any partition, but they can be there and as soon as one active consumer
  from same group goes offline (its connection to ZK is dropped), consumers
  from the group will be rebalanced so one passively waiting consumer will
  become active.
 
  Kind regards,
  Stevo Slavic.
 
  On Wed, Apr 29, 2015 at 2:25 PM, David Corley davidcor...@gmail.com
  wrote:
 
   If the 100 partitions are all for the same topic, you can have up to
 100
   consumers working as part of a single consumer group for that topic.
   You cannot have more consumers than there are partitions within a given
   consumer group.
  
   On 29 April 2015 at 08:41, Nimi Wariboko Jr n...@channelmeter.com
  wrote:
  
Hi,
   
I was wondering what options there are for horizontally scaling kafka
consumers? Basically if I have 100 partitions and 10 consumers, and
  want
   to
temporarily scale up to 50 consumers, what options do I have?
   
So far I've thought of just simply tracking consumer membership
 somehow
(either through Raft or zookeeper's znodes) on the consumers.
   
  
 



Kafka 0.8.2 beta - release

2015-04-29 Thread Gomathivinayagam Muthuvinayagam
I see a lot of interesting features in the Kafka 0.8.2 beta. I am just
wondering when that will be released. Is there any timeline for that?

Thanks & Regards,


RE: Unclaimed partitions

2015-04-29 Thread Aditya Auradkar
Hey Dave,

It's hard to say why this is happening without more information. Even if there 
are no errors in the log, is there anything to indicate that the rebalance 
process on those hosts even started? Does this happen occasionally or every 
time you start the consumer group? Can you paste the output of 
ConsumerOffsetChecker and describe topic?

Thanks,
Aditya

From: Dave Hamilton [dhamil...@nanigans.com]
Sent: Wednesday, April 29, 2015 6:46 PM
To: users@kafka.apache.org; users@kafka.apache.org
Subject: Re: Unclaimed partitions

Hi, would anyone be able to help me with this issue? Thanks.

- Dave



On Tue, Apr 28, 2015 at 1:32 PM -0700, Dave Hamilton 
dhamil...@nanigans.com wrote:

1. We’re using version 0.8.1.1.
2. No failures in the consumer logs
3. We’re using the ConsumerOffsetChecker to see what partitions are assigned to 
the consumer group and what their offsets are. 8 of the 12 process each have 
been assigned two partitions and they’re keeping up with the topic. The other 4 
do not get assigned partitions and no consumers in the group are consuming 
those 8 partitions.

Thanks for your help,
Dave



On 4/28/15, 1:40 PM, Aditya Auradkar aaurad...@linkedin.com.INVALID wrote:

Couple of questions:
- What version of the consumer API are you using?
- Are you seeing any rebalance failures in the consumer logs?
- How do you determine that some partitions are unassigned? Just confirming 
that you have partitions that are not being consumed from as opposed to 
consumer threads that aren't assigned any partitions.

Aditya


From: Dave Hamilton [dhamil...@nanigans.com]
Sent: Tuesday, April 28, 2015 10:19 AM
To: users@kafka.apache.org
Subject: Re: Unclaimed partitions

I’m sorry, I forgot to specify that these processes are in the same consumer 
group.

Thanks,
Dave





On 4/28/15, 1:15 PM, Aditya Auradkar aaurad...@linkedin.com.INVALID wrote:

Hi Dave,

The simple consumer doesn't do any state management across consumer 
instances. So I'm not sure how you are assigning partitions in your 
application code. Did you mean to say that you are using the high level 
consumer API?

Thanks,
Aditya


From: Dave Hamilton [dhamil...@nanigans.com]
Sent: Tuesday, April 28, 2015 7:58 AM
To: users@kafka.apache.org
Subject: Unclaimed partitions

Hi, I am trying to consume a 24-partition topic across 12 processes. Each 
process is using the simple consumer API, and each is being assigned two 
consumer threads. I have noticed when starting these processes that sometimes 
some of my processes are not being assigned any partitions, and no rebalance 
seems to ever be triggered, leaving some of the partitions unclaimed.

When I first tried deploying this yesterday, I noticed 8 of the 24 
partitions, for 4 of the consumer processes, went unclaimed. Redeploying 
shortly later corrected this issue. I tried deploying again today, and now I 
see a different set of 4 processes not getting assigned partitions. The 
processes otherwise appear to be running normally, they are currently running 
in production and we are working to get the consumers quietly running before 
enabling them to do any work. I’m not sure if we might be looking at some 
sort of timing issue.

Does anyone know what might be causing the issues we’re observing?

Thanks,
Dave


Re: MultiThreaded HLConsumer Exits before events are all consumed

2015-04-29 Thread tao xiao
The log suggests that the shutdown method was still called:

Thread 0: 2015-04-29
12:55:54.292|3|13|Normal|-74.1892627|41.33900999753

Last Shutdown via example.shutDown called!

15/04/29 13:09:38 INFO consumer.ZookeeperConsumerConnector:,
ZKConsumerConnector shutting down

Please ensure that consumer.shutdown() and executor.shutdown() are not called
during the course of your program.
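
If the intent is to shut down cleanly only when the process exits, one option
is to move that call into a JVM shutdown hook instead of the main flow - a
small sketch reusing the ConsumerGroupExample class from the code quoted
below (the argument order here is just an assumption):

public static void main(String[] args) {
    String zooKeeper = args[0];
    String groupId = args[1];
    String topic = args[2];
    int threads = Integer.parseInt(args[3]);

    final ConsumerGroupExample example = new ConsumerGroupExample(zooKeeper, groupId, topic);
    // Only close the consumer connections when the JVM actually exits.
    Runtime.getRuntime().addShutdownHook(new Thread() {
        public void run() {
            example.shutdown();
        }
    });
    example.run(threads);   // consumer threads keep running until the process is stopped
}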

On Thu, Apr 30, 2015 at 2:23 AM, christopher palm cpa...@gmail.com wrote:

 Commenting out Example shutdown did not seem to make a difference, I added
 the print statement below to highlight the fact.

 The other threads still shut down, and only one thread lives on, eventually
 that dies after a few minutes as well

 Could this be that the producer default partitioner is isn't balancing data
 across all partitions?

 Thanks,
 Chris

 Thread 0: 2015-04-29
 12:55:54.292|3|13|Normal|-74.1892627|41.33900999753

 Last Shutdown via example.shutDown called!

 15/04/29 13:09:38 INFO consumer.ZookeeperConsumerConnector:,
 ZKConsumerConnector shutting down

 15/04/29 13:09:38 INFO utils.KafkaScheduler: Forcing shutdown of Kafka
 scheduler

 15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager:
 [ConsumerFetcherManager-1430330968420] Stopping leader finder thread

 15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager$LeaderFinderThread:
 -leader-finder-thread], Shutting down

 15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager$LeaderFinderThread:
 -leader-finder-thread], Stopped

 15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager$LeaderFinderThread:
 -leader-finder-thread], Shutdown completed

 15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager:
 [ConsumerFetcherManager-1430330968420] Stopping all fetchers

 15/04/29 13:09:38 INFO consumer.ConsumerFetcherThread:
 [ConsumerFetcherThread-consumergroup], Shutting down

 15/04/29 13:09:38 INFO consumer.ConsumerFetcherThread:
 [ConsumerFetcherThread-], Stopped

 15/04/29 13:09:38 INFO consumer.ConsumerFetcherThread:
 [ConsumerFetcherThread-], Shutdown completed

 15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager:
 [ConsumerFetcherManager-] All connections stopped

 15/04/29 13:09:38 INFO zkclient.ZkEventThread: Terminate ZkClient event
 thread.

 Shutting down Thread: 2

 Shutting down Thread: 1

 Shutting down Thread: 3

 15/04/29 13:09:38 INFO consumer.ZookeeperConsumerConnector:
 [consumergroup], ZKConsumerConnector shut down completed

 Thread 0: 2015-04-29 12:55:55.302|4|14|Unsafe tail
 distance|-73.99021500035|40.6636611

 15/04/29 13:09:39 INFO consumer.ZookeeperConsumerConnector:
 [consumergroup], stopping watcher executor thread for consumer
 consumergroup

 Thread 0: 2015-04-29
 12:55:56.313|1|11|Normal|-79.74165300042|42.1304580009

 On Wed, Apr 29, 2015 at 10:11 AM, tao xiao xiaotao...@gmail.com wrote:

  example.shutdown(); in ConsumerGroupExample closes all consumer
 connections
  to Kafka. remove this line the consumer threads will run forever
 
  On Wed, Apr 29, 2015 at 9:42 PM, christopher palm cpa...@gmail.com
  wrote:
 
   Hi All,
  
   I am trying to get a multi threaded HL consumer working against a 2
  broker
   Kafka cluster with a 4 partition 2 replica  topic.
  
   The consumer code is set to run with 4 threads, one for each partition.
  
   The producer code uses the default partitioner and loops indefinitely
   feeding events into the topic.(I excluded the while loop in the paste
   below)
  
   What I see is the threads eventually all exit, even thought the
 producer
  is
   still sending events into the topic.
  
   My understanding is that the consumer thread per partition is the
 correct
   setup.
  
   Any ideas why this code doesn't continue to consume events at they are
   pushed to the topic?
  
   I suspect I am configuring something wrong here, but am not sure what.
  
   Thanks,
  
   Chris
  
  
   *T**opic Configuration*
  
   Topic:kafkatopicRep4Part4 PartitionCount:4 ReplicationFactor:2 Configs:
  
   Topic: kafkatopicRep4Part4 Partition: 0 Leader: 1 Replicas: 2,1 Isr:
 1,2
  
   Topic: kafkatopicRep4Part4 Partition: 1 Leader: 1 Replicas: 1,2 Isr:
 1,2
  
   Topic: kafkatopicRep4Part4 Partition: 2 Leader: 1 Replicas: 2,1 Isr:
 1,2
  
Topic: kafkatopicRep4Part4 Partition: 3 Leader: 1 Replicas: 1,2 Isr:
 1,2
  
   *Producer Code:*
  
Properties props = new Properties();
  
   props.put(metadata.broker.list, args[0]);
  
   props.put(zk.connect, args[1]);
  
   props.put(serializer.class,
 kafka.serializer.StringEncoder);
  
   props.put(request.required.acks, 1);
  
   String TOPIC = args[2];
  
   ProducerConfig config = new ProducerConfig(props);
  
   ProducerString, String producer = new ProducerString,
 String(
   config);
  
   finalEvent = new Timestamp(new Date().getTime()) + |
  
   + truckIds[0] + | + driverIds[0] + | +
   events[random
   .nextInt(evtCnt)]
  
   + 

Re: MultiThreaded HLConsumer Exits before events are all consumed

2015-04-29 Thread tao xiao
example.shutdown(); in ConsumerGroupExample closes all consumer connections
to Kafka. Remove this line and the consumer threads will run forever.

On Wed, Apr 29, 2015 at 9:42 PM, christopher palm cpa...@gmail.com wrote:

 Hi All,

 I am trying to get a multi threaded HL consumer working against a 2 broker
 Kafka cluster with a 4 partition 2 replica  topic.

 The consumer code is set to run with 4 threads, one for each partition.

 The producer code uses the default partitioner and loops indefinitely
 feeding events into the topic.(I excluded the while loop in the paste
 below)

 What I see is the threads eventually all exit, even thought the producer is
 still sending events into the topic.

 My understanding is that the consumer thread per partition is the correct
 setup.

 Any ideas why this code doesn't continue to consume events at they are
 pushed to the topic?

 I suspect I am configuring something wrong here, but am not sure what.

 Thanks,

 Chris


 *T**opic Configuration*

 Topic:kafkatopicRep4Part4 PartitionCount:4 ReplicationFactor:2 Configs:

 Topic: kafkatopicRep4Part4 Partition: 0 Leader: 1 Replicas: 2,1 Isr: 1,2

 Topic: kafkatopicRep4Part4 Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2

 Topic: kafkatopicRep4Part4 Partition: 2 Leader: 1 Replicas: 2,1 Isr: 1,2

  Topic: kafkatopicRep4Part4 Partition: 3 Leader: 1 Replicas: 1,2 Isr: 1,2

 *Producer Code:*

  Properties props = new Properties();

 props.put(metadata.broker.list, args[0]);

 props.put(zk.connect, args[1]);

 props.put(serializer.class, kafka.serializer.StringEncoder);

 props.put(request.required.acks, 1);

 String TOPIC = args[2];

 ProducerConfig config = new ProducerConfig(props);

 ProducerString, String producer = new ProducerString, String(
 config);

 finalEvent = new Timestamp(new Date().getTime()) + |

 + truckIds[0] + | + driverIds[0] + | +
 events[random
 .nextInt(evtCnt)]

 + | + getLatLong(arrayroute17[i]);

 try {

 KeyedMessageString, String data = new
 KeyedMessageString, String(TOPIC, finalEvent);

 LOG.info(Sending Messge #:  + routeName[0] + :  + i +,
 msg: + finalEvent);

 producer.send(data);

 Thread.sleep(1000);

 } catch (Exception e) {

 e.printStackTrace();

 }


 *Consumer Code:*

 public class ConsumerTest implements Runnable {

private KafkaStream m_stream;

private int m_threadNumber;

public ConsumerTest(KafkaStream a_stream, int a_threadNumber) {

m_threadNumber = a_threadNumber;

m_stream = a_stream;

}

public void run() {

ConsumerIteratorbyte[], byte[] it = m_stream.iterator();

while (it.hasNext()){

System.out.println(Thread  + m_threadNumber + :  + new
 String(it.next().message()));

try {

  Thread.sleep(1000);

 }catch (InterruptedException e) {

  e.printStackTrace();

  }

}

System.out.println(Shutting down Thread:  + m_threadNumber);

}

 }

 public class ConsumerGroupExample {

 private final ConsumerConnector consumer;

 private final String topic;

 private  ExecutorService executor;



 public ConsumerGroupExample(String a_zookeeper, String a_groupId,
 String a_topic) {

 consumer = kafka.consumer.Consumer.createJavaConsumerConnector(

 createConsumerConfig(a_zookeeper, a_groupId));

 this.topic = a_topic;

 }



 public void shutdown() {

 if (consumer != null) consumer.shutdown();

 if (executor != null) executor.shutdown();

 try {

 if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {

 System.out.println(Timed out waiting for consumer threads
 to shut down, exiting uncleanly);

 }

 } catch (InterruptedException e) {

 System.out.println(Interrupted during shutdown, exiting
 uncleanly);

 }

}



 public void run(int a_numThreads) {

 MapString, Integer topicCountMap = new HashMapString,
 Integer();

 topicCountMap.put(topic, new Integer(a_numThreads));

 MapString, ListKafkaStreambyte[], byte[] consumerMap =
 consumer.createMessageStreams(topicCountMap);

 ListKafkaStreambyte[], byte[] streams = consumerMap.get(topic);

 executor = Executors.newFixedThreadPool(a_numThreads);

 int threadNumber = 0;

 for (final KafkaStream stream : streams) {

 executor.submit(new ConsumerTest(stream, threadNumber));

 threadNumber++;

 }

 }



 private static ConsumerConfig createConsumerConfig(String a_zookeeper,
 String a_groupId) {

 Properties props = new Properties();

 props.put(zookeeper.connect, a_zookeeper);

 props.put(group.id, a_groupId);