Yes, you are correct. I misunderstood the feature.

auto.create.topics.enable has an interesting side effect. Say your Kafka
configuration file sets the default partition count to 10 (num.partitions=10).
If the topic is auto-created, you get 10 partitions. Creating the topic
explicitly, the way I did, gives you control over the number of partitions
regardless of how the broker is configured.
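
For reference, explicit creation from a test looks roughly like the sketch
below. Treat it as an assumption-laden sketch rather than my exact code: the
topic name, partition count, replication factor and ZooKeeper address are
placeholders, and the admin helper moved around within the 0.8 line
(AdminUtils.createTopic shown here; on 0.8.0-beta1 the equivalent lives in
CreateTopicCommand), so the exact class and signature may differ.

    import java.util.Properties;
    import org.I0Itec.zkclient.ZkClient;
    import kafka.admin.AdminUtils;
    import kafka.utils.ZKStringSerializer$;

    public class CreateTestTopic {
        public static void main(String[] args) {
            // Create the topic up front so the partition count is whatever the
            // test asks for, not whatever num.partitions happens to be on the broker.
            ZkClient zkClient = new ZkClient("localhost:2181", 10000, 10000,
                    ZKStringSerializer$.MODULE$);
            AdminUtils.createTopic(zkClient, "test-topic", 10, 1, new Properties());
            zkClient.close();
        }
    }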


On Thu, Nov 7, 2013 at 4:16 AM, Chris Bedford <ch...@buildlackey.com> wrote:

> auto.create.topics.enable
> is true by default. For this test I relied on that property. I don't think
> real production code should rely on it, though; it's too easy to mess
> things up with a typo.    - cb
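>
> P.S. For the embedded-broker tests this is just another broker property. A
> rough sketch of what I mean, assuming the 0.8 KafkaConfig(Properties)
> constructor and java.util.Properties; every value here is a placeholder:
>
>     Properties brokerProps = new Properties();
>     brokerProps.put("broker.id", "0");
>     brokerProps.put("port", "9092");
>     brokerProps.put("log.dir", "/tmp/kafka-test-logs");
>     brokerProps.put("zookeeper.connect", "localhost:2181");
>     // on by default: publishing to (or fetching from) an unknown topic creates it
>     brokerProps.put("auto.create.topics.enable", "true");
>     // partition count that auto-created topics receive
>     brokerProps.put("num.partitions", "1");
>     KafkaConfig config = new KafkaConfig(brokerProps);  // kafka.server.KafkaConfig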
>
>
> On Wed, Nov 6, 2013 at 9:28 PM, Edward Capriolo <edlinuxg...@gmail.com> wrote:
>
> > One thing I noticed about your code: I thought that in Kafka 0.8.0 topics
> > are not created automatically on the first message, and I do not see
> > anywhere in your code where the topics are created.
> >
> > I am creating the topic as part of my tests.
> >
> > Before, I was not setting that property and was only getting some of the
> > messages, which is really weird. You are also using a slightly different
> > Kafka than me (your pom.xml vs. mine).
> >
> > Here is what I have now:
> >
> > https://github.com/edwardcapriolo/IronCount/blob/iron-ng/src/test/java/com/jointhegrid/ironcount/IronIntegrationTest.java
> >
> > https://github.com/edwardcapriolo/IronCount/blob/iron-ng/src/test/java/com/jointhegrid/ironcount/IntegrationTest.java
> >
> > The property I mentioned is making all my tests pass, so that was the
> > magic bullet for me. Everything else I did in the code above, cleanup-wise,
> > had no effect; nothing worked until I switched on that parameter.
> >
> >
> > > On Thu, Nov 7, 2013 at 12:15 AM, Chris Bedford <ch...@buildlackey.com> wrote:
> >
> > > Do you need that configuration to get the tests (as currently
> > > checked in) to pass? I did not find I needed that particular knob
> > > (although it is a good one to know about).
> > >
> > > And sorry about your suffering... I can sympathize!
> > >
> > >  - cb
> > >
> > >
> > > On Wed, Nov 6, 2013 at 7:38 PM, Edward Capriolo <edlinuxg...@gmail.com> wrote:
> > >
> > > > After about five days of relentless head pounding and twiddling
> > > > everything under the sun, I figured it out.
> > > >
> > > > If you read to the bottom of this page:
> > > >
> > > > https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
> > > >
> > > > You find:
> > > >
> > > > consumerProps.put("auto.offset.reset", "smallest");
> > > >
> > > > Now I can bring up the entire stack in the JVM and test like I used to!
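> > > >
> > > > For the archives, the consumer setup that now works for me looks roughly
> > > > like the fragment below. It is only a sketch: the group id, ZooKeeper
> > > > address and timeout are placeholders from my own test, not Chris's code,
> > > > and it assumes the 0.8 high-level consumer classes
> > > > (kafka.consumer.Consumer, kafka.consumer.ConsumerConfig,
> > > > kafka.javaapi.consumer.ConsumerConnector) plus java.util.Properties.
> > > >
> > > >     Properties consumerProps = new Properties();
> > > >     consumerProps.put("zookeeper.connect", "localhost:2181");
> > > >     consumerProps.put("group.id", "test-group");
> > > >     consumerProps.put("zookeeper.session.timeout.ms", "4000");
> > > >     // Without this the consumer starts at the latest offset and never
> > > >     // sees messages produced before it finished registering.
> > > >     consumerProps.put("auto.offset.reset", "smallest");
> > > >     ConsumerConnector connector =
> > > >             Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerProps));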
> > > >
> > > >
> > > > On Wed, Nov 6, 2013 at 2:52 AM, Chris Bedford <ch...@buildlackey.com> wrote:
> > > >
> > > > > Hi, Edward..
> > > > >
> > > > > Yup, you are correct: when we get to a little over 1000 messages, the
> > > > > program was failing with the exception stack trace I included below.
> > > > >
> > > > > I fixed the test so it passes as long as the consumer gets all messages
> > > > > sent by the producer, even if an exception is thrown during shutdown.
> > > > >
> > > > > This isn't as clean as I'd like it to be. I tried
> > > > > calling kafkaServer.awaitShutdown(), and I tried inserting some
> > > > > Thread.sleep() calls to give the consumer and producer shutdown
> > > > > procedures some time to complete, but I still got the stack trace
> > > > > shown below.
> > > > >
> > > > > I don't have time to chase the bug any further, but I did correct the
> > > > > test, so you can pull it and see that it passes if you want.
> > > > >
> > > > > Maybe we should file a bug on this...? It might be that I'm using
> > > > > the API incorrectly; I'm not sure at this point.
> > > > >
> > > > > Anyway, thanks for informing me of the issue.
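> > > > >
> > > > > For reference, the shutdown ordering I was attempting is roughly the
> > > > > sketch below. The consumerConnector and producer names are placeholders
> > > > > for whatever the test holds; kafkaServer is the embedded
> > > > > kafka.server.KafkaServer, and I'm only assuming the standard 0.8
> > > > > shutdown()/awaitShutdown()/close() methods.
> > > > >
> > > > >     // Stop the clients before the broker so their sockets close cleanly,
> > > > >     // then wait for the broker itself to finish shutting down.
> > > > >     consumerConnector.shutdown();   // kafka.javaapi.consumer.ConsumerConnector
> > > > >     producer.close();               // kafka.javaapi.producer.Producer
> > > > >     kafkaServer.shutdown();         // embedded kafka.server.KafkaServer
> > > > >     kafkaServer.awaitShutdown();    // block until broker shutdown completes
> > > > >     // (stop the test ZooKeeper server last)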
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > *Failure due to broken shutdown:*
> > > > >
> > > > > java.io.IOException: Connection reset by peer
> > > > > at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
> > > > > at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
> > > > > at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:225)
> > > > > at sun.nio.ch.IOUtil.read(IOUtil.java:198)
> > > > > at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:375)
> > > > > at kafka.utils.Utils$.read(Utils.scala:394)
> > > > > at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> > > > > at kafka.network.Processor.read(SocketServer.scala:339)
> > > > > at kafka.network.Processor.run(SocketServer.scala:245)
> > > > > at java.lang.Thread.run(Thread.java:724)
> > > > > INFO 2013-11-05 23:45:53,944 ConsumerFetcherThread-RANDOM-GROUP-ID_ubuntu-1383723948022-27b6aba8-0-1 kafka.consumer.SimpleConsumer Reconnect due to socket error:
> > > > > java.nio.channels.ClosedByInterruptException
> > > > > at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202)
> > > > > at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:402)
> > > > > at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:220)
> > > > > at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
> > > > > at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
> > > > > at kafka.utils.Utils$.read(Utils.scala:394)
> > > > > at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> > > > > at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> > > > > at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> > > > > at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
> > > > > at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:73)
> > > > > at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
> > > > > at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:110)
> > > > > at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> > > > > at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110)
> > > > > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > > > > at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:109)
> > > > > at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> > > > > at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109)
> > > > > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > > > > at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:108)
> > > > > at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
> > > > > at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
> > > > > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> > > > > INFO
> > > > >
> > > > >
> > > > >
> > > > > On Mon, Nov 4, 2013 at 8:14 PM, Edward Capriolo <edlinuxg...@gmail.com> wrote:
> > > > >
> > > > > > I have success when the number of messages is less than ~1200. With
> > > > > > more than 1200 it never completes.
> > > > > >
> > > > > > Try changing the program to this:
> > > > > >
> > > > > >       tkp = new TestKafkaProducer(
> > > > > >                     theTopic,
> > > > > >                     "localhost:" + zookeeperTestServer.getPort(),
> > > > > >                     4000);
> > > > > >
> > > > > > ERROR 23:13:35,900 Thread Thread[main,5,main] died
> > > > > > java.lang.RuntimeException: oh rats... we failed
> > > > > >     at TestKafkaProducer.main(TestKafkaProducer.java:71)
> > > > > >
> > > > > >
> > > > > > On Mon, Nov 4, 2013 at 11:08 PM, Chris Bedford <ch...@buildlackey.com> wrote:
> > > > > >
> > > > > > > Hi Ed:
> > > > > > >
> > > > > > > Regarding the TestTopology example program on my GitHub:
> > > > > > > do you see the words "successful completion" printed out towards the
> > > > > > > end of the test run?
> > > > > > >
> > > > > > > I'm assuming that you ran mvn exec:java ... etc., as specified in the
> > > > > > > README file. Is that correct?
> > > > > > > On Nov 4, 2013 7:52 PM, "Edward Capriolo" <edlinuxg...@gmail.com> wrote:
> > > > > > >
> > > > > > > > I am using:
> > > > > > > >     <dependency>
> > > > > > > >         <groupId>org.apache.kafka</groupId>
> > > > > > > >         <artifactId>kafka_2.9.2</artifactId>
> > > > > > > >         <version>0.8.0-beta1</version>
> > > > > > > >     </dependency>
> > > > > > > >
> > > > > > > > I am trying to rebuild IronCount
> > > > > > > > https://github.com/edwardcapriolo/IronCount/tree/iron-ng
> > > > > > > > against Kafka 0.8.0.
> > > > > > > >
> > > > > > > > I am having considerable issues getting the tests to run correctly.
> > > > > > > >
> > > > > > > > If you run this example
> > > > > > > >
> > > > > > > > https://github.com/buildlackey/cep/blob/master/kafka-0.8.x/src/main/java/TestKafkaProducer.java
> > > > > > > >
> > > > > > > > I am writing 2000 messages into a producer; the consumer seems to
> > > > > > > > read 1200 messages or so, and then it never gets any of the rest.
> > > > > > > >
> > > > > > > > So am I just messing something up, or is this release still having
> > > > > > > > the kinks worked out?
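> > > > > > > >
> > > > > > > > The read side of the test is essentially "drain one KafkaStream and
> > > > > > > > count", roughly the sketch below (not my literal code; "connector" is
> > > > > > > > the high-level consumer connector, the topic name is a placeholder,
> > > > > > > > and the java.util and kafka.consumer imports are assumed):
> > > > > > > >
> > > > > > > >     Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
> > > > > > > >     topicCountMap.put("test-topic", 1);
> > > > > > > >     Map<String, List<KafkaStream<byte[], byte[]>>> streams =
> > > > > > > >             connector.createMessageStreams(topicCountMap);
> > > > > > > >     ConsumerIterator<byte[], byte[]> it =
> > > > > > > >             streams.get("test-topic").get(0).iterator();
> > > > > > > >     int count = 0;
> > > > > > > >     while (count < 2000 && it.hasNext()) {
> > > > > > > >         it.next();
> > > > > > > >         count++;   // in my runs this stalls at around 1200 of 2000
> > > > > > > >     }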
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > Chris Bedford
> > > > >
> > > > > Founder & Lead Lackey
> > > > > Build Lackey Labs:  http://buildlackey.com
> > > > > Go Grails!: http://blog.buildlackey.com
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > Chris Bedford
> > >
> > > Founder & Lead Lackey
> > > Build Lackey Labs:  http://buildlackey.com
> > > Go Grails!: http://blog.buildlackey.com
> > >
> >
>
>
>
> --
> Chris Bedford
>
> Founder & Lead Lackey
> Build Lackey Labs:  http://buildlackey.com
> Go Grails!: http://blog.buildlackey.com
>
