>> Sometimes it gives the following exception.

It will help to have a more specific test case that reproduces the failed
iterator state.
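
As a starting point for such a test case, here is a minimal, Kafka-free sketch of how an iterator built on this kind of template can get stuck in the failed state. It assumes (from my reading of kafka.utils.IteratorTemplate, so treat it as an assumption) that the state is set to FAILED before the next element is computed and is only set back once that computation returns normally. The class and method names below (FailedStateSketch, TemplateIterator, reproduce) are hypothetical and exist only for illustration.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

class FailedStateSketch {
    enum State { NOT_READY, READY, DONE, FAILED }

    /** Hypothetical stand-in for an iterator template; not Kafka's actual class. */
    abstract static class TemplateIterator<T> implements Iterator<T> {
        private State state = State.NOT_READY;
        private T nextItem;

        /** Computes the next element; may throw (timeout, interrupt, bad data, ...). */
        protected abstract T makeNext();

        @Override
        public boolean hasNext() {
            if (state == State.FAILED) {
                throw new IllegalStateException("Iterator is in failed state");
            }
            switch (state) {
                case DONE:  return false;
                case READY: return true;
                default:    return maybeComputeNext();
            }
        }

        private boolean maybeComputeNext() {
            // Assumed behavior: mark FAILED first, so an exception thrown by
            // makeNext() leaves the iterator in the FAILED state.
            state = State.FAILED;
            nextItem = makeNext();
            if (state == State.DONE) {
                return false;
            }
            state = State.READY;
            return true;
        }

        @Override
        public T next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            state = State.NOT_READY;
            T item = nextItem;
            nextItem = null;
            return item;
        }
    }

    /** Returns the message of the exception raised by the second hasNext() call. */
    static String reproduce() {
        TemplateIterator<String> it = new TemplateIterator<String>() {
            @Override
            protected String makeNext() {
                throw new RuntimeException("simulated failure while fetching");
            }
        };
        try {
            it.hasNext();
        } catch (RuntimeException first) {
            // The original failure. If the caller swallows it and keeps reading,
            // the iterator is already in the FAILED state.
        }
        try {
            it.hasNext();
            return "no exception";
        } catch (IllegalStateException second) {
            return second.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(reproduce());
    }
}
```

If this matches what happens in your reader thread, the IllegalStateException is only a symptom: the exception worth logging is the first one, the one thrown just before the iterator entered the failed state.
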
Also, the consumer fetcher threads block if the fetcher queue is full. The
queue can fill up if your consumer thread dies or slows down. I'd recommend
you ensure that all your consumer threads are alive. You can take a thread
dump to verify this.

Thanks,
Neha

On Mon, Oct 27, 2014 at 2:14 PM, Bhavesh Mistry <mistry.p.bhav...@gmail.com> wrote:

> Hi Neha,
>
> I have two problems. Any help is greatly appreciated.
>
> 1) *java.lang.IllegalStateException: Iterator is in failed state*
>
>     ConsumerConnector consumerConnector = Consumer
>             .createJavaConsumerConnector(getConsumerConfig());
>     Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
>     topicCountMap.put(topic, 32);
>     Map<String, List<KafkaStream<byte[], byte[]>>> topicStreamMap =
>             consumerConnector.createMessageStreams(topicCountMap);
>
>     List<KafkaStream<byte[], byte[]>> streams =
>             Collections.synchronizedList(topicStreamMap.get(topic));
>
>     AppStaticInfo info = Mupd8Main.STATICINFO();
>
>     Iterator<KafkaStream<byte[], byte[]>> iterator = streams.iterator();
>     // Remove the head of the list first for this source; the rest are
>     // for the Dynamic Source.
>     mainIterator = iterator.next().iterator();
>
>     List<ConsumerIterator<byte[], byte[]>> iteratorList =
>             new ArrayList<ConsumerIterator<byte[], byte[]>>(streams.size());
>     // The rest of the iterators must be registered now.
>     while (iterator.hasNext()) {
>         iteratorList.add(iterator.next().iterator());
>     }
>     KafkaStreamRegistory.registerStream(mainSourceName, iteratorList);
>
> Once the consumer iterator is created and registered, we use it in
> another thread to start reading from the consumer iterator. Sometimes it
> gives the following exception.
>
> 24 Oct 2014 16:03:25,923 ERROR
> [SourceReader:request_source:LogStreamKafkaSource1]
> (grizzled.slf4j.Logger.error:116) - SourceThread: exception during reads.
> Swallowed to continue next read.
> java.lang.IllegalStateException: Iterator is in failed state
>     at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:54)
>
> I have tried to recover from this state by calling iterator.resetState(),
> but sometimes it does not recover.
>
> 2) *ConsumerFetcherThreads are blocked on enqueue. What controls the size
> of the queue? Why are they blocked?* Due to this our lags are increasing.
> Our threads are blocked on hasNext()...
>
> "ConsumerFetcherThread-Mupd8_Kafka_caq1-fl4-ilo.walmart.com-1414443037185-70e42954-0-1" prio=5 tid=0x00007fb36292c800 nid=0xab03 waiting on condition [0x0000000116379000]
>    java.lang.Thread.State: WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for <0x0000000704019388> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
>         at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:349)
>         at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
>         at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
>         at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:131)
>         at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:112)
>         at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
>         at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
>         at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
>         at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:112)
>         at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:112)
>         at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:112)
>         at kafka.utils.Utils$.inLock(Utils.scala:535)
>         at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:111)
>         at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:89)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
>
> "ConsumerFetcherThread-Mupd8_Kafka_caq1-fl4-ilo.walmart.com-1414443037185-70e42954-0-2" prio=5 tid=0x00007fb36229e000 nid=0xa903 waiting on condition [0x0000000116276000]
>    java.lang.Thread.State: WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for <0x0000000704064ce0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
>         at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:349)
>         at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
>         at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
>         at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:131)
>         at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:112)
>         at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
>         at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
>         at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:112)
>         at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:112)
>         at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:112)
>         at kafka.utils.Utils$.inLock(Utils.scala:535)
>         at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:111)
>         at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:89)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
>
> Thanks,
>
> Bhavesh
>
> On Sun, Oct 26, 2014 at 3:14 PM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
>
> > Can you provide the steps to reproduce this issue?
> >
> > On Fri, Oct 24, 2014 at 6:11 PM, Bhavesh Mistry <mistry.p.bhav...@gmail.com> wrote:
> >
> > > I am using one from the Kafka Trunk branch.
> > >
> > > Thanks,
> > >
> > > Bhavesh
> > >
> > > On Fri, Oct 24, 2014 at 5:24 PM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
> > >
> > > > Which version of Kafka are you using on the consumer?
> > > >
> > > > On Fri, Oct 24, 2014 at 4:14 PM, Bhavesh Mistry <mistry.p.bhav...@gmail.com> wrote:
> > > >
> > > > > Hi Kafka Community,
> > > > >
> > > > > I am using the Kafka trunk source code and I get the following
> > > > > exception. What could cause the iterator to have FAILED state?
> > > > > Please let me know how I can fix this issue.
> > > > >
> > > > > *java.lang.IllegalStateException: Iterator is in failed state
> > > > >     at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:54)*
> > > > >
> > > > > Here are the properties:
> > > > >
> > > > >     Properties props = new Properties();
> > > > >     props.put("zookeeper.connect", zkConnect);
> > > > >     props.put("group.id", groupId);
> > > > >     props.put("consumer.timeout.ms", "-1");
> > > > >     props.put("zookeeper.session.timeout.ms", "10000");
> > > > >     props.put("zookeeper.sync.time.ms", "6000");
> > > > >     props.put("auto.commit.interval.ms", "2000");
> > > > >     props.put("rebalance.max.retries", "8");
> > > > >     props.put("auto.offset.reset", "largest");
> > > > >     props.put("fetch.message.max.bytes", "2097152");
> > > > >     props.put("socket.receive.buffer.bytes", "2097152");
> > > > >     props.put("auto.commit.enable", "true");
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Bhavesh
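
On the second question in the thread (why the ConsumerFetcherThreads park in LinkedBlockingQueue.put): the stack traces show the fetcher blocked in put() on a bounded queue, which is what happens when nothing is draining that queue. As far as I know, the capacity of that queue in the old high-level consumer is controlled by the queued.max.message.chunks setting, but please check that against the configuration docs for your build. The sketch below is plain JDK code, not Kafka code: the class name, method name, and "chunk" strings are hypothetical, and it only illustrates the blocking behavior and how a live reader clears it.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class BlockedFetcherSketch {

    /**
     * Starts a "fetcher" thread that puts capacity + 1 chunks into a bounded
     * queue that nobody reads. Returns true if the fetcher was observed parked
     * on the full queue and then finished once a single chunk was taken.
     */
    static boolean fetcherBlocksWhenQueueFull(final int capacity) {
        final LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<String>(capacity);
        Thread fetcher = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    for (int i = 0; i <= capacity; i++) {
                        queue.put("chunk-" + i); // blocks once the queue is full
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }, "sketch-fetcher");
        fetcher.setDaemon(true);
        fetcher.start();

        try {
            // Wait (up to 5 seconds) until the fetcher is parked in put().
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
            while (fetcher.getState() != Thread.State.WAITING
                    && System.nanoTime() < deadline) {
                Thread.sleep(10);
            }
            boolean blocked = fetcher.getState() == Thread.State.WAITING
                    && queue.remainingCapacity() == 0;

            // A live consumer taking one chunk frees a slot and unblocks the fetcher.
            queue.take();
            fetcher.join(5000);
            return blocked && !fetcher.isAlive();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("fetcher blocked until drained: "
                + fetcherBlocksWhenQueueFull(2));
    }
}
```

In a thread dump the parked fetcher in this sketch shows the same WAITING (parking) state under LinkedBlockingQueue.put as the traces above, so the thing to look for in the real dump is whether the threads that should be calling hasNext()/next() on each stream are still alive and making progress.
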