> On March 10, 2015, 7:10 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/tools/MirrorMaker.scala, line 45
> > <https://reviews.apache.org/r/31706/diff/6/?file=890015#file890015line45>
> >
> >     I am still a bit concerned about bounding ourselves with one producer per MM, because scaling MMs for the sake of scaling the producer is kind of a waste of JVM containers / memory / etc. On the other hand, having one producer does give us the benefit of simple partition ordering semantics for the destination cluster.
> >
> >     Maybe we can keep it as is, but stay open to increasing the number of producers if we bump into cases where a single ioThread for sending is not sufficient.

I was worried about this before, too. I think the main reason for having more than one producer is to make sure we can reach the best consumer-to-producer ratio. Since the consumer thread is almost always slower than the producer thread, needing more producers means that one producer is serving too many consumer threads. In that case we can run two MMs, each with fewer consumer threads. Running another JVM may cost more resources, but as long as we achieve an appropriate consumer-to-producer ratio, that should be OK. That said, I agree that we can keep one producer for now and bump up the number of producers later if we need to.


> On March 10, 2015, 7:10 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/tools/MirrorMaker.scala, lines 510-512
> > <https://reviews.apache.org/r/31706/diff/6/?file=890015#file890015line510>
> >
> >     Could you use !exitingOnSendFailure only in the outer loop, and in the onComplete callback, set both booleans to true upon exception?

Ah, this is actually a bug. We need to check exitingOnSendFailure and shuttingDown in both the inner loop and the outer loop. Normally the program only runs in the inner loop, so the inner loop needs to check exitingOnSendFailure; otherwise it will keep sending messages even after something has gone wrong. It also needs to check shuttingDown so that it stops promptly. The outer loop needs both checks for the same reasons, except that it does not throw ConsumerTimeoutException. I'm not sure it is a good idea to set the shuttingDown flag directly in the callback. In any case, this part of the code will change once close with a timeout is available for the producer; after that we will depend only on the shuttingDown flag.
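To make the loop structure above concrete, here is a minimal, self-contained Java sketch under simplifying assumptions. Only the flag names exitingOnSendFailure and shuttingDown come from the review; the class MirrorMakerLoopSketch, its send() and keepRunning() helpers, and the use of a queue of batches to stand in for ConsumerTimeoutException are all hypothetical and are not the MirrorMaker code. It shows why the inner loop must also check both flags: otherwise it would keep sending the rest of the batch after a failure.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, simplified model of a mirror maker thread loop (not Kafka code).
class MirrorMakerLoopSketch {
    // Flag names taken from the review; here a failed send sets the first one,
    // as a producer onComplete callback might do on exception (an assumption).
    final AtomicBoolean exitingOnSendFailure = new AtomicBoolean(false);
    final AtomicBoolean shuttingDown = new AtomicBoolean(false);
    final List<String> sent = new ArrayList<>();

    // Hypothetical send: the message "bad" simulates a send failure.
    void send(String msg) {
        if (msg.equals("bad")) {
            exitingOnSendFailure.set(true);
        } else {
            sent.add(msg);
        }
    }

    boolean keepRunning() {
        return !exitingOnSendFailure.get() && !shuttingDown.get();
    }

    // Each batch models the stream up to a consumer timeout; running out of a
    // batch stands in for ConsumerTimeoutException, which returns to the outer loop.
    void run(Queue<List<String>> batches) {
        // Outer loop: checks both flags before re-entering the stream.
        while (keepRunning() && !batches.isEmpty()) {
            Iterator<String> it = batches.poll().iterator();
            // Inner loop: the normal path, so it must check both flags too.
            while (keepRunning() && it.hasNext()) {
                send(it.next());
            }
        }
    }

    public static void main(String[] args) {
        MirrorMakerLoopSketch mm = new MirrorMakerLoopSketch();
        Queue<List<String>> batches = new ArrayDeque<>();
        batches.add(List.of("a", "b", "bad", "c"));
        batches.add(List.of("d"));
        mm.run(batches);
        System.out.println(mm.sent); // [a, b]: nothing is sent after the failure
    }
}
```

If the inner loop checked only it.hasNext(), "c" would still be sent after the failed send, which is the behavior the review identifies as a bug.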
In onComplete, we should just call cleanShutdown(), and cleanShutdown() will check exitingOnSendFailure to decide whether to call producer.close() or producer.close(-1).


> On March 10, 2015, 7:10 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/tools/MirrorMaker.scala, lines 541-542
> > <https://reviews.apache.org/r/31706/diff/6/?file=890015#file890015line541>
> >
> >     Is there a race condition, such that a thread could call producer.flush() while some other threads are also calling send() on the producer (in the current implementation flush does not block concurrent sends), so that when it calls commitOffsets, some offsets get committed that have not been acked yet?

This is a good catch! In the design with the new consumer, each consumer thread has its own consumer, so the offsets are not shared. For now, however, the consumer threads share one consumer instance, so we have this problem. I'll just give each mirror maker thread its own connector, which will be our final design as well.


- Jiangjie


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31706/#review75921
-----------------------------------------------------------


On March 10, 2015, 1:55 a.m., Jiangjie Qin wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/31706/
> -----------------------------------------------------------
>
> (Updated March 10, 2015, 1:55 a.m.)
>
>
> Review request for kafka.
>
>
> Bugs: KAFKA-1997
>     https://issues.apache.org/jira/browse/KAFKA-1997
>
>
> Repository: kafka
>
>
> Description
> -------
>
> Addressed Guozhang's comments.
>
>
> Changed the exit behavior on send failure because close(0) is not ready yet.
> Will submit followup patch after KAFKA-1660 is checked in.
>
>
> Expanded imports from _ and * to full class path
>
>
> Incorporated Joel's comments.
>
>
> Addressed Joel's comments.
>
>
> Diffs
> -----
>
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java d5c79e2481d5e9a2524ac2ef6a6879f61cb7cb5f
>   core/src/main/scala/kafka/consumer/PartitionAssignor.scala e6ff7683a0df4a7d221e949767e57c34703d5aad
>   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 5487259751ebe19f137948249aa1fd2637d2deb4
>   core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java 7f45a90ba6676290172b7da54c15ee5dc1a42a2e
>   core/src/main/scala/kafka/tools/MirrorMaker.scala bafa379ff57bc46458ea8409406f5046dc9c973e
>   core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala 543070f4fd3e96f3183cae9ee2ccbe843409ee58
>   core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala 19640cc55b5baa0a26a808d708b7f4caf491c9f0
>
> Diff: https://reviews.apache.org/r/31706/diff/
>
>
> Testing
> -------
>
>
> Thanks,
>
> Jiangjie Qin
>
>
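The flush()/commitOffsets() race raised in the review of MirrorMaker.scala lines 541-542 can be illustrated with a small deterministic model. This is a hypothetical Java sketch, not Kafka's API: FlushCommitRaceSketch, FakeConsumer, send(), flush() and commitIsSafe() are illustrative names, and flush() is assumed to complete everything sent before it was called. It replays one interleaving (thread A sends and flushes, then thread B consumes and sends before A commits) and checks whether A's commit would cover an offset that has not been acked, first with one shared consumer and then with one consumer per thread, as proposed in the reply.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of the flush()/commitOffsets() race (not Kafka code).
class FlushCommitRaceSketch {
    // Producer state shared by all threads (one producer per mirror maker).
    static final Set<Long> acked = new HashSet<>();
    static final Set<Long> inFlight = new HashSet<>();

    static void send(long offset) { inFlight.add(offset); }

    // Assumed flush semantics: everything sent before the call is acked.
    static void flush() { acked.addAll(inFlight); inFlight.clear(); }

    // Hypothetical consumer that remembers which offsets it has handed out.
    static final class FakeConsumer {
        final Set<Long> consumed = new HashSet<>();
        void consume(long offset) { consumed.add(offset); }
        // Committing is safe only if every consumed offset has been acked.
        boolean commitIsSafe() { return acked.containsAll(consumed); }
    }

    // Replays the interleaving from the review and returns whether
    // thread A's commit would be safe.
    static boolean run(boolean consumerPerThread) {
        acked.clear();
        inFlight.clear();
        FakeConsumer shared = new FakeConsumer();
        FakeConsumer a = consumerPerThread ? new FakeConsumer() : shared;
        FakeConsumer b = consumerPerThread ? new FakeConsumer() : shared;

        a.consume(1); send(1);   // thread A
        flush();                 // thread A: offset 1 is now acked
        b.consume(2); send(2);   // thread B runs before A commits
        return a.commitIsSafe(); // thread A commits its consumer's offsets
    }

    public static void main(String[] args) {
        System.out.println("shared consumer safe:     " + run(false)); // false
        System.out.println("consumer per thread safe: " + run(true));  // true
    }
}
```

With a shared consumer, A's commit includes offset 2, which B consumed but the producer has not yet acked; with one consumer (connector) per thread, A commits only offsets it sent before its own flush().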