Hi, Edward..

yup .. you are correct.. when we get to a little over 1000 messages the
program was  failing with the  exception stack trace i included below.

I fixed the test so it passes as long as the consumer gets all messages
sent by the producer.. even if an exception is thrown during shut down..

This isn't as clean as i'd like it to be.  I tried
calling kafkaServer.awaitShutdown();
And I tried inserting some Thread.sleep() calls to give the consumer and
producer shut down procedures some time to complete. But I still got the
stack trace shown below.

I don't have time to chase the bug any further.. But I did correct the
test, so you can pull it and see that it passes if you want.

Maybe we should file a bug on this...?        It might be that I'm using
the API incorrectly. I'm not sure at this point.

anyway, thx for informing me of the issue.




*Failure due to broken shut down>> >*

java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:225)
at sun.nio.ch.IOUtil.read(IOUtil.java:198)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:375)
at kafka.utils.Utils$.read(Utils.scala:394)
at
kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
at kafka.network.Processor.read(SocketServer.scala:339)
at kafka.network.Processor.run(SocketServer.scala:245)
at java.lang.Thread.run(Thread.java:724)
INFO 2013-11-05 23:45:53,944
 ConsumerFetcherThread-RANDOM-GROUP-ID_ubuntu-1383723948022-27b6aba8-0-1
kafka.consumer.SimpleConsumer Reconnect due to socket error:
java.nio.channels.ClosedByInterruptException
at
java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:402)
at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:220)
at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
at
java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
at kafka.utils.Utils$.read(Utils.scala:394)
at
kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
at
kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:73)
at
kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
at
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:110)
at
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
at
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:109)
at
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
at
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:108)
at
kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
INFO



On Mon, Nov 4, 2013 at 8:14 PM, Edward Capriolo <edlinuxg...@gmail.com>wrote:

> I have success when the number of messages is less then ~1200. With more
> then 1200 it never completes.
>
> Try changing the program to this:
>
>       tkp = new TestKafkaProducer(
>                     theTopic,
>                     "localhost:" + zookeeperTestServer.getPort(),
>                     4000);
>
> ERROR 23:13:35,900 Thread Thread[main,5,main] died
> java.lang.RuntimeException: oh rats... we failed
>     at TestKafkaProducer.main(TestKafkaProducer.java:71)
>
>
> On Mon, Nov 4, 2013 at 11:08 PM, Chris Bedford <ch...@buildlackey.com
> >wrote:
>
> > Hi Ed:
> >
> > regarding the test the testtopology exampe program on my github ...
> > do you see the words "  successful completion"   printed out towards the
> > end of the test Run?
> >
> > I assuming that you ran mvn:exec java ...etc..  as specified in the Read
> Me
> > file.. is that correct?
> > On Nov 4, 2013 7:52 PM, "Edward Capriolo" <edlinuxg...@gmail.com> wrote:
> >
> > > I am using:
> > >    <dependency>
> > >                     <groupId>org.apache.kafka</groupId>
> > >             <artifactId>kafka_2.9.2</artifactId>
> > >             <version>0.8.0-beta1</version>
> > >         </dependency>
> > >
> > > I am trying to rebuild iron-count
> > > https://github.com/edwardcapriolo/IronCount/tree/iron-ng
> > > against kafka 0.8.0
> > >
> > > I am having considerable issues getting tests to run correctly.
> > >
> > > If you run this example
> > >
> > >
> > >
> >
> https://github.com/buildlackey/cep/blob/master/kafka-0.8.x/src/main/java/TestKafkaProducer.java
> > >
> > > I am writing 2000 messages into a producer..the consumer seems to read
> > 1200
> > > messages or so ...then it never gets any of the rest.
> > >
> > > So am I just messing something up or is this release still having the
> > kinks
> > > worked out?
> > >
> >
>



-- 
Chris Bedford

Founder & Lead Lackey
Build Lackey Labs:  http://buildlackey.com
Go Grails!: http://blog.buildlackey.com

Reply via email to