Hi Jay, I've uploaded a patch to KAFKA-2009 for this. It is a pretty small patch, and it would be great if you could help review it.
Thanks.

Jiangjie (Becket) Qin

On 3/8/15, 12:31 PM, "Jay Kreps" <jay.kr...@gmail.com> wrote:

> Hey guys,
>
> If we checked in obviously broken code on trunk, let's fix it now or revert
> that change.
>
> -Jay
>
> On Sat, Mar 7, 2015 at 12:48 AM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:
>
>> Hi Tao,
>>
>> Thanks a lot for finding the bug. We are actually rewriting the mirror
>> maker in KAFKA-1997 with a much simplified solution using the newly added
>> flush() call in the new Java producer.
>> Mirror maker in current trunk is also missing one necessary
>> synchronization: UncheckedOffsets.removeOffset is not synchronized. I
>> am hesitating whether to fix those problems in current trunk or just
>> wait for KAFKA-1997 to be checked in. If you have a strong opinion
>> about this, we can probably fix those 2 issues in the trunk. It should be
>> a small patch, but I just don't want people to get distracted.
>>
>> Jiangjie (Becket) Qin
>>
>> On 3/6/15, 10:15 PM, "tao xiao" <xiaotao...@gmail.com> wrote:
>>
>>> I think I worked out the root cause
>>>
>>> Line 593 in MirrorMaker.scala
>>>
>>> trace("Updating offset for %s to %d".format(topicPartition, offset))
>>>
>>> should be
>>>
>>> trace("Updating offset for %s to %d".format(topicPartition, offset.element))
>>>
>>> On Sat, Mar 7, 2015 at 2:12 AM, tao xiao <xiaotao...@gmail.com> wrote:
>>>
>>>> A bit more context: I turned on async in producer.properties
>>>>
>>>> On Sat, Mar 7, 2015 at 2:09 AM, tao xiao <xiaotao...@gmail.com> wrote:
>>>>
>>>>> Hi team,
>>>>>
>>>>> I am getting a java.util.IllegalFormatConversionException when running
>>>>> MirrorMaker with the log level set to trace. The code is off the latest
>>>>> trunk, at commit 8f0003f9b694b4da5fbd2f86db872d77a43eb63f
>>>>>
>>>>> The way I bring it up is
>>>>>
>>>>> bin/kafka-run-class.sh kafka.tools.MirrorMaker \
>>>>>   --consumer.config ~/Downloads/kafka/kafka_2.10-0.8.2.0/config/consumer.properties \
>>>>>   --producer.config ~/Downloads/kafka/kafka_2.10-0.8.2.0/config/producer.properties \
>>>>>   --num.streams 1 --num.producers 1 --no.data.loss \
>>>>>   --whitelist "mm-benchmark-test\\w*" \
>>>>>   --offset.commit.interval.ms 10000 \
>>>>>   --queue.byte.size 1024
>>>>>
>>>>> and set the log level to trace in tools-log4j.properties
>>>>>
>>>>> Here is the log snippet:
>>>>>
>>>>> [2015-03-07 02:04:27,211] TRACE [mirrormaker-producer-0] Sending message with value size 13 (kafka.tools.MirrorMaker$ProducerThread)
>>>>> [2015-03-07 02:04:27,211] TRACE Sending record ProducerRecord(topic=mm-benchmark-test, partition=null, key=[B@130362d0, value=[B@434c4f70 with callback kafka.tools.MirrorMaker$MirrorMakerProducerCallback@46f36494 to topic mm-benchmark-test partition 0 (org.apache.kafka.clients.producer.KafkaProducer)
>>>>> [2015-03-07 02:04:27,212] TRACE [mirrormaker-producer-0] Sending message with value size 13 (kafka.tools.MirrorMaker$ProducerThread)
>>>>> [2015-03-07 02:04:27,212] TRACE Sending record ProducerRecord(topic=mm-benchmark-test, partition=null, key=[B@54957b67, value=[B@21d8d293 with callback kafka.tools.MirrorMaker$MirrorMakerProducerCallback@21e8c241 to topic mm-benchmark-test partition 0 (org.apache.kafka.clients.producer.KafkaProducer)
>>>>> [2015-03-07 02:04:27,212] TRACE [mirrormaker-producer-0] Sending message with value size 13 (kafka.tools.MirrorMaker$ProducerThread)
>>>>> [2015-03-07 02:04:27,212] TRACE Sending record ProducerRecord(topic=mm-benchmark-test, partition=null, key=[B@1eed723b, value=[B@1acd590b with callback kafka.tools.MirrorMaker$MirrorMakerProducerCallback@1f90eeec to topic mm-benchmark-test partition 0 (org.apache.kafka.clients.producer.KafkaProducer)
>>>>> [2015-03-07 02:04:27,212] TRACE [mirrormaker-producer-0] Sending message with value size 13 (kafka.tools.MirrorMaker$ProducerThread)
>>>>> [2015-03-07 02:04:27,212] TRACE Sending record ProducerRecord(topic=mm-benchmark-test, partition=null, key=[B@3ae8a936, value=[B@bd3671 with callback kafka.tools.MirrorMaker$MirrorMakerProducerCallback@6413518 to topic mm-benchmark-test partition 0 (org.apache.kafka.clients.producer.KafkaProducer)
>>>>> [2015-03-07 02:04:27,212] ERROR Error executing user-provided callback on message for topic-partition mm-benchmark-test-0: (org.apache.kafka.clients.producer.internals.RecordBatch)
>>>>> java.util.IllegalFormatConversionException: d != kafka.tools.MirrorMaker$UnackedOffset
>>>>>     at java.util.Formatter$FormatSpecifier.failConversion(Formatter.java:4045)
>>>>>     at java.util.Formatter$FormatSpecifier.printInteger(Formatter.java:2748)
>>>>>     at java.util.Formatter$FormatSpecifier.print(Formatter.java:2702)
>>>>>     at java.util.Formatter.format(Formatter.java:2488)
>>>>>     at java.util.Formatter.format(Formatter.java:2423)
>>>>>     at java.lang.String.format(String.java:2790)
>>>>>     at scala.collection.immutable.StringLike$class.format(StringLike.scala:266)
>>>>>     at scala.collection.immutable.StringOps.format(StringOps.scala:31)
>>>>>     at kafka.tools.MirrorMaker$MirrorMakerProducerCallback$$anonfun$onCompletion$2.apply(MirrorMaker.scala:592)
>>>>>     at kafka.tools.MirrorMaker$MirrorMakerProducerCallback$$anonfun$onCompletion$2.apply(MirrorMaker.scala:592)
>>>>>     at kafka.utils.Logging$class.trace(Logging.scala:36)
>>>>>     at kafka.tools.MirrorMaker$.trace(MirrorMaker.scala:57)
>>>>>     at kafka.tools.MirrorMaker$MirrorMakerProducerCallback.onCompletion(MirrorMaker.scala:592)
>>>>>     at org.apache.kafka.clients.producer.internals.RecordBatch.done(RecordBatch.java:91)
>>>>>     at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:267)
>>>>>     at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:235)
>>>>>     at org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:55)
>>>>>     at org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:312)
>>>>>     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:225)
>>>>>     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:199)
>>>>>     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:124)
>>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>> [2015-03-07 02:04:27,212] TRACE [mirrormaker-producer-0] Sending message with value size 13 (kafka.tools.MirrorMaker$ProducerThread)
>>>>>
>>>>> --
>>>>> Regards,
>>>>> Tao
>>>>
>>>> --
>>>> Regards,
>>>> Tao
>>>
>>> --
>>> Regards,
>>> Tao
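
For anyone who wants to reproduce the root cause Tao identified without running MirrorMaker, here is a minimal standalone sketch in plain Java. The stack trace shows the Scala format call ending up in java.lang.String.format, so the same failure can be triggered directly. The class FormatMismatchDemo and its nested UnackedOffset are hypothetical stand-ins written for this illustration, not the real MirrorMaker types; the only thing assumed about the real wrapper is that it exposes the numeric offset through an element field, as the proposed fix suggests.

```java
import java.util.IllegalFormatConversionException;

// Hypothetical demo class; not part of Kafka.
class FormatMismatchDemo {

    // Stand-in for MirrorMaker's offset wrapper (assumption: the numeric
    // offset is reachable through a field named "element").
    static final class UnackedOffset {
        final long element;

        UnackedOffset(long element) {
            this.element = element;
        }
    }

    // Mirrors the broken trace call: %d receives the wrapper object itself.
    static String formatWrapper(String topicPartition, UnackedOffset offset) {
        return String.format("Updating offset for %s to %d", topicPartition, offset);
    }

    // Mirrors the proposed fix: %d receives the wrapped numeric value.
    static String formatElement(String topicPartition, UnackedOffset offset) {
        return String.format("Updating offset for %s to %d", topicPartition, offset.element);
    }

    public static void main(String[] args) {
        UnackedOffset offset = new UnackedOffset(42L);
        try {
            formatWrapper("mm-benchmark-test-0", offset);
        } catch (IllegalFormatConversionException e) {
            // %d only accepts integral types, so the wrapper is rejected.
            System.out.println("wrapper rejected: " + e.getMessage());
        }
        // Prints "Updating offset for mm-benchmark-test-0 to 42".
        System.out.println(formatElement("mm-benchmark-test-0", offset));
    }
}
```

Because the format string is only evaluated when the message is actually built, a mismatch like this stays hidden until trace logging is enabled, which matches what Tao saw.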