When will KAFKA-1997 be available?

Thanks,
Connie

On Sat, Mar 7, 2015 at 12:48 AM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:

> Hi Tao,
>
> Thanks a lot for finding the bug. We are actually rewriting the mirror
> maker in KAFKA-1997 with a much simplified solution using the newly added
> flush() call in the new Java producer.
> Mirror maker in current trunk is also missing one necessary
> synchronization - the UncheckedOffsets.removeOffset is not synchronized. I
> am hesitating whether to fix those problems in current trunk or just
> wait for KAFKA-1997 to be checked in. If you have a strong opinion
> about this, we can probably fix those 2 issues in the trunk. It should be
> a small patch but I just don't want people to get distracted.
>
> Jiangjie (Becket) Qin
>
> On 3/6/15, 10:15 PM, "tao xiao" <xiaotao...@gmail.com> wrote:
>
> >I think I worked out the root cause
> >
> >Line 593 in MirrorMaker.scala
> >
> >trace("Updating offset for %s to %d".format(topicPartition, offset))
> >
> >should be
> >
> >trace("Updating offset for %s to %d".format(topicPartition, offset.element))
> >
> >On Sat, Mar 7, 2015 at 2:12 AM, tao xiao <xiaotao...@gmail.com> wrote:
> >
> >> A bit more context: I turned on async in producer.properties
> >>
> >> On Sat, Mar 7, 2015 at 2:09 AM, tao xiao <xiaotao...@gmail.com> wrote:
> >>
> >>> Hi team,
> >>>
> >>> I am getting a java.util.IllegalFormatConversionException when running
> >>> MirrorMaker with log level set to trace. The code is off latest trunk
> >>> with commit 8f0003f9b694b4da5fbd2f86db872d77a43eb63f
> >>>
> >>> The way I bring it up is
> >>>
> >>> bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config
> >>> ~/Downloads/kafka/kafka_2.10-0.8.2.0/config/consumer.properties
> >>> --producer.config
> >>> ~/Downloads/kafka/kafka_2.10-0.8.2.0/config/producer.properties
> >>> --num.streams 1 --num.producers 1 --no.data.loss --whitelist
> >>> "mm-benchmark-test\\w*" --offset.commit.interval.ms 10000
> >>> --queue.byte.size 1024
> >>>
> >>> and set the log level to trace in tools-log4j.properties
> >>>
> >>> Here is the log snippet:
> >>>
> >>> [2015-03-07 02:04:27,211] TRACE [mirrormaker-producer-0] Sending message with value size 13 (kafka.tools.MirrorMaker$ProducerThread)
> >>>
> >>> [2015-03-07 02:04:27,211] TRACE Sending record ProducerRecord(topic=mm-benchmark-test, partition=null, key=[B@130362d0, value=[B@434c4f70 with callback kafka.tools.MirrorMaker$MirrorMakerProducerCallback@46f36494 to topic mm-benchmark-test partition 0 (org.apache.kafka.clients.producer.KafkaProducer)
> >>>
> >>> [2015-03-07 02:04:27,212] TRACE [mirrormaker-producer-0] Sending message with value size 13 (kafka.tools.MirrorMaker$ProducerThread)
> >>>
> >>> [2015-03-07 02:04:27,212] TRACE Sending record ProducerRecord(topic=mm-benchmark-test, partition=null, key=[B@54957b67, value=[B@21d8d293 with callback kafka.tools.MirrorMaker$MirrorMakerProducerCallback@21e8c241 to topic mm-benchmark-test partition 0 (org.apache.kafka.clients.producer.KafkaProducer)
> >>>
> >>> [2015-03-07 02:04:27,212] TRACE [mirrormaker-producer-0] Sending message with value size 13 (kafka.tools.MirrorMaker$ProducerThread)
> >>>
> >>> [2015-03-07 02:04:27,212] TRACE Sending record ProducerRecord(topic=mm-benchmark-test, partition=null, key=[B@1eed723b, value=[B@1acd590b with callback kafka.tools.MirrorMaker$MirrorMakerProducerCallback@1f90eeec to topic mm-benchmark-test partition 0 (org.apache.kafka.clients.producer.KafkaProducer)
> >>>
> >>> [2015-03-07 02:04:27,212] TRACE [mirrormaker-producer-0] Sending message with value size 13 (kafka.tools.MirrorMaker$ProducerThread)
> >>>
> >>> [2015-03-07 02:04:27,212] TRACE Sending record ProducerRecord(topic=mm-benchmark-test, partition=null, key=[B@3ae8a936, value=[B@bd3671 with callback kafka.tools.MirrorMaker$MirrorMakerProducerCallback@6413518 to topic mm-benchmark-test partition 0 (org.apache.kafka.clients.producer.KafkaProducer)
> >>>
> >>> [2015-03-07 02:04:27,212] ERROR Error executing user-provided callback on message for topic-partition mm-benchmark-test-0: (org.apache.kafka.clients.producer.internals.RecordBatch)
> >>>
> >>> java.util.IllegalFormatConversionException: d != kafka.tools.MirrorMaker$UnackedOffset
> >>>     at java.util.Formatter$FormatSpecifier.failConversion(Formatter.java:4045)
> >>>     at java.util.Formatter$FormatSpecifier.printInteger(Formatter.java:2748)
> >>>     at java.util.Formatter$FormatSpecifier.print(Formatter.java:2702)
> >>>     at java.util.Formatter.format(Formatter.java:2488)
> >>>     at java.util.Formatter.format(Formatter.java:2423)
> >>>     at java.lang.String.format(String.java:2790)
> >>>     at scala.collection.immutable.StringLike$class.format(StringLike.scala:266)
> >>>     at scala.collection.immutable.StringOps.format(StringOps.scala:31)
> >>>     at kafka.tools.MirrorMaker$MirrorMakerProducerCallback$$anonfun$onCompletion$2.apply(MirrorMaker.scala:592)
> >>>     at kafka.tools.MirrorMaker$MirrorMakerProducerCallback$$anonfun$onCompletion$2.apply(MirrorMaker.scala:592)
> >>>     at kafka.utils.Logging$class.trace(Logging.scala:36)
> >>>     at kafka.tools.MirrorMaker$.trace(MirrorMaker.scala:57)
> >>>     at kafka.tools.MirrorMaker$MirrorMakerProducerCallback.onCompletion(MirrorMaker.scala:592)
> >>>     at org.apache.kafka.clients.producer.internals.RecordBatch.done(RecordBatch.java:91)
> >>>     at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:267)
> >>>     at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:235)
> >>>     at org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:55)
> >>>     at org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:312)
> >>>     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:225)
> >>>     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:199)
> >>>     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:124)
> >>>     at java.lang.Thread.run(Thread.java:745)
> >>>
> >>> [2015-03-07 02:04:27,212] TRACE [mirrormaker-producer-0] Sending message with value size 13 (kafka.tools.MirrorMaker$ProducerThread)
> >>>
> >>> --
> >>> Regards,
> >>> Tao
> >>
> >> --
> >> Regards,
> >> Tao
> >
> >--
> >Regards,
> >Tao
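
For anyone following the thread, here is a minimal sketch of the root cause Tao describes: a %d conversion is handed a wrapper object rather than the number it wraps. The UnackedOffset case class and the OffsetTraceSketch object below are hypothetical stand-ins written for illustration, not the MirrorMaker source; only the format string and the `.element` fix come from the thread.

```scala
import scala.util.Try

// Hypothetical stand-in for MirrorMaker's offset wrapper; only `element` matters here.
final case class UnackedOffset(element: Long)

object OffsetTraceSketch {
  private val Template = "Updating offset for %s to %d"

  // Mirrors the failing call: %d receives the wrapper object, not a number.
  def brokenMessage(topicPartition: String, offset: UnackedOffset): String =
    Template.format(topicPartition, offset)

  // Mirrors the proposed fix: %d receives the wrapped Long.
  def fixedMessage(topicPartition: String, offset: UnackedOffset): String =
    Template.format(topicPartition, offset.element)

  def main(args: Array[String]): Unit = {
    val offset = UnackedOffset(42L)
    // The first call fails with java.util.IllegalFormatConversionException,
    // so it is wrapped in Try to let the second call run.
    println(Try(brokenMessage("mm-benchmark-test-0", offset)))
    println(fixedMessage("mm-benchmark-test-0", offset))
  }
}
```

The format call compiles either way because Scala's format accepts Any*, so the mismatch only surfaces at runtime. That is consistent with the stack trace above, where the format call runs inside Logging.trace, which would explain why the exception only appears with the log level set to trace.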