-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31706/#review75785
-----------------------------------------------------------

Patch does not apply. Can you rebase?


core/src/main/scala/kafka/consumer/PartitionAssignor.scala
<https://reviews.apache.org/r/31706/#comment123061>

Can you make this a bit clearer?

```
val assignmentForConsumer = partitionAssignment.getAndMaybePut(threadId.consumer)
assignmentForConsumer += ...
```


core/src/main/scala/kafka/consumer/PartitionAssignor.scala
<https://reviews.apache.org/r/31706/#comment123063>

Similar comment as above.


core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
<https://reviews.apache.org/r/31706/#comment123064>

`val myPartitionAssignment = Option(globalPartitionAssignment.get(assignmentContext.consumerId)).getOrElse(collection.mutable.HashMap[..].empty)`


core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
<https://reviews.apache.org/r/31706/#comment123068>

It goes without saying, this is a terribly confusing block that needs to be refactored/broken up/well-documented as much as possible.


core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
<https://reviews.apache.org/r/31706/#comment123065>

Invoking rebalance listener before starting fetchers

(Clear enough, but slightly cleaner than putting a function call with parentheses in application logs.)


core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
<https://reviews.apache.org/r/31706/#comment123066>

Can you use the case(k, v) format here?


core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
<https://reviews.apache.org/r/31706/#comment123067>

Can you break this up into two separate statements?


core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java
<https://reviews.apache.org/r/31706/#comment123069>

Can we rename the argument to globalPartitionAssignment?


core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/31706/#comment123070>

share one producer


core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/31706/#comment123071>

periodically flushes the...

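
To illustrate the two ZookeeperConsumerConnector.scala suggestions above (wrapping a possibly-null lookup in `Option(...).getOrElse(...)`, and destructuring pairs with `case (k, v)`), here is a rough, self-contained sketch. It is not the patch's code: the `ConcurrentHashMap` only stands in for the real assignment map, and `AssignmentSketch`, `Partition`, `Assignment`, `assignmentFor` and `describe` are hypothetical names made up for the example.

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.collection.mutable

object AssignmentSketch {
  // Hypothetical stand-ins for the real types used in the patch.
  type Partition = (String, Int)                        // (topic, partition id)
  type Assignment = mutable.HashMap[Partition, String]  // partition -> consumer thread

  // Stand-in for the global assignment map; get() returns null for a missing key.
  val globalPartitionAssignment = new ConcurrentHashMap[String, Assignment]()

  // Option(...) turns a null lookup into None, so getOrElse can supply an empty map
  // instead of the caller having to null-check.
  def assignmentFor(consumerId: String): Assignment =
    Option(globalPartitionAssignment.get(consumerId))
      .getOrElse(mutable.HashMap.empty[Partition, String])

  // Pattern-matching on the pair names its fields instead of using _1 / _2.
  def describe(assignment: Assignment): Seq[String] =
    assignment.toSeq
      .sortBy { case ((topic, partition), _) => (topic, partition) }
      .map { case ((topic, partition), thread) => s"$topic-$partition -> $thread" }
}
```

The second method is the shape meant by "case(k, v) format": the tuple is taken apart in the lambda's parameter position, which tends to read better than chained `_1`/`_2` accessors.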

core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/31706/#comment123087>

Would it be clearer to call this metric numMirrorMakerDrops?


core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/31706/#comment123074>

Space after comma.

Can you also clarify in the comment on the need to unblock so that the flush/commit can happen?


core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/31706/#comment123076>

Why don't we have a constructor variant for the message handler that accepts the argument list as we do for the rebalance listener?


core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/31706/#comment123078>

Old code, but let us use require instead of assert.


core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/31706/#comment123081>

Can you create an explicit lock object? i.e., val offsetCommitLock = new Object()


core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/31706/#comment123082>

I think it would be useful to have a check on the time since last commit here as well. See comment further below.


core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/31706/#comment123077>

It may be worth adding an INFO statement here stating "Property X overridden to Y (if different from default value) - data loss is possible"


core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/31706/#comment123079>

rename to shuttingDown


core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/31706/#comment123083>

Since this is synchronized and commit can take a few milliseconds, multiple consumer threads could be blocked here (since they would start at roughly the same time).
All of them will unnecessarily commit offsets again within a few milliseconds of each other - this can be protected by checking the time again in commitOffsets OR I think you can synchronize this method as well.


core/src/main/scala/kafka/tools/MirrorMaker.scala
<https://reviews.apache.org/r/31706/#comment123088>

Failed to send message to ...

Also, it is probably better to _not_ log the number of skipped messages since that is an accumulated count and is available via an mbean anyway.


Patch needs a rebase.


- Joel Koshy


On March 6, 2015, 4:15 a.m., Jiangjie Qin wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/31706/
> -----------------------------------------------------------
>
> (Updated March 6, 2015, 4:15 a.m.)
>
>
> Review request for kafka.
>
>
> Bugs: KAFKA-1997
>     https://issues.apache.org/jira/browse/KAFKA-1997
>
>
> Repository: kafka
>
>
> Description
> -------
>
> Addressed Guozhang's comments.
>
>
> Changed the exit behavior on send failure because close(0) is not ready yet.
> Will submit followup patch after KAFKA-1660 is checked in.
>
>
> Expanded imports from _ and * to full class path
>
>
> Incorporated Joel's comments.
>
>
> Diffs
> -----
>
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java d5c79e2481d5e9a2524ac2ef6a6879f61cb7cb5f
>   core/src/main/scala/kafka/consumer/PartitionAssignor.scala e6ff7683a0df4a7d221e949767e57c34703d5aad
>   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 5487259751ebe19f137948249aa1fd2637d2deb4
>   core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java 7f45a90ba6676290172b7da54c15ee5dc1a42a2e
>   core/src/main/scala/kafka/tools/MirrorMaker.scala 5374280dc97dc8e01e9b3ba61fd036dc13ae48cb
>   core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala 543070f4fd3e96f3183cae9ee2ccbe843409ee58
>   core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala a17e8532c44aadf84b8da3a57bcc797a848b5020
>
> Diff: https://reviews.apache.org/r/31706/diff/
>
>
> Testing
> -------
>
>
>
> Thanks,
>
> Jiangjie Qin
>
>
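
For reference, the MirrorMaker.scala comments on the explicit lock object and on re-checking the time since the last commit could fit together roughly as follows. This is only a sketch under assumptions, not the patch's code: apart from `offsetCommitLock`, every name here (`OffsetCommitter`, `intervalMs`, `clock`, `commit`, `maybeCommit`) is hypothetical, and the clock is injected purely so the behavior is easy to check.

```scala
// Hypothetical helper: commits at most once per interval, even when several
// consumer threads reach the commit point at roughly the same time.
class OffsetCommitter(intervalMs: Long, clock: () => Long, commit: () => Unit) {
  // Explicit lock object, as suggested in the review, instead of locking on `this`.
  private val offsetCommitLock = new Object()
  private var lastCommitMs: Long = clock()

  /** Returns true if this call actually committed. */
  def maybeCommit(): Boolean = offsetCommitLock.synchronized {
    // Read the time inside the lock: a thread that was blocked while another
    // thread committed sees the fresh lastCommitMs and skips the redundant commit.
    val now = clock()
    if (now - lastCommitMs >= intervalMs) {
      commit()
      lastCommitMs = clock()
      true
    } else {
      false
    }
  }
}
```

With this shape, threads that queue up on the lock still wait for the in-flight commit, but they no longer issue their own commit a few milliseconds later.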