-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/#review54620
-----------------------------------------------------------



core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
<https://reviews.apache.org/r/25995/#comment94830>

    Do we need to do this check every time in the loop?



core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment94831>

    No need for an empty line here.



core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment94832>

    No need for the brackets.



core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment94833>

    No need for the brackets.



core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment94835>

    Maximum bytes that can be buffered in the data channels



core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment94834>

    in terms of bytes



core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment94836>

    Inconsistent indentation.



core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment94838>

    Capitalize: Offset commit interval in ms



core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment94841>

    Do you need to turn off auto commit on the consumer threads here?
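    
    If auto commit does need to be disabled, a minimal sketch of the config change could look like the following. It assumes the old high-level consumer, whose setting is `auto.commit.enable`; the `ConsumerProps` object and `withManualCommit` helper are hypothetical names, not part of MirrorMaker.

```scala
import java.util.Properties

// Hypothetical helper: copies the consumer properties and disables auto commit,
// so that offsets are only committed explicitly (e.g. by an offset commit thread).
object ConsumerProps {
  def withManualCommit(base: Properties): Properties = {
    val props = new Properties()
    val names = base.stringPropertyNames().iterator()
    while (names.hasNext) {
      val key = names.next()
      props.setProperty(key, base.getProperty(key))
    }
    // "auto.commit.enable" is the old high-level consumer setting.
    props.setProperty("auto.commit.enable", "false")
    props
  }
}
```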



core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment94840>

    We can add some more comments here, explaining:
    
    1) why we add the offset commit thread for the new producer, but not for the old producer;
    
    2) what risks the old producer has (from not having an offset commit thread).



core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment94842>

    For clean shutdown, you need to
    
    1) "halt" consumer threads first.
    
    2) wait for producer to drain all the messages in data channel.
    
    3) manually commit offsets on consumer threads.
    
    4) shut down consumer threads.
    
    Otherwise we will get duplicate data, since we commit offsets based on the min offset.
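    
    The ordering of the four steps above can be sketched roughly as follows. This is only an illustration: `ConsumerThread`, `CleanShutdown`, and the `pause`/`commitOffsets`/`shutdown` methods are hypothetical stand-ins, not names from MirrorMaker.scala, and an empty queue is used as a proxy for "drained" (a real implementation would presumably also wait for in-flight sends to be acknowledged).

```scala
import java.util.concurrent.LinkedBlockingQueue

// Hypothetical stand-in for a mirror maker consumer thread.
trait ConsumerThread {
  def pause(): Unit          // stop putting new messages into the data channel
  def commitOffsets(): Unit  // manually commit the consumed offsets
  def shutdown(): Unit
}

object CleanShutdown {
  def run(consumers: Seq[ConsumerThread],
          dataChannel: LinkedBlockingQueue[Array[Byte]],
          pollMs: Long = 10L): Unit = {
    // 1) "Halt" the consumer threads first so the data channel stops growing.
    consumers.foreach(_.pause())
    // 2) Wait until the producer side has taken everything out of the data channel.
    while (!dataChannel.isEmpty) Thread.sleep(pollMs)
    // 3) Manually commit offsets now that nothing is left buffered.
    consumers.foreach(_.commitOffsets())
    // 4) Only then shut down the consumer threads.
    consumers.foreach(_.shutdown())
  }
}
```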



core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment94844>

    queueId



core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment94846>

    How about having a histogram for each queue instead of getting the sum? The 
update call would be a bit less expensive and we can monitor if some queues are 
empty while others get all the data.



core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment94847>

    Ditto above.



core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment94849>

    Add comments explaining why we force an unclean shutdown with System.exit 
here.



core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment94857>

    Unfortunately this may not be the case, as we can have multiple connectors 
which are using different consumer configs with different group ids. We need to 
either 1) change the config settings to enforce this to be true, or 2) use a 
separate offset client that remembers which topics belong to which groups.



core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment94858>

    Capitalize first word



core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment94859>

    Capitalize first word



core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment94863>

    Please add a comment explaining how this logic works. Also a few questions:
    
    1) Is the map() call synchronized with other threads putting new offsets into the map?
    
    2) After the sorting, the logic may be clearer as
    
    // Advance past the contiguous run of offsets starting at offsets.head
    var commitableOffsetIndex = 0
    while (commitableOffsetIndex < offsets.length &&
           offsets(commitableOffsetIndex) - offsets.head == commitableOffsetIndex)
      commitableOffsetIndex += 1
    
    // The index now points one past the last contiguous offset
    offsetToCommit = offsets(commitableOffsetIndex - 1) + 1
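    
    As a runnable sketch of the contiguous-prefix idea in 2): sort the acked offsets, walk the run that is contiguous from the smallest one, and commit the offset just after that run. The `CommittableOffset` object and its method are hypothetical names, and the bounds check and the assumption that offsets are distinct are my reading of the intent rather than something taken from the patch.

```scala
object CommittableOffset {
  // Returns the offset to commit for one partition, or None if nothing was acked.
  // Assumes the acked offsets are distinct.
  def offsetToCommit(ackedOffsets: Seq[Long]): Option[Long] = {
    if (ackedOffsets.isEmpty) None
    else {
      val offsets = ackedOffsets.sorted.toIndexedSeq
      var idx = 0
      // Advance while offsets(idx) is exactly idx steps after the smallest offset.
      while (idx < offsets.length && offsets(idx) - offsets.head == idx.toLong) idx += 1
      // idx is now one past the last contiguous offset (idx >= 1 always holds here).
      Some(offsets(idx - 1) + 1)
    }
  }
}
```

    For example, with acked offsets 5, 6, 7 and 9, the contiguous run ends at 7, so the offset to commit is 8; offset 9 has to wait until 8 is acked.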



core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment94855>

    The send().get() call is missing.



core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala
<https://reviews.apache.org/r/25995/#comment94853>

    Apache header missing.


- Guozhang Wang


On Sept. 24, 2014, 4:26 p.m., Jiangjie Qin wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/25995/
> -----------------------------------------------------------
> 
> (Updated Sept. 24, 2014, 4:26 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1650
>     https://issues.apache.org/jira/browse/KAFKA-1650
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> mirror maker redesign; adding byte bounded blocking queue.
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
> fbc680fde21b02f11285a4f4b442987356abd17b 
>   core/src/main/scala/kafka/tools/MirrorMaker.scala 
> b8698ee1469c8fbc92ccc176d916eb3e28b87867 
>   core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala PRE-CREATION 
> 
> Diff: https://reviews.apache.org/r/25995/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jiangjie Qin
> 
>
