Thanks Andy
From: Kamal C <kamaltar...@gmail.com> Reply-To: <users@kafka.apache.org> Date: Sunday, May 22, 2016 at 8:36 AM To: <users@kafka.apache.org> Subject: Re: newbie: kafka 0.9.0.0 producer does not terminate after producer.close() > Andy, > > Kafka 0.9.0 server supports the previous versions of the clients (0.8.2, > 0.8.1..). > But, new clients won't work properly with the older version of Kafka server. > > You should upgrade your server / broker first. > > --Kamal > > On Fri, May 20, 2016 at 10:58 PM, Andy Davidson < > a...@santacruzintegration.com> wrote: > >> Hi Jaikiran >> >> Bellow is the stack trace. For completeness I see in my log file that my >> code has called >> >> producer.flush(); >> >> producer.close(); >> >> >> >> I get the following error, how ever I do not think this is the problem. I >> found a ??bug report?? That said this was because I was connecting to a >> 0.8x >> sever. I am able to consume my test messages using >> kafka_2.10-0.8.2.1/bin/kafka-console-consumer.sh >> >> Kind regards >> >> Andy >> >> ERROR 17:12:14 kafka-producer-network-thread | producer-1 >> o.a.k.c.p.i.Sender >> run line:130 Uncaught error in kafka producer I/O thread: >> >> org.apache.kafka.common.protocol.types.SchemaException: Error reading field >> 'throttle_time_ms': java.nio.BufferUnderflowException >> >> at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:71) >> >> at >> >> org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient >> .java:464) >> >> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:279) >> >> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216) >> >> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128) >> >> at java.lang.Thread.run(Thread.java:745) >> >> >> >> $ jstack 908 >> >> 2016-05-20 10:16:25 >> >> Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.66-b17 mixed mode): >> >> >> >> "Attach Listener" #12 daemon prio=9 os_prio=31 tid=0x00007fe04291c800 >> nid=0x130b waiting on condition [0x0000000000000000] >> >> java.lang.Thread.State: RUNNABLE >> >> >> >> "kafka-producer-network-thread | producer-1" #11 daemon prio=5 os_prio=31 >> tid=0x00007fe041116800 nid=0x5a0f runnable [0x00007000015d5000] >> >> java.lang.Thread.State: RUNNABLE >> >> at sun.nio.ch.KQueueArrayWrapper.kevent0(Native Method) >> >> at sun.nio.ch.KQueueArrayWrapper.poll(KQueueArrayWrapper.java:198) >> >> at sun.nio.ch.KQueueSelectorImpl.doSelect(KQueueSelectorImpl.java:103) >> >> at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86) >> >> - locked <0x000000076b67ea88> (a sun.nio.ch.Util$2) >> >> - locked <0x000000076b67ea00> (a java.util.Collections$UnmodifiableSet) >> >> - locked <0x000000076b67e740> (a sun.nio.ch.KQueueSelectorImpl) >> >> at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97) >> >> at org.apache.kafka.common.network.Selector.select(Selector.java:425) >> >> at org.apache.kafka.common.network.Selector.poll(Selector.java:254) >> >> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:270) >> >> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216) >> >> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128) >> >> at java.lang.Thread.run(Thread.java:745) >> >> >> >> "Service Thread" #9 daemon prio=9 os_prio=31 tid=0x00007fe042015800 >> nid=0x5203 runnable [0x0000000000000000] >> >> java.lang.Thread.State: RUNNABLE >> >> >> >> "C1 CompilerThread3" #8 daemon prio=9 os_prio=31 tid=0x00007fe04285b000 >> nid=0x5003 waiting on condition [0x0000000000000000] >> >> java.lang.Thread.State: RUNNABLE >> >> >> >> "C2 CompilerThread2" #7 daemon prio=9 os_prio=31 tid=0x00007fe04282f000 >> nid=0x4e03 waiting on condition [0x0000000000000000] >> >> java.lang.Thread.State: RUNNABLE >> >> >> >> "C2 CompilerThread1" #6 daemon prio=9 os_prio=31 tid=0x00007fe041830800 >> nid=0x4c03 waiting on condition [0x0000000000000000] >> >> java.lang.Thread.State: RUNNABLE >> >> >> >> "C2 CompilerThread0" #5 daemon prio=9 os_prio=31 tid=0x00007fe04201c800 >> nid=0x4a03 waiting on condition [0x0000000000000000] >> >> java.lang.Thread.State: RUNNABLE >> >> >> >> "Signal Dispatcher" #4 daemon prio=9 os_prio=31 tid=0x00007fe042015000 >> nid=0x3e0f runnable [0x0000000000000000] >> >> java.lang.Thread.State: RUNNABLE >> >> >> >> "Finalizer" #3 daemon prio=8 os_prio=31 tid=0x00007fe04200d800 nid=0x3803 >> in >> Object.wait() [0x0000700000d3a000] >> >> java.lang.Thread.State: WAITING (on object monitor) >> >> at java.lang.Object.wait(Native Method) >> >> - waiting on <0x000000076ab070b8> (a java.lang.ref.ReferenceQueue$Lock) >> >> at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143) >> >> - locked <0x000000076ab070b8> (a java.lang.ref.ReferenceQueue$Lock) >> >> at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:164) >> >> at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:209) >> >> >> >> "Reference Handler" #2 daemon prio=10 os_prio=31 tid=0x00007fe04200d000 >> nid=0x3603 in Object.wait() [0x0000700000c37000] >> >> java.lang.Thread.State: WAITING (on object monitor) >> >> at java.lang.Object.wait(Native Method) >> >> - waiting on <0x000000076ab06af8> (a java.lang.ref.Reference$Lock) >> >> at java.lang.Object.wait(Object.java:502) >> >> at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:157) >> >> - locked <0x000000076ab06af8> (a java.lang.ref.Reference$Lock) >> >> >> >> "main" #1 prio=5 os_prio=31 tid=0x00007fe041010800 nid=0x1703 waiting on >> condition [0x0000700000219000] >> >> java.lang.Thread.State: WAITING (parking) >> >> at sun.misc.Unsafe.park(Native Method) >> >> - parking to wait for <0x000000076b89c8e0> (a >> java.util.concurrent.CountDownLatch$Sync) >> >> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) >> >> at >> >> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt( >> AbstractQueuedSynchronizer.java:836) >> >> at >> >> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterru >> ptibly(AbstractQueuedSynchronizer.java:997) >> >> at >> >> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterrupt >> ibly(AbstractQueuedSynchronizer.java:1304) >> >> at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) >> >> at >> >> org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(Produ >> ceRequestResult.java:57) >> >> at >> >> org.apache.kafka.clients.producer.internals.RecordAccumulator.awaitFlushComp >> letion(RecordAccumulator.java:422) >> >> at >> >> org.apache.kafka.clients.producer.KafkaProducer.flush(KafkaProducer.java:546 >> ) >> >> at com.pws.gnip.powertrack.HoseBirdClient.main(HoseBirdClient.java:56) >> >> >> >> "VM Thread" os_prio=31 tid=0x00007fe04200a000 nid=0x3403 runnable >> >> >> >> "GC task thread#0 (ParallelGC)" os_prio=31 tid=0x00007fe042005800 >> nid=0x2403 >> runnable >> >> >> >> "GC task thread#1 (ParallelGC)" os_prio=31 tid=0x00007fe042006000 >> nid=0x2603 >> runnable >> >> >> >> "GC task thread#2 (ParallelGC)" os_prio=31 tid=0x00007fe042006800 >> nid=0x2803 >> runnable >> >> >> >> "GC task thread#3 (ParallelGC)" os_prio=31 tid=0x00007fe042007000 >> nid=0x2a03 >> runnable >> >> >> >> "GC task thread#4 (ParallelGC)" os_prio=31 tid=0x00007fe042800800 >> nid=0x2c03 >> runnable >> >> >> >> "GC task thread#5 (ParallelGC)" os_prio=31 tid=0x00007fe042801000 >> nid=0x2e03 >> runnable >> >> >> >> "GC task thread#6 (ParallelGC)" os_prio=31 tid=0x00007fe042801800 >> nid=0x3003 >> runnable >> >> >> >> "GC task thread#7 (ParallelGC)" os_prio=31 tid=0x00007fe042802800 >> nid=0x3203 >> runnable >> >> >> >> "VM Periodic Task Thread" os_prio=31 tid=0x00007fe04202d800 nid=0x5403 >> waiting on condition >> >> >> >> JNI global references: 341 >> >> >> >> From: Jaikiran Pai <jai.forums2...@gmail.com> >> Reply-To: <users@kafka.apache.org> >> Date: Friday, May 20, 2016 at 7:55 AM >> To: <users@kafka.apache.org> >> Subject: Re: newbie: kafka 0.9.0.0 producer does not terminate after >> producer.close() >> >>> > You can take a thread dump (using "jstack <pid-of-your-program>") when >>> > the program doesn't terminate and post that output here. That will tell >>> > us which threads are causing the program to not terminate. >>> > >>> > -Jaikiran >>> > >>> > On Tuesday 17 May 2016 11:32 PM, Andy Davidson wrote: >>>> >> I wrote a little test client that reads from a file an publishes using >> the >>>> >> 0.9.0.0 API. I am contacting to an older 0.8.x sever. I am able to >>>> send >>>> >> messages how ever I noticed that once I am done reading the input file >> my >>>> >> test program hangs >>>> >> >>>> >> Any idea what I am doing wrong? >>>> >> >>>> >> Kind regards >>>> >> >>>> >> Andy >>>> >> >>>> >> >>>> >> public static void main(String[] args) throws IOException { >>>> >> logger.warn("BEGIN"); >>>> >> >>>> >> readFromFile(cmdLine, producer, topic); >>>> >> >>>> >> >>>> >> >>>> >> producer.flush(); >>>> >> >>>> >> producer.close(); >>>> >> >>>> >> >>>> >> >>>> >> logger.warn("END"); >>>> >> >>>> >> } >>>> >> >>>> >> >>>> >> private static void readFromFile(CmdLine cmdLine, >>>> KafkaProducer<String, >>>> >> String> producer, >>>> >> >>>> >> String topic) throws IOException { >>>> >> >>>> >> >>>> >> >>>> >> logger.info("BEGIN"); >>>> >> >>>> >> BufferedReader reader = cmdLine.getReader(); >>>> >> >>>> >> String value = null; >>>> >> >>>> >> >>>> >> >>>> >> while ((value = reader.readLine()) != null) { >>>> >> >>>> >> logger.info("sending value: " + value); >>>> >> >>>> >> publish(producer, topic, value); >>>> >> >>>> >> } >>>> >> >>>> >> logger.info("END"); >>>> >> >>>> >> } >>>> >> >>>> >> >>>> >> >>>> >> private static void publish(KafkaProducer<String, String> producer, >> String >>>> >> topic, String value) { >>>> >> >>>> >> Future<RecordMetadata> response = producer.send(new >> ProducerRecord<String, >>>> >> String>(topic, value)); >>>> >> >>>> >> >>>> >> >>>> >> /* TODO >>>> >> >>>> >> send() will raise following error. >>>> >> >>>> >> It is because we are using a 0.9.0.0 client with an 0.8 server. The >> 0.8 >>>> >> consumer seems >>>> >> >>>> >> to work with out problems >>>> >> >>>> >> } >>>> >> >>>> >> >>>> >> >>>> >> Š >>>> >> INFO 17:02:53 main c.p.g.p.KClient readFromFile line:79 BEGIN >>>> >> >>>> >> Š >>>> >> INFO 17:02:54 main c.p.g.p.KClient readFromFile line:85 sending >>>> value: >>>> >> dependencies { >>>> >> >>>> >> Š >>>> >> INFO 17:02:54 main c.p.g.p.KClient readFromFile line:89 END >>>> >> >>>> >> Š >>>> >> >>>> >> The following error appears to be because we are using 0.9.0.0 api >> with an >>>> >> 0.8.x sever. If I read from stdin instead of a file I would be able to >>>> >> continue sending messages. I do not think this is the reason my test >> code >>>> >> hangs. >>>> >> >>>> >> ERROR 17:02:54 kafka-producer-network-thread | producer-1 >> o.a.k.c.p.i.Sender >>>> >> run line:130 Uncaught error in kafka producer I/O thread: >>>> >> >>>> >> org.apache.kafka.common.protocol.types.SchemaException: Error reading >> field >>>> >> 'throttle_time_ms': java.nio.BufferUnderflowException >>>> >> >>>> >> at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:71) >>>> >> >>>> >> at >>>> >> >> org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient >>>> >> .java:464) >>>> >> >>>> >> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:279) >>>> >> >>>> >> at >> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216) >>>> >> >>>> >> at >> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128) >>>> >> >>>> >> at java.lang.Thread.run(Thread.java:745) >>>> >> >>>> >> >>>> >> >>>> >> >>>> >> >>>> >> >>>> >> >>> > >>> > >> >> >> >