Actually, I was about to report another bug that is caused by exactly this
UncheckedOffsets.removeOffset issue (an offset is removed before it has been
added).

As the project I am currently working on relies heavily on the
functionality MM offers, it would be good if you could either put the fix
into trunk or give me some advice on how to fix the synchronization issue.

BTW, can the synchronization issue be fixed by adding the UnackedOffset to
the offset list before calling producer.send?
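
To make the question concrete, here is a minimal sketch of the ordering I
have in mind. It is not the MirrorMaker code: UnackedOffsetTracker and
fakeSend are made-up names, and fakeSend only imitates a producer whose
callback fires before send returns, which is the worst case for this race.

```scala
import scala.collection.mutable

// Hypothetical tracker (not the MirrorMaker class): a synchronized set of
// offsets handed to the producer but not yet acknowledged.
final class UnackedOffsetTracker {
  private val offsets = mutable.HashSet.empty[Long]

  def add(offset: Long): Unit = synchronized { offsets += offset }

  // Returns false when the offset was not present, i.e. remove ran before add.
  def remove(offset: Long): Boolean = synchronized { offsets.remove(offset) }

  def size: Int = synchronized { offsets.size }
}

object SendOrderingSketch {
  // Assumed stand-in for producer.send: it invokes the callback before
  // returning, so the callback can run ahead of any code placed after send.
  def fakeSend(offset: Long)(onCompletion: Long => Unit): Unit =
    onCompletion(offset)

  // Add after send: the callback may look for an offset that is not there yet.
  def addAfterSend(tracker: UnackedOffsetTracker, offset: Long): Boolean = {
    var removed = false
    fakeSend(offset)(o => removed = tracker.remove(o))
    tracker.add(offset) // too late: this entry is never removed
    removed
  }

  // Add before send: the callback always finds the offset it has to remove.
  def addBeforeSend(tracker: UnackedOffsetTracker, offset: Long): Boolean = {
    var removed = false
    tracker.add(offset)
    fakeSend(offset)(o => removed = tracker.remove(o))
    removed
  }
}
```

In this sketch addAfterSend leaves a stale entry behind while addBeforeSend
does not. Whether the same reordering is sufficient in MirrorMaker is exactly
what I am asking; removeOffset itself would presumably still need to be
synchronized, as you mentioned.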

On Sat, Mar 7, 2015 at 4:48 PM, Jiangjie Qin <j...@linkedin.com.invalid>
wrote:

> Hi Tao,
>
> Thanks a lot for finding the bug. We are actually rewriting the mirror
> maker in KAFKA-1997 with a much simplified solution using the newly added
> flush() call in the new Java producer.
> Mirror maker in current trunk is also missing one necessary
> synchronization - the UncheckedOffsets.removeOffset is not synchronized. I
> am hesitating whether to fix those problems in current trunk or just
> wait for KAFKA-1997 to be checked in. If you have a strong opinion
> about this, we can probably fix those 2 issues in the trunk. It should be
> a small patch but I just don't want people to get distracted.
>
> Jiangjie (Becket) Qin
>
> On 3/6/15, 10:15 PM, "tao xiao" <xiaotao...@gmail.com> wrote:
>
> >I think I worked out the root cause
> >
> >Line 593 in MirrorMaker.scala
> >
> >trace("Updating offset for %s to %d".format(topicPartition, offset))
> >should
> >be
> >
> >trace("Updating offset for %s to %d".format(topicPartition,
> >offset.element))
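
Inline note: a minimal sketch of why the original line throws and the
corrected one does not. The UnackedOffset below is a hypothetical stand-in;
only the field name `element` comes from the fix above, and the real class in
MirrorMaker.scala may look different.

```scala
// Hypothetical stand-in for MirrorMaker's UnackedOffset (assumed shape).
final case class UnackedOffset(element: Long)

object FormatBugSketch {
  // Same shape as the original trace line: the wrapper object is passed to %d,
  // which only accepts integral values.
  def broken(topicPartition: String, offset: UnackedOffset): String =
    "Updating offset for %s to %d".format(topicPartition, offset)

  // Same shape as the proposed fix: the wrapped Long is passed to %d.
  def fixed(topicPartition: String, offset: UnackedOffset): String =
    "Updating offset for %s to %d".format(topicPartition, offset.element)

  def main(args: Array[String]): Unit = {
    val offset = UnackedOffset(42L)
    println(fixed("mm-benchmark-test-0", offset))
    try broken("mm-benchmark-test-0", offset)
    catch {
      case e: java.util.IllegalFormatConversionException =>
        println("broken variant threw: " + e.getMessage)
    }
  }
}
```

Both variants compile because format accepts arguments of any type, so the
mismatch only surfaces at runtime, and only when trace logging is enabled and
the message is actually formatted.
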
> >
> >
> >On Sat, Mar 7, 2015 at 2:12 AM, tao xiao <xiaotao...@gmail.com> wrote:
> >
> >> A bit more context: I turned on async in producer.properties
> >>
> >> On Sat, Mar 7, 2015 at 2:09 AM, tao xiao <xiaotao...@gmail.com> wrote:
> >>
> >>> Hi team,
> >>>
> >>> I am having java.util.IllegalFormatConversionException when running
> >>> MirrorMaker with log level set to trace. The code is off latest trunk
> >>> with commit 8f0003f9b694b4da5fbd2f86db872d77a43eb63f
> >>>
> >>> The way I bring it up is
> >>>
> >>> bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config
> >>> ~/Downloads/kafka/kafka_2.10-0.8.2.0/config/consumer.properties
> >>> --producer.config
> >>> ~/Downloads/kafka/kafka_2.10-0.8.2.0/config/producer.properties
> >>> --num.streams 1 --num.producers 1 --no.data.loss --whitelist
> >>> "mm-benchmark-test\\w*" --offset.commit.interval.ms 10000
> >>> --queue.byte.size 1024
> >>> and set the log level to trace in tools-log4j.properties
> >>>
> >>> here is the log snippet
> >>>
> >>> [2015-03-07 02:04:27,211] TRACE [mirrormaker-producer-0] Sending message with value size 13 (kafka.tools.MirrorMaker$ProducerThread)
> >>>
> >>> [2015-03-07 02:04:27,211] TRACE Sending record ProducerRecord(topic=mm-benchmark-test, partition=null, key=[B@130362d0, value=[B@434c4f70 with callback kafka.tools.MirrorMaker$MirrorMakerProducerCallback@46f36494 to topic mm-benchmark-test partition 0 (org.apache.kafka.clients.producer.KafkaProducer)
> >>>
> >>> [2015-03-07 02:04:27,212] TRACE [mirrormaker-producer-0] Sending message with value size 13 (kafka.tools.MirrorMaker$ProducerThread)
> >>>
> >>> [2015-03-07 02:04:27,212] TRACE Sending record ProducerRecord(topic=mm-benchmark-test, partition=null, key=[B@54957b67, value=[B@21d8d293 with callback kafka.tools.MirrorMaker$MirrorMakerProducerCallback@21e8c241 to topic mm-benchmark-test partition 0 (org.apache.kafka.clients.producer.KafkaProducer)
> >>>
> >>> [2015-03-07 02:04:27,212] TRACE [mirrormaker-producer-0] Sending message with value size 13 (kafka.tools.MirrorMaker$ProducerThread)
> >>>
> >>> [2015-03-07 02:04:27,212] TRACE Sending record ProducerRecord(topic=mm-benchmark-test, partition=null, key=[B@1eed723b, value=[B@1acd590b with callback kafka.tools.MirrorMaker$MirrorMakerProducerCallback@1f90eeec to topic mm-benchmark-test partition 0 (org.apache.kafka.clients.producer.KafkaProducer)
> >>>
> >>> [2015-03-07 02:04:27,212] TRACE [mirrormaker-producer-0] Sending message with value size 13 (kafka.tools.MirrorMaker$ProducerThread)
> >>>
> >>> [2015-03-07 02:04:27,212] TRACE Sending record ProducerRecord(topic=mm-benchmark-test, partition=null, key=[B@3ae8a936, value=[B@bd3671 with callback kafka.tools.MirrorMaker$MirrorMakerProducerCallback@6413518 to topic mm-benchmark-test partition 0 (org.apache.kafka.clients.producer.KafkaProducer)
> >>>
> >>> [2015-03-07 02:04:27,212] ERROR Error executing user-provided callback on message for topic-partition mm-benchmark-test-0: (org.apache.kafka.clients.producer.internals.RecordBatch)
> >>>
> >>> java.util.IllegalFormatConversionException: d != kafka.tools.MirrorMaker$UnackedOffset
> >>> at java.util.Formatter$FormatSpecifier.failConversion(Formatter.java:4045)
> >>> at java.util.Formatter$FormatSpecifier.printInteger(Formatter.java:2748)
> >>> at java.util.Formatter$FormatSpecifier.print(Formatter.java:2702)
> >>> at java.util.Formatter.format(Formatter.java:2488)
> >>> at java.util.Formatter.format(Formatter.java:2423)
> >>> at java.lang.String.format(String.java:2790)
> >>> at scala.collection.immutable.StringLike$class.format(StringLike.scala:266)
> >>> at scala.collection.immutable.StringOps.format(StringOps.scala:31)
> >>> at kafka.tools.MirrorMaker$MirrorMakerProducerCallback$$anonfun$onCompletion$2.apply(MirrorMaker.scala:592)
> >>> at kafka.tools.MirrorMaker$MirrorMakerProducerCallback$$anonfun$onCompletion$2.apply(MirrorMaker.scala:592)
> >>> at kafka.utils.Logging$class.trace(Logging.scala:36)
> >>> at kafka.tools.MirrorMaker$.trace(MirrorMaker.scala:57)
> >>> at kafka.tools.MirrorMaker$MirrorMakerProducerCallback.onCompletion(MirrorMaker.scala:592)
> >>> at org.apache.kafka.clients.producer.internals.RecordBatch.done(RecordBatch.java:91)
> >>> at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:267)
> >>> at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:235)
> >>> at org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:55)
> >>> at org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:312)
> >>> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:225)
> >>> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:199)
> >>> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:124)
> >>> at java.lang.Thread.run(Thread.java:745)
> >>>
> >>> [2015-03-07 02:04:27,212] TRACE [mirrormaker-producer-0] Sending message with value size 13 (kafka.tools.MirrorMaker$ProducerThread)
> >>>
> >>>
> >>>
> >>> --
> >>> Regards,
> >>> Tao
> >>>
> >>
> >>
> >>
> >> --
> >> Regards,
> >> Tao
> >>
> >
> >
> >
> >--
> >Regards,
> >Tao
>
>


-- 
Regards,
Tao
