I got a consumer using high-level API based on the example from here: 
https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+ExampleWorks 
fine but after an hour or so of inactivity it stops responding to new messages. 
All I see the log is:

INFO  [2016-02-23 17:06:10,070] org.apache.zookeeper.ClientCnxn: Client session 
timed out, have not heard from server in 4002ms for sessionid 
0x152b35436a9003d, closing socket connection and attempting reconnectINFO  
[2016-02-23 17:06:10,173] org.I0Itec.zkclient.ZkClient: zookeeper state changed 
(Disconnected)INFO  [2016-02-23 17:06:11,223] org.apache.zookeeper.ClientCnxn: 
Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt 
to authenticate using SASL (unknown error)INFO  [2016-02-23 17:06:11,225] 
org.apache.zookeeper.ClientCnxn: Socket connection established to 
localhost/127.0.0.1:2181, initiating sessionINFO  [2016-02-23 17:06:11,231] 
org.apache.zookeeper.ClientCnxn: Session establishment complete on server 
localhost/127.0.0.1:2181, sessionid = 0x152b35436a9003d, negotiated timeout = 
6000 INFO  [2016-02-23 17:06:11,231] org.I0Itec.zkclient.ZkClient: zookeeper 
state changed (SyncConnected)




Init code: 
 Properties props = new Properties(); 
 props.put("zookeeper.connect", "localhost:2181"); 
props.put("zookeeper.session.timeout.ms", "400"); 
props.put("zookeeper.sync.time.ms", "200");
 props.put("group.id", "ONEGROUP"); props.put("auto.commit.interval.ms", 
"1000"); props.put("fetch.message.max.bytes",   "200000000"); 
props.put("max.partition.fetch.bytes", "200000000");  ConsumerConfig cc = new 
ConsumerConfig(props);
ConsumerConnector consumer = 
kafka.consumer.Consumer.createJavaConsumerConnector(cc);
Map<String, Integer> topicCountMap = new HashMap<String, 
Integer>();topicCountMap.put("LISTENER1", new Integer(1));Map<String, 
List<KafkaStream<byte[], byte[]>>> consumerMap = 
consumer.createMessageStreams(topicCountMap);        ExecutorService 
threadExecutor = Executors.newFixedThreadPool(1);
 
 int threadNumber = 0;
 List<KafkaStream<byte[], byte[]>> streams = consumerMap.get("LISTENER1"); for 
(final KafkaStream stream : streams) {
      threadExecutor.submit(new ProcessorThread(stream, threadNumber));       
threadNumber++;  }

ProcessorThread:
class ProcessorThread implements Runnable {
     private final KafkaStream stream;    private final int threadNumber;
  public ProcessorThread(KafkaStream stream, int threadNumber) { this.stream = 
stream; this.threadNumber = threadNumber; }  public void run() {
 ConsumerIterator<byte[], byte[]> it = stream.iterator(); while(it.hasNext()) { 
 MessageAndMetadata<byte[], byte[]> msg = it.next(); byte[] bytes = 
msg.message(); String msgString = new String(bytes); /* process message *. }
 }

Reply via email to