Chris Riccomini created SAMZA-21:
------------------------------------
Summary: Change KafkaSystemConsumer and BrokerProxy consumer
defaults
Key: SAMZA-21
URL: https://issues.apache.org/jira/browse/SAMZA-21
Project: Samza
Issue Type: Bug
Reporter: Chris Riccomini
The Kafka BrokerProxy and KafkaSystemConsumer currently both have a socket
timeout with a default of Int.MaxValue:
{code}
val timeout: Int = Int.MaxValue,
{code}
We don't actually use this default when we use the KafkaSystemFactory, because
we use ConsumerConfig's default, which is 30 seconds. Nevertheless, we should
change both classes to default to ConsumerConfig.SocketTimeout.
While we're at it, we should also change bufferSize to be
ConsumerConfig.SocketBufferSize, and we should change DefaultFetch to use:
{code}
val maxWait:Int = ConsumerConfig.MaxFetchWaitMs
val minBytes:Int = ConsumerConfig.MinFetchBytes
{code}
We should also actually use maxWait in DefaultFetch.defaultFetch. Right now,
we're hard coding 1000ms.
Finally, BrokerProxy has the fetchSize hard coded:
{code}
val fetchSize: Int = 256 * 1024
{code}
We should make this parameter configurable
(systems.%s.consumer.fetch.message.max.bytes), and pass it from the
KafkaSystemFactory to the KafkaSystem to the BrokerProxy, just like timeout and
buffer size.
(ConsumerConfig is Kafka's config class)
--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira