> On March 10, 2015, 7:10 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/tools/MirrorMaker.scala, line 45
> > <https://reviews.apache.org/r/31706/diff/6/?file=890015#file890015line45>
> >
> >     I am still a bit concerned about bounding ourselves with one producer 
> > per MM, because scaling MMs for the sake of scaling producer is kind of a 
> > waste of JVM containers / memory / etc. On the other hand, having one 
> > producer does give us the benefit of simple partition ordering semantics 
> > for the destination cluster.
> >     
> >     Maybe we can keep it as is but open for increasing the producer number 
> > if we bump into some cases that a single ioThread for sending is not 
> > sufficient.

I was worried about this before, too. I think the main reason for having more 
than one producer is to make sure we can reach the best consumer-producer 
ratio. Given that the consumer thread is almost guaranteed to be slower than 
the producer thread, wanting more producers means that one producer is serving 
too many consumer threads. In that case we can run two MMs, each with fewer 
consumer threads. Running another JVM costs more resources, but as long as we 
can achieve an appropriate consumer-producer ratio, it might be OK. That said, 
I agree that we can keep one producer for now and bump up the producer count 
later if we need to.


> On March 10, 2015, 7:10 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/tools/MirrorMaker.scala, lines 510-512
> > <https://reviews.apache.org/r/31706/diff/6/?file=890015#file890015line510>
> >
> >     Could you use !exitingOnSendFailure only in the outer loop, and in the 
> > onComplete callback, setting both boolean to true upon exception?

Ah, this is actually a bug.
We need to check exitingOnSendFailure and shuttingDown in both the inner loop 
and the outer loop.
Normally the program only runs in the inner loop, so the inner loop needs to 
check exitingOnSendFailure; otherwise it will continue sending messages even 
after something has gone wrong. It also needs to check shuttingDown so that it 
stops promptly. The outer loop needs both checks for the same reasons; the 
only difference is that it does not throw ConsumerTimeoutException.
I'm not sure it is a good idea to set the shuttingDown flag in the callback 
directly.
Anyway, this part of the code will change once close with timeout is available 
for the producer. We will depend only on the shuttingDown flag in the future. 
In onComplete, we should just call cleanShutdown(), and cleanShutdown() will 
check exitingOnSendFailure to decide whether to call producer.close() or 
producer.close(-1).
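
To illustrate the flag checks described above, here is a minimal sketch in 
Java. It is not the actual MirrorMaker code: the class MirrorLoopSketch, the 
keepRunning() helper, and the simulated send() are hypothetical, and the send 
failure is reported synchronously here, whereas the real producer reports it 
asynchronously in the onComplete callback. Only the flag names are taken from 
the discussion.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative only: the flag names follow the review discussion,
// everything else is a hypothetical stand-in for the real tool.
public class MirrorLoopSketch {
    public final AtomicBoolean shuttingDown = new AtomicBoolean(false);
    public final AtomicBoolean exitingOnSendFailure = new AtomicBoolean(false);
    private final List<String> sent = new ArrayList<>();

    // Both loops use the same condition, so either flag stops the thread.
    private boolean keepRunning() {
        return !shuttingDown.get() && !exitingOnSendFailure.get();
    }

    // Stand-in for producer.send() plus its completion callback:
    // a record equal to failOn simulates a send error.
    private void send(String record, String failOn) {
        if (record.equals(failOn)) {
            exitingOnSendFailure.set(true);
        } else {
            sent.add(record);
        }
    }

    // Each inner list stands in for one pass over the consumer stream
    // (in the real tool a pass ends with ConsumerTimeoutException).
    public List<String> run(List<List<String>> batches, String failOn) {
        Iterator<List<String>> outer = batches.iterator();
        while (keepRunning() && outer.hasNext()) {        // outer loop
            Iterator<String> inner = outer.next().iterator();
            while (keepRunning() && inner.hasNext()) {    // inner loop
                send(inner.next(), failOn);
            }
        }
        return sent;
    }
}
```

Without the check in the inner loop, the records that follow the failed one 
in the same pass would still be sent; without the check in the outer loop, 
the next pass would start.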


> On March 10, 2015, 7:10 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/tools/MirrorMaker.scala, lines 541-542
> > <https://reviews.apache.org/r/31706/diff/6/?file=890015#file890015line541>
> >
> >     Is there a race condition, such that a thread could call 
> > producer.flush() while some other threads are also calling send() to the 
> > producer (in the current implementation flush does not block concurrent 
> > send), such that when it calls commitOffsets there are some offsets that 
> > get committed but not acked yet?

This is a good catch! In the design with the new consumer, we have one 
consumer per consumer thread, so the offsets are not shared. For now, however, 
the consumer threads share one consumer instance, so this problem exists.
I'll just let each mirror maker thread have its own connector, which will be 
our final design as well.
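
The race and the fix can be shown with a small, deterministic model in Java. 
This is only a sketch under stated assumptions: FakeProducer and FakeConsumer 
are hypothetical stand-ins, not Kafka classes, and the "concurrent" send from 
the second thread is simulated by running it in the middle of flush().

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class FlushCommitRaceSketch {
    // Hypothetical producer model: flush() acks only the records that were
    // pending when it started and does not block sends that arrive meanwhile.
    static class FakeProducer {
        final List<String> pending = new ArrayList<>();
        final Set<String> acked = new HashSet<>();

        void send(String record) { pending.add(record); }

        void flush(Runnable concurrentSend) {
            List<String> snapshot = new ArrayList<>(pending);
            concurrentSend.run();        // another thread sends during flush
            acked.addAll(snapshot);
            pending.removeAll(snapshot);
        }
    }

    // Hypothetical consumer model: commit() covers everything consumed
    // through this instance, regardless of which thread consumed it.
    static class FakeConsumer {
        final List<String> consumed = new ArrayList<>();

        String consume(String record) { consumed.add(record); return record; }

        List<String> commit() { return new ArrayList<>(consumed); }
    }

    // Returns the records whose offsets were committed before being acked.
    public static List<String> committedButUnacked(boolean sharedConsumer) {
        FakeProducer producer = new FakeProducer();
        FakeConsumer consumerA = new FakeConsumer();
        FakeConsumer consumerB = sharedConsumer ? consumerA : new FakeConsumer();

        producer.send(consumerA.consume("a-0"));                        // thread A
        producer.flush(() -> producer.send(consumerB.consume("b-0")));  // thread B
        List<String> committed = consumerA.commit();  // thread A commits after flush
        committed.removeAll(producer.acked);
        return committed;
    }
}
```

With a shared consumer, thread A's commit covers b-0 even though it has not 
been acked; with one consumer per thread, thread A commits only what it 
consumed and flushed itself.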


- Jiangjie


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31706/#review75921
-----------------------------------------------------------


On March 10, 2015, 1:55 a.m., Jiangjie Qin wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/31706/
> -----------------------------------------------------------
> 
> (Updated March 10, 2015, 1:55 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1997
>     https://issues.apache.org/jira/browse/KAFKA-1997
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Addressed Guozhang's comments.
> 
> 
> Changed the exit behavior on send failure because close(0) is not ready yet. 
> Will submit followup patch after KAFKA-1660 is checked in.
> 
> 
> Expanded imports from _ and * to full class path
> 
> 
> Incorporated Joel's comments.
> 
> 
> Addressed Joel's comments.
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java d5c79e2481d5e9a2524ac2ef6a6879f61cb7cb5f 
>   core/src/main/scala/kafka/consumer/PartitionAssignor.scala e6ff7683a0df4a7d221e949767e57c34703d5aad 
>   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 5487259751ebe19f137948249aa1fd2637d2deb4 
>   core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java 7f45a90ba6676290172b7da54c15ee5dc1a42a2e 
>   core/src/main/scala/kafka/tools/MirrorMaker.scala bafa379ff57bc46458ea8409406f5046dc9c973e 
>   core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala 543070f4fd3e96f3183cae9ee2ccbe843409ee58 
>   core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala 19640cc55b5baa0a26a808d708b7f4caf491c9f0 
> 
> Diff: https://reviews.apache.org/r/31706/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jiangjie Qin
> 
>
