----------------------------------------------------------- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/22276/#review44859 -----------------------------------------------------------
Overall, looks good. Main point of feedback is on using a case class instead of a tuple for the default/stream-level map. Maybe something like StreamFetchSizes, or something. samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala <https://reviews.apache.org/r/22276/#comment79454> I think resetValue needs to be renamed to something more like fetchMessageMaxBytes. I'm guessing this was a copy/paste variable name. samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala <https://reviews.apache.org/r/22276/#comment79455> Rather than this tuple, maybe can just have a little case class in DefaultFetchSimpleConsumer that holds these two values. It'd be a bit cleaner. - Chris Riccomini On June 5, 2014, 7:30 p.m., Yan Fang wrote: > > ----------------------------------------------------------- > This is an automatically generated e-mail. To reply, visit: > https://reviews.apache.org/r/22276/ > ----------------------------------------------------------- > > (Updated June 5, 2014, 7:30 p.m.) > > > Review request for samza. > > > Repository: samza > > > Description > ------- > > Add stream-level overrides for kafka fetch size config > > 1. added getFetchMessageMaxBytesTopics method which returns a map > topic->fetchSize > 2. used a tuple (default value, map) for all fetchSize variable. First > element is the default or shared fetch size value, second element is the map > from 1) > 3. changed relevent class/methods (BrokerProxy, KafkaSystemConsumer, > DefaultFetchSimpleConsumer) to accept the new type of fetch size > 4. unit test > > > Diffs > ----- > > samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala > 4deabd3 > samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala > e08791f > > samza-kafka/src/main/scala/org/apache/samza/system/kafka/DefaultFetchSimpleConsumer.scala > d90ca78 > > samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala > f1edda0 > > samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala > f4dc1c1 > samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala > 93cf5a5 > > samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala > 9c0ca60 > > Diff: https://reviews.apache.org/r/22276/diff/ > > > Testing > ------- > > > Thanks, > > Yan Fang > >
