Hi Jay,

I’ve uploaded patch in KAFKA-2009 for this. It is a pretty small patch and
it will be great if you can help review it.

Thanks.

Jiangjie (Becket) Qin

On 3/8/15, 12:31 PM, "Jay Kreps" <jay.kr...@gmail.com> wrote:

>Hey guys,
>
>If we checked in obviously broken code on trunk, let's fix it now or
>revert
>that change.
>
>-Jay
>
>On Sat, Mar 7, 2015 at 12:48 AM, Jiangjie Qin <j...@linkedin.com.invalid>
>wrote:
>
>> Hi Tao,
>>
>> Thanks a lot for finding the bug. We are actually rewriting the mirror
>> maker in KAFKA-1997 with a much simplified solution using the newly
>>added
>> flush() call in new java producer.
>> Mirror maker in current trunk is also missing one necessary
>> synchronization - the UncheckedOffsets.removeOffset is not
>>synchronized. I
>> am hesitating whether to fix those problems in current trunk or just
>> waiting for Kafka-1997 to be checked in. If you have a strong opinion
>> about this, we can probably fix those 2 issues in the trunk. It should
>>be
>> a small patch but I just don¹t want to people get distracted.
>>
>> Jiangjie (Becket) Qin
>>
>> On 3/6/15, 10:15 PM, "tao xiao" <xiaotao...@gmail.com> wrote:
>>
>> >I think I worked out the root cause
>> >
>> >Line 593 in MirrorMaker.scala
>> >
>> >trace("Updating offset for %s to %d".format(topicPartition, offset))
>> >should
>> >be
>> >
>> >trace("Updating offset for %s to %d".format(topicPartition,
>> >offset.element))
>> >
>> >
>> >On Sat, Mar 7, 2015 at 2:12 AM, tao xiao <xiaotao...@gmail.com> wrote:
>> >
>> >> A bit more context: I turned on async in producer.properties
>> >>
>> >> On Sat, Mar 7, 2015 at 2:09 AM, tao xiao <xiaotao...@gmail.com>
>>wrote:
>> >>
>> >>> Hi team,
>> >>>
>> >>> I am having java.util.IllegalFormatConversionException when running
>> >>> MirrorMaker with log level set to trace. The code is off latest
>>trunk
>> >>>with
>> >>> commit 8f0003f9b694b4da5fbd2f86db872d77a43eb63f
>> >>>
>> >>> The way I bring up is
>> >>>
>> >>> bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config
>> >>> ~/Downloads/kafka/kafka_2.10-0.8.2.0/config/consumer.properties
>> >>> --producer.config
>> >>> ~/Downloads/kafka/kafka_2.10-0.8.2.0/config/producer.properties
>> >>> --num.streams 1 --num.producers 1 --no.data.loss --whitelist
>> >>> "mm-benchmark-test\\w*" --offset.commit.interval.ms 10000
>> >>> --queue.byte.size 1024
>> >>> and set the log level to trace in tools-log4j.properties
>> >>>
>> >>> here is the log snippet
>> >>>
>> >>> [2015-03-07 02:04:27,211] TRACE [mirrormaker-producer-0] Sending
>> >>>message
>> >>> with value size 13 (kafka.tools.MirrorMaker$ProducerThread)
>> >>>
>> >>> [2015-03-07 02:04:27,211] TRACE Sending record
>> >>> ProducerRecord(topic=mm-benchmark-test, partition=null,
>> >>>key=[B@130362d0,
>> >>> value=[B@434c4f70 with callback
>> >>> kafka.tools.MirrorMaker$MirrorMakerProducerCallback@46f36494 to
>>topic
>> >>> mm-benchmark-test partition 0
>> >>> (org.apache.kafka.clients.producer.KafkaProducer)
>> >>>
>> >>> [2015-03-07 02:04:27,212] TRACE [mirrormaker-producer-0] Sending
>> >>>message
>> >>> with value size 13 (kafka.tools.MirrorMaker$ProducerThread)
>> >>>
>> >>> [2015-03-07 02:04:27,212] TRACE Sending record
>> >>> ProducerRecord(topic=mm-benchmark-test, partition=null,
>> >>>key=[B@54957b67,
>> >>> value=[B@21d8d293 with callback
>> >>> kafka.tools.MirrorMaker$MirrorMakerProducerCallback@21e8c241 to
>>topic
>> >>> mm-benchmark-test partition 0
>> >>> (org.apache.kafka.clients.producer.KafkaProducer)
>> >>>
>> >>> [2015-03-07 02:04:27,212] TRACE [mirrormaker-producer-0] Sending
>> >>>message
>> >>> with value size 13 (kafka.tools.MirrorMaker$ProducerThread)
>> >>>
>> >>> [2015-03-07 02:04:27,212] TRACE Sending record
>> >>> ProducerRecord(topic=mm-benchmark-test, partition=null,
>> >>>key=[B@1eed723b,
>> >>> value=[B@1acd590b with callback
>> >>> kafka.tools.MirrorMaker$MirrorMakerProducerCallback@1f90eeec to
>>topic
>> >>> mm-benchmark-test partition 0
>> >>> (org.apache.kafka.clients.producer.KafkaProducer)
>> >>>
>> >>> [2015-03-07 02:04:27,212] TRACE [mirrormaker-producer-0] Sending
>> >>>message
>> >>> with value size 13 (kafka.tools.MirrorMaker$ProducerThread)
>> >>>
>> >>> [2015-03-07 02:04:27,212] TRACE Sending record
>> >>> ProducerRecord(topic=mm-benchmark-test, partition=null,
>> >>>key=[B@3ae8a936,
>> >>> value=[B@bd3671 with callback
>> >>> kafka.tools.MirrorMaker$MirrorMakerProducerCallback@6413518 to topic
>> >>> mm-benchmark-test partition 0
>> >>> (org.apache.kafka.clients.producer.KafkaProducer)
>> >>>
>> >>> [2015-03-07 02:04:27,212] ERROR Error executing user-provided
>>callback
>> >>>on
>> >>> message for topic-partition mm-benchmark-test-0:
>> >>> (org.apache.kafka.clients.producer.internals.RecordBatch)
>> >>>
>> >>> java.util.IllegalFormatConversionException: d !=
>> >>> kafka.tools.MirrorMaker$UnackedOffset
>> >>>
>> >>> at
>> 
>>>>>java.util.Formatter$FormatSpecifier.failConversion(Formatter.java:4045
>>>>>)
>> >>>
>> >>> at
>> >>>java.util.Formatter$FormatSpecifier.printInteger(Formatter.java:2748)
>> >>>
>> >>> at java.util.Formatter$FormatSpecifier.print(Formatter.java:2702)
>> >>>
>> >>> at java.util.Formatter.format(Formatter.java:2488)
>> >>>
>> >>> at java.util.Formatter.format(Formatter.java:2423)
>> >>>
>> >>> at java.lang.String.format(String.java:2790)
>> >>>
>> >>> at
>> >>>
>> 
>>>>>scala.collection.immutable.StringLike$class.format(StringLike.scala:26
>>>>>6)
>> >>>
>> >>> at scala.collection.immutable.StringOps.format(StringOps.scala:31)
>> >>>
>> >>> at
>> >>>
>> 
>>>>>kafka.tools.MirrorMaker$MirrorMakerProducerCallback$$anonfun$onComplet
>>>>>io
>> >>>n$2.apply(MirrorMaker.scala:592)
>> >>>
>> >>> at
>> >>>
>> 
>>>>>kafka.tools.MirrorMaker$MirrorMakerProducerCallback$$anonfun$onComplet
>>>>>io
>> >>>n$2.apply(MirrorMaker.scala:592)
>> >>>
>> >>> at kafka.utils.Logging$class.trace(Logging.scala:36)
>> >>>
>> >>> at kafka.tools.MirrorMaker$.trace(MirrorMaker.scala:57)
>> >>>
>> >>> at
>> >>>
>> 
>>>>>kafka.tools.MirrorMaker$MirrorMakerProducerCallback.onCompletion(Mirro
>>>>>rM
>> >>>aker.scala:592)
>> >>>
>> >>> at
>> >>>
>> 
>>>>>org.apache.kafka.clients.producer.internals.RecordBatch.done(RecordBat
>>>>>ch
>> >>>.java:91)
>> >>>
>> >>> at
>> >>>
>> 
>>>>>org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sende
>>>>>r.
>> >>>java:267)
>> >>>
>> >>> at
>> >>>
>> 
>>>>>org.apache.kafka.clients.producer.internals.Sender.handleProduceRespon
>>>>>se
>> >>>(Sender.java:235)
>> >>>
>> >>> at
>> >>>
>> 
>>>>>org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.j
>>>>>av
>> >>>a:55)
>> >>>
>> >>> at
>> >>>
>> 
>>>>>org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender
>>>>>.j
>> >>>ava:312)
>> >>>
>> >>> at 
>>org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:225)
>> >>>
>> >>> at
>> 
>>>>>org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:199
>>>>>)
>> >>>
>> >>> at
>> 
>>>>>org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:124
>>>>>)
>> >>>
>> >>> at java.lang.Thread.run(Thread.java:745)
>> >>>
>> >>> [2015-03-07 02:04:27,212] TRACE [mirrormaker-producer-0] Sending
>> >>>message
>> >>> with value size 13 (kafka.tools.MirrorMaker$ProducerThread)
>> >>>
>> >>>
>> >>>
>> >>> --
>> >>> Regards,
>> >>> Tao
>> >>>
>> >>
>> >>
>> >>
>> >> --
>> >> Regards,
>> >> Tao
>> >>
>> >
>> >
>> >
>> >--
>> >Regards,
>> >Tao
>>
>>

Reply via email to