-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/#review64152
-----------------------------------------------------------

Thanks for the patch. Some comments below.


core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
<https://reviews.apache.org/r/25995/#comment106601>

    Should this api be in ConsumerConnector?


core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
<https://reviews.apache.org/r/25995/#comment106604>

    Should this be part of the public api in ConsumerConnector? Also, could we restructure the api a bit better? It seems that the public api should just be commitOffsets(offsetMap), since we know this is a manual offset commit. We can let the other api, commitOffsets(isAutoCommit), get the internal offsets and pass them to commitOffsets(offsetMap).


core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment106606>

    If we expect consumerConfig to be a singleton, we should just use options.valueOf() instead of valuesOf().head.


core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment106605>

    This is really weird. We get a customized listener from the consumer config, but expect the listener to implement a special class, MirrorMakerConsumerRelabanceListener, instead of the standard ConsumerRebalanceListener. It's probably better to get this from a MirrorMaker input param that defaults to MirrorMakerConsumerRelabanceListener. We can then add a description of which class a customized implementation needs to extend, if customization is needed.


core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment106623>

    Does that imply there is always an offset? Is that always true?

    I don't quite follow how the logic works. Since the offset for each target partition is updated independently, I am not sure why you can rely on checking that those offsets are consecutive. Also, does this logic work when there is a partitioning key?

    It would be useful to add some comments to describe why a two-level offset map is needed.
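
To make the commitOffsets suggestion for ZookeeperConsumerConnector above more concrete, here is a rough sketch of the shape I have in mind. It is written in Java only to keep it self-contained; the class name, the String keys and the in-memory "store" are all made up for illustration and are not the connector's real types.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the connector. All names and types here are
// assumptions for illustration, not the actual ZookeeperConsumerConnector code.
final class OffsetCommitSketch {
    // Offsets the connector tracks internally, keyed by "topic-partition".
    private final Map<String, Long> consumedOffsets = new ConcurrentHashMap<>();
    // Stand-in for wherever offsets are really stored.
    private final Map<String, Long> committed = new ConcurrentHashMap<>();

    // Record that the consumer has advanced on a partition.
    void recordConsumed(String topicPartition, long offset) {
        consumedOffsets.put(topicPartition, offset);
    }

    // Public api: manual commit of offsets supplied by the caller.
    public void commitOffsets(Map<String, Long> offsetMap) {
        committed.putAll(offsetMap);
    }

    // Internal path: snapshot the tracked offsets and reuse the public method.
    // isAutoCommit only mirrors the signature mentioned in the comment;
    // this sketch does not use it.
    void commitOffsets(boolean isAutoCommit) {
        commitOffsets(new HashMap<>(consumedOffsets));
    }

    // Copy of what has been committed so far.
    Map<String, Long> committedOffsets() {
        return new HashMap<>(committed);
    }
}
```

With this split there is a single code path that writes offsets, and the auto-commit path differs only in where the offset map comes from.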


core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment106622>

    It's weird to extend NewShinyProducer but not use any of its methods. Perhaps it will be clearer if we just let MirrorMakerNewProducer implement MirrorMakerBaseProducer. Ditto for MirrorMakerOldProducer.


core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment106621>

    This kind of coordination doesn't quite work. Suppose that we set inRebalance to true and entered the while loop. However, just before we call inRebalance.wait(), the producer could have finished sending all data and called inRebalance.notify(). Then we will be stuck in inRebalance.wait() forever since we missed the notification.

    One way to fix that is to create a lock that protects the read/write of numMessageUnacked. Then we use a condition created from the lock to do the coordination. This way, both the wait/notification and the update/check of numMessageUnacked are protected by the same lock.


- Jun Rao


On Dec. 4, 2014, 7:59 p.m., Jiangjie Qin wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/25995/
> -----------------------------------------------------------
>
> (Updated Dec. 4, 2014, 7:59 p.m.)
>
>
> Review request for kafka.
>
>
> Bugs: KAFKA-1650 and KAKFA-1650
>     https://issues.apache.org/jira/browse/KAFKA-1650
>     https://issues.apache.org/jira/browse/KAKFA-1650
>
>
> Repository: kafka
>
>
> Description
> -------
>
> Addressed Guozhang's comments.
>
>
> Addressed Guozhang's comments
>
>
> commit before switch to trunk
>
>
> commit before rebase
>
>
> Rebased on trunk, Addressed Guozhang's comments.
>
>
> Addressed Guozhang's comments on MaxInFlightRequests
>
>
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into
> mirrormaker-redesign
>
>
> Incorporated Guozhang's comments
>
>
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into
> mirrormaker-redesign
>
>
> Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments.
>
>
> Added consumer rebalance listener to mirror maker, will test it later.
>
>
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into
> mirrormaker-redesign
>
> Conflicts:
>     core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
>     core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
>
> added custom config for consumer rebalance listener
>
>
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into
> mirrormaker-redesign
>
>
> Add configurable consumer rebalance listener
>
>
> Incorporated Guozhang's comments
>
>
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into
> mirrormaker-redesign
>
>
> Incorporated Guozhang's comments.
>
>
> Addressed Guozhang's comment.
>
>
> numMessageUnacked should be decremented no matter the send was successful or
> not.
>
>
> Diffs
> -----
>
>   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala da29a8cb461099eb675161db2f11a9937424a5c6
>   core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java PRE-CREATION
>   core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala 9d5a47fb8e04d0055cce820afde7f73affc0a984
>   core/src/main/scala/kafka/tools/MirrorMaker.scala f399105087588946987bbc84e3759935d9498b6a
>   core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala 6a85d7e494f6c88798133a17f6180b61029dff58
>   core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala 8c4687b2c96fddab7c8454a5a8011c3bab0897a0
>
> Diff: https://reviews.apache.org/r/25995/diff/
>
>
> Testing
> -------
>
>
> Thanks,
>
> Jiangjie Qin
>
>
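
On the inRebalance coordination comment (comment106621), the lock-plus-condition approach could look roughly like the sketch below. It is Java, using java.util.concurrent.locks, which can be called from MirrorMaker.scala just as well. The class and method names are hypothetical; only numMessageUnacked comes from the patch, and how the producer callback and the rebalance listener would call into this is an assumption.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical helper; the class and method names are assumptions for illustration.
final class UnackedMessageTracker {
    private final ReentrantLock lock = new ReentrantLock();
    // Condition created from the same lock that guards the counter.
    private final Condition allAcked = lock.newCondition();
    private int numMessageUnacked = 0;

    // Called when a message is handed to the producer.
    void onSend() {
        lock.lock();
        try {
            numMessageUnacked++;
        } finally {
            lock.unlock();
        }
    }

    // Called from the producer callback, whether the send succeeded or not.
    void onAck() {
        lock.lock();
        try {
            numMessageUnacked--;
            if (numMessageUnacked == 0) {
                allAcked.signalAll();
            }
        } finally {
            lock.unlock();
        }
    }

    // Called from the rebalance path. The check and the wait happen under the
    // same lock as the decrement, so a signal cannot slip in between them.
    // Returns false if the waiting thread was interrupted.
    boolean awaitAllAcked() {
        lock.lock();
        try {
            while (numMessageUnacked > 0) {
                allAcked.await();
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            lock.unlock();
        }
    }

    // Current number of unacked messages.
    int unacked() {
        lock.lock();
        try {
            return numMessageUnacked;
        } finally {
            lock.unlock();
        }
    }
}
```

Because await() releases the lock atomically while it blocks, the producer callback cannot decrement the counter and signal in the window between the check and the wait, which is the window in which the current code can miss the notification.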