Hi Neha,
I have two problems:. Any help is greatly appreciated.
1)* java.lang.IllegalStateException: Iterator is in failed state*
ConsumerConnector consumerConnector = Consumer
.createJavaConsumerConnector(getConsumerConfig());
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, *32*);
Map<String, List<KafkaStream<byte[], byte[]>>> topicStreamMap =
consumerConnector
.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams =
Collections.synchronizedList(topicStreamMap.get(topic));
AppStaticInfo info = Mupd8Main.STATICINFO();
Iterator<KafkaStream<byte[], byte[]>> iterator =
streams.iterator();
// remove the head first list for this source...rest are for the
Dynamic Souce...
mainIterator = iterator.next().iterator();
List<ConsumerIterator<byte[], byte[]>> iteratorList = new
ArrayList<ConsumerIterator<byte[],byte[]>>(streams.size());
// now rest of the iterator must be registered now..
while(iterator.hasNext()){
iteratorList.add(iterator.next().iterator());
}
*KafkaStreamRegistory.registerStream(mainSourceName, iteratorList);*
Once the Consumer iterator is created and registered. We use this in
another thread to start reading from the Consumer Iterator. Sometime it
give following exception.
24 Oct 2014 16:03:25,923 ERROR
[SourceReader:request_source:LogStreamKafkaSource1]
(grizzled.slf4j.Logger.error:116) - SourceThread: exception during reads.
Swallowed to continue next read.
java.lang.IllegalStateException: Iterator is in failed state
at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:54)
I have tried to recover from this state by using this:
iterator.resetState(); but it does not recover sometime.
*2) ConsumerFetcherThread are blocked on enqueue ? What controls size of
queue ? Why are they blocked ? *Due to this our lags are increasing. our
threads blocked on hasNext()...
"ConsumerFetcherThread-Mupd8_Kafka_caq1-fl4-ilo.walmart.com-1414443037185-70e42954-0-1"
prio=5 tid=0x00007fb36292c800 nid=0xab03 waiting on condition
[0x0000000116379000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x0000000704019388> (a
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
at
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
at
java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:349)
at
kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
at
kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
at
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:131)
at
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:112)
at
scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
at
scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
at
scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
at
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:112)
at
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:112)
at
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:112)
at kafka.utils.Utils$.inLock(Utils.scala:535)
at
kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:111)
at
kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:89)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
"ConsumerFetcherThread-Mupd8_Kafka_caq1-fl4-ilo.walmart.com-1414443037185-70e42954-0-2"
prio=5 tid=0x00007fb36229e000 nid=0xa903 waiting on condition
[0x0000000116276000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x0000000704064ce0> (a
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
at
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
at
java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:349)
at
kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
at
kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
at
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:131)
at
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:112)
at
scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
at
scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
at
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:112)
at
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:112)
at
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:112)
at kafka.utils.Utils$.inLock(Utils.scala:535)
at
kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:111)
at
kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:89)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
Thanks,
Bhavesh
On Sun, Oct 26, 2014 at 3:14 PM, Neha Narkhede <[email protected]>
wrote:
> Can you provide the steps to reproduce this issue?
>
> On Fri, Oct 24, 2014 at 6:11 PM, Bhavesh Mistry <
> [email protected]>
> wrote:
>
> > I am using one from the Kafka Trunk branch.
> >
> > Thanks,
> >
> > Bhavesh
> >
> > On Fri, Oct 24, 2014 at 5:24 PM, Neha Narkhede <[email protected]>
> > wrote:
> >
> > > Which version of Kafka are you using on the consumer?
> > >
> > > On Fri, Oct 24, 2014 at 4:14 PM, Bhavesh Mistry <
> > > [email protected]>
> > > wrote:
> > >
> > > > HI Kafka Community ,
> > > >
> > > > I am using kafka trunk source code and I get following exception.
> What
> > > > could cause the iterator to have FAILED state. Please let me know
> how
> > I
> > > > can fix this issue.
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > *java.lang.IllegalStateException: Iterator is in failed state at
> > > > kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:54)*
> > > > Here is Properties:
> > > >
> > > > Properties props = new Properties();
> > > > props.put("zookeeper.connect", zkConnect);
> > > > props.put("group.id", groupId);
> > > > * props.put("consumer.timeout.ms <http://consumer.timeout.ms
> >",
> > > > "-1");*
> > > > props.put("zookeeper.session.timeout.ms", "10000");
> > > > props.put("zookeeper.sync.time.ms", "6000");
> > > > props.put("auto.commit.interval.ms", "2000");
> > > > props.put("rebalance.max.retries", "8");
> > > > props.put("auto.offset.reset", "largest");
> > > > props.put("fetch.message.max.bytes","2097152");
> > > > props.put("socket.receive.buffer.bytes","2097152");
> > > > props.put("auto.commit.enable","true");
> > > >
> > > >
> > > > Thanks,
> > > >
> > > > Bhavesh
> > > >
> > >
> >
>