-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/#review55612
-----------------------------------------------------------



core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
<https://reviews.apache.org/r/25995/#comment95974>

    Do we need to add "=" here?



core/src/main/scala/kafka/server/ReplicaManager.scala
<https://reviews.apache.org/r/25995/#comment95975>

    We should keep the changes of KAFKA-1647 in its own RB and not merge them 
here.



core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment95978>

    Could we add an introductory comment here covering:
    
    1. The architecture of the MM: producer / consumer threads, the data channel 
per producer thread, the offset commit thread, and how the different modules 
interact with each other.
    2. Why we need a separate offset commit thread, and how it works.
    3. The startup / shutdown process, i.e. which modules to start / shut down 
first (this could also be moved to the head of the corresponding functions).



core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment95979>

    "Embedded consumer config for consuming from the source cluster."



core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment95980>

    "Embedded producer config for producing to the target cluster."



core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment95981>

    "The offset commit thread periodically commits consumed offsets to the 
source cluster. With the new producer, the offsets are updated upon the 
returned future metadata of the send() call; with the old producer, the offsets 
are updated when the consumer's iterator advances. This guarantees no data loss 
even when the mirror maker is shut down uncleanly with the new producer, 
whereas with the old producer messages inside the data channel could be lost on 
an unclean mirror maker shutdown."
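Since the comment above describes the mechanism only in prose, here is a minimal sketch of how the acked offsets could be tracked from the new producer's send() callback and exposed to the commit thread. All names (OffsetTracker, onAcked, offsetsToCommit) are hypothetical and not from the patch:

```scala
import scala.collection.mutable

// Hypothetical sketch: track the highest acked source offset per partition,
// updated from the new producer's send() callback, so the commit thread only
// commits offsets whose messages have reached the target cluster.
class OffsetTracker {
  private val acked = mutable.Map.empty[(String, Int), Long]

  // invoked from the producer callback once the send() future completes
  def onAcked(topic: String, partition: Int, sourceOffset: Long): Unit =
    synchronized {
      val key = (topic, partition)
      if (!acked.contains(key) || acked(key) < sourceOffset)
        acked(key) = sourceOffset
    }

  // the committed offset is the first un-consumed offset, hence the + 1
  def offsetsToCommit: Map[(String, Int), Long] =
    synchronized { acked.toMap.map { case (tp, off) => tp -> (off + 1) } }
}
```

With the old producer the same map would instead be updated as the consumer's iterator advances, which is why an unclean shutdown can lose whatever is still in the data channel.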



core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment96019>

    "numMessageCapacity" and "byteCapacity"? "numGetters" and "numPutters" 
(since the producer is the consumer of this buffer and vice versa)?



core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment96021>

    How about "MirrorMaker-DataChannel-queue%d-NumMessages" and 
"MirrorMaker-DataChannel-queue%d-Bytes"? and variable name 
"channelNumMessageHists" and "channelByteHists"?



core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment96020>

    Can we define put(record, queueId) and put(record), where the latter 
includes the logic of determining the queueId and then calls the former?
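To make the suggested split concrete, a minimal sketch of the two overloads (class and method names here are illustrative, not from the patch):

```scala
import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue}

// Hypothetical sketch of the suggested overloads: put(record) only decides
// which queue the record belongs to, then delegates to put(record, queueId).
class DataChannel[T](numQueues: Int, queueCapacity: Int) {
  private val queues: IndexedSeq[BlockingQueue[T]] =
    IndexedSeq.fill(numQueues)(new LinkedBlockingQueue[T](queueCapacity))

  def put(record: T, queueId: Int): Unit = queues(queueId).put(record)

  // route by the record's hashCode so the same source topic/partition always
  // lands in the same queue; mask the sign bit so the index is non-negative
  def put(record: T): Unit =
    put(record, (record.hashCode & Int.MaxValue) % numQueues)

  def take(queueId: Int): T = queues(queueId).take()
}
```

Keeping the routing logic in put(record) keeps callers that already know the queueId (e.g. tests) able to bypass it.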



core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment96022>

    Comment on why we use the hashCode of the source topic / partition here.



core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment96026>

    Instead of letting the consumer threads check the global shutdown flag, 
could we just add a shutdown function which sets their own flag, like the 
producer thread and the commit thread? The shutdown process then becomes:
    
    consumers.shutdown
    consumers.awaitShutdown
    producers.shutdown
    producers.awaitShutdown
    committer.shutdown
    committer.awaitShutdown
    connector.shutdown
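The per-module shutdown / awaitShutdown pattern the sequence above assumes could look like the following sketch (similar in spirit to kafka.utils.ShutdownableThread; the class name here is made up):

```scala
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical sketch: each thread owns its shutdown flag, so the top-level
// shutdown can stop and drain the modules in a fixed order instead of having
// the consumer threads poll a global flag.
abstract class StoppableThread(name: String) extends Thread(name) {
  private val running = new AtomicBoolean(true)
  private val done = new CountDownLatch(1)

  def doWork(): Unit // one unit of work; called repeatedly until shutdown

  override def run(): Unit =
    try while (running.get()) doWork()
    finally done.countDown()

  def shutdown(): Unit = running.set(false)

  def awaitShutdown(): Unit = done.await()
}
```

Shutting down consumers before producers lets the producers drain the data channel, and committing offsets last ensures only fully produced messages are committed.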



core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment96023>

    Maybe just "// if it exits accidentally, stop the entire mirror maker" as 
we did below?



core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment96024>

    // if it exits accidentally, stop the entire mirror maker



core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/25995/#comment96025>

    // the committed offset should be the offset of the first un-consumed 
message, hence we need to increment by one.



core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala
<https://reviews.apache.org/r/25995/#comment96027>

    "queueNumItemCapacity" and "queueByteCapacity"?


- Guozhang Wang


On Oct. 6, 2014, 5:20 p.m., Jiangjie Qin wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/25995/
> -----------------------------------------------------------
> 
> (Updated Oct. 6, 2014, 5:20 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1650
>     https://issues.apache.org/jira/browse/KAFKA-1650
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Addressed Guozhang's comments.
> 
> Talked with Joel and decided to remove multi-connector support, as people can 
> always create multiple mirror maker instances if they want to consume from 
> multiple clusters.
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
> fbc680fde21b02f11285a4f4b442987356abd17b 
>   core/src/main/scala/kafka/server/ReplicaManager.scala 
> 78b7514cc109547c562e635824684fad581af653 
>   core/src/main/scala/kafka/tools/MirrorMaker.scala 
> b8698ee1469c8fbc92ccc176d916eb3e28b87867 
>   core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala PRE-CREATION 
> 
> Diff: https://reviews.apache.org/r/25995/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jiangjie Qin
> 
>
