Hey guys,

If we checked in obviously broken code on trunk, let's fix it now or revert
that change.

-Jay

On Sat, Mar 7, 2015 at 12:48 AM, Jiangjie Qin <j...@linkedin.com.invalid>
wrote:

> Hi Tao,
>
> Thanks a lot for finding the bug. We are actually rewriting the mirror
> maker in KAFKA-1997 with a much simplified solution using the newly added
> flush() call in the new Java producer.
> Mirror maker in current trunk is also missing one necessary
> synchronization: UncheckedOffsets.removeOffset is not synchronized. I
> am hesitating over whether to fix those problems in current trunk or just
> wait for KAFKA-1997 to be checked in. If you have a strong opinion
> about this, we can probably fix those 2 issues in the trunk. It should be
> a small patch, but I just don't want people to get distracted.
>
> Jiangjie (Becket) Qin
>
> On 3/6/15, 10:15 PM, "tao xiao" <xiaotao...@gmail.com> wrote:
>
> >I think I worked out the root cause.
> >
> >Line 593 in MirrorMaker.scala
> >
> >trace("Updating offset for %s to %d".format(topicPartition, offset))
> >
> >should be
> >
> >trace("Updating offset for %s to %d".format(topicPartition,
> >offset.element))
> >
> >
> >On Sat, Mar 7, 2015 at 2:12 AM, tao xiao <xiaotao...@gmail.com> wrote:
> >
> >> A bit more context: I turned on async in producer.properties
> >>
> >> On Sat, Mar 7, 2015 at 2:09 AM, tao xiao <xiaotao...@gmail.com> wrote:
> >>
> >>> Hi team,
> >>>
> >>> I am getting a java.util.IllegalFormatConversionException when running
> >>> MirrorMaker with the log level set to trace. The code is off the latest
> >>> trunk, at commit 8f0003f9b694b4da5fbd2f86db872d77a43eb63f.
> >>>
> >>> The way I bring it up is
> >>>
> >>> bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config
> >>> ~/Downloads/kafka/kafka_2.10-0.8.2.0/config/consumer.properties
> >>> --producer.config
> >>> ~/Downloads/kafka/kafka_2.10-0.8.2.0/config/producer.properties
> >>> --num.streams 1 --num.producers 1 --no.data.loss --whitelist
> >>> "mm-benchmark-test\\w*" --offset.commit.interval.ms 10000
> >>> --queue.byte.size 1024
> >>>
> >>> and set the log level to trace in tools-log4j.properties.
> >>>
> >>> Here is the log snippet:
> >>>
> >>> [2015-03-07 02:04:27,211] TRACE [mirrormaker-producer-0] Sending message with value size 13 (kafka.tools.MirrorMaker$ProducerThread)
> >>>
> >>> [2015-03-07 02:04:27,211] TRACE Sending record ProducerRecord(topic=mm-benchmark-test, partition=null, key=[B@130362d0, value=[B@434c4f70 with callback kafka.tools.MirrorMaker$MirrorMakerProducerCallback@46f36494 to topic mm-benchmark-test partition 0 (org.apache.kafka.clients.producer.KafkaProducer)
> >>>
> >>> [2015-03-07 02:04:27,212] TRACE [mirrormaker-producer-0] Sending message with value size 13 (kafka.tools.MirrorMaker$ProducerThread)
> >>>
> >>> [2015-03-07 02:04:27,212] TRACE Sending record ProducerRecord(topic=mm-benchmark-test, partition=null, key=[B@54957b67, value=[B@21d8d293 with callback kafka.tools.MirrorMaker$MirrorMakerProducerCallback@21e8c241 to topic mm-benchmark-test partition 0 (org.apache.kafka.clients.producer.KafkaProducer)
> >>>
> >>> [2015-03-07 02:04:27,212] TRACE [mirrormaker-producer-0] Sending message with value size 13 (kafka.tools.MirrorMaker$ProducerThread)
> >>>
> >>> [2015-03-07 02:04:27,212] TRACE Sending record ProducerRecord(topic=mm-benchmark-test, partition=null, key=[B@1eed723b, value=[B@1acd590b with callback kafka.tools.MirrorMaker$MirrorMakerProducerCallback@1f90eeec to topic mm-benchmark-test partition 0 (org.apache.kafka.clients.producer.KafkaProducer)
> >>>
> >>> [2015-03-07 02:04:27,212] TRACE [mirrormaker-producer-0] Sending message with value size 13 (kafka.tools.MirrorMaker$ProducerThread)
> >>>
> >>> [2015-03-07 02:04:27,212] TRACE Sending record ProducerRecord(topic=mm-benchmark-test, partition=null, key=[B@3ae8a936, value=[B@bd3671 with callback kafka.tools.MirrorMaker$MirrorMakerProducerCallback@6413518 to topic mm-benchmark-test partition 0 (org.apache.kafka.clients.producer.KafkaProducer)
> >>>
> >>> [2015-03-07 02:04:27,212] ERROR Error executing user-provided callback on message for topic-partition mm-benchmark-test-0: (org.apache.kafka.clients.producer.internals.RecordBatch)
> >>>
> >>> java.util.IllegalFormatConversionException: d != kafka.tools.MirrorMaker$UnackedOffset
> >>> at java.util.Formatter$FormatSpecifier.failConversion(Formatter.java:4045)
> >>> at java.util.Formatter$FormatSpecifier.printInteger(Formatter.java:2748)
> >>> at java.util.Formatter$FormatSpecifier.print(Formatter.java:2702)
> >>> at java.util.Formatter.format(Formatter.java:2488)
> >>> at java.util.Formatter.format(Formatter.java:2423)
> >>> at java.lang.String.format(String.java:2790)
> >>> at scala.collection.immutable.StringLike$class.format(StringLike.scala:266)
> >>> at scala.collection.immutable.StringOps.format(StringOps.scala:31)
> >>> at kafka.tools.MirrorMaker$MirrorMakerProducerCallback$$anonfun$onCompletion$2.apply(MirrorMaker.scala:592)
> >>> at kafka.tools.MirrorMaker$MirrorMakerProducerCallback$$anonfun$onCompletion$2.apply(MirrorMaker.scala:592)
> >>> at kafka.utils.Logging$class.trace(Logging.scala:36)
> >>> at kafka.tools.MirrorMaker$.trace(MirrorMaker.scala:57)
> >>> at kafka.tools.MirrorMaker$MirrorMakerProducerCallback.onCompletion(MirrorMaker.scala:592)
> >>> at org.apache.kafka.clients.producer.internals.RecordBatch.done(RecordBatch.java:91)
> >>> at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:267)
> >>> at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:235)
> >>> at org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:55)
> >>> at org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:312)
> >>> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:225)
> >>> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:199)
> >>> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:124)
> >>> at java.lang.Thread.run(Thread.java:745)
> >>>
> >>> [2015-03-07 02:04:27,212] TRACE [mirrormaker-producer-0] Sending message with value size 13 (kafka.tools.MirrorMaker$ProducerThread)
> >>>
> >>>
> >>>
> >>> --
> >>> Regards,
> >>> Tao
> >>>
> >>
> >>
> >>
> >> --
> >> Regards,
> >> Tao
> >>
> >
> >
> >
> >--
> >Regards,
> >Tao
>
>
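
For anyone who wants to reproduce the format error outside Kafka, here is a minimal sketch. It assumes only what the thread states: the trace call passes an UnackedOffset object to a %d placeholder, and the proposed fix passes offset.element instead. The UnackedOffset case class and the FormatBugSketch object below are hypothetical stand-ins, not the actual MirrorMaker code.

```scala
import java.util.IllegalFormatConversionException

// Hypothetical stand-in for MirrorMaker's UnackedOffset; only `element` matters here.
final case class UnackedOffset(element: Long)

object FormatBugSketch {
  // Mirrors the failing call: %d receives the wrapper object, not a number.
  def broken(topicPartition: String, offset: UnackedOffset): String =
    "Updating offset for %s to %d".format(topicPartition, offset)

  // Mirrors the proposed fix: %d receives the numeric offset held by the wrapper.
  def fixed(topicPartition: String, offset: UnackedOffset): String =
    "Updating offset for %s to %d".format(topicPartition, offset.element)

  def main(args: Array[String]): Unit = {
    val offset = UnackedOffset(42L)
    try {
      println(broken("mm-benchmark-test-0", offset))
    } catch {
      // The format string compiles fine; the mismatch only surfaces at runtime.
      case e: IllegalFormatConversionException =>
        println("broken call threw: " + e.getMessage)
    }
    println(fixed("mm-benchmark-test-0", offset))
  }
}
```

Because format takes its arguments as Any*, the compiler cannot catch the mismatch, and the call only fails once the message is actually built. That would be consistent with the error showing up only with the log level set to trace.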
