Chris Riccomini created SAMZA-21:
------------------------------------

             Summary: Change KafkaSystemConsumer and BrokerProxy consumer defaults
                 Key: SAMZA-21
                 URL: https://issues.apache.org/jira/browse/SAMZA-21
             Project: Samza
          Issue Type: Bug
            Reporter: Chris Riccomini


The Kafka BrokerProxy and KafkaSystemConsumer currently both have a socket 
timeout with a default of Int.MaxValue:

{code}
  val timeout: Int = Int.MaxValue,
{code}

We don't actually use this default when we use the KafkaSystemFactory, because 
we use ConsumerConfig's default, which is 30 seconds. Nevertheless, we should 
change both classes to default to ConsumerConfig.SocketTimeout.
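As a rough illustration of the proposed timeout default, here is a minimal sketch. ConsumerConfigStandIn and BrokerProxySketch are hypothetical names invented for this example so that it compiles without Kafka on the classpath; the real classes take more constructor parameters than shown. The 30-second value is the default quoted above.

```scala
// Stand-in for Kafka's ConsumerConfig object so the sketch compiles on its own.
// The 30-second value is the default quoted in this issue.
object ConsumerConfigStandIn {
  val SocketTimeout: Int = 30 * 1000
}

// Hypothetical, trimmed-down constructor; the real classes take more parameters.
class BrokerProxySketch(
  val host: String,
  val port: Int,
  val timeout: Int = ConsumerConfigStandIn.SocketTimeout)
```

With this shape, a caller that omits timeout gets the same 30-second value that the KafkaSystemFactory path already uses, rather than Int.MaxValue.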

While we're at it, we should also change bufferSize to be 
ConsumerConfig.SocketBufferSize, and we should change DefaultFetch to use:

{code}
  val maxWait: Int = ConsumerConfig.MaxFetchWaitMs
  val minBytes: Int = ConsumerConfig.MinFetchBytes
{code}

We should also actually use maxWait in DefaultFetch.defaultFetch. Right now, 
we're hard-coding 1000 ms.
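A possible shape for this change is sketched below. Everything here is a simplified assumption: FetchDefaultsStandIn stands in for Kafka's ConsumerConfig with placeholder values, FetchRequestSketch stands in for a real fetch request, and the defaultFetch signature is reduced to a single topic, partition, and offset. The point is only that the request carries the maxWait field instead of a literal 1000.

```scala
// Stand-in for Kafka's ConsumerConfig; both values are placeholders (assumptions).
object FetchDefaultsStandIn {
  val MaxFetchWaitMs: Int = 100
  val MinFetchBytes: Int = 1
}

// Simplified stand-in for a fetch request, so the sketch runs without Kafka.
case class FetchRequestSketch(
  topic: String,
  partition: Int,
  offset: Long,
  fetchSize: Int,
  maxWait: Int,
  minBytes: Int)

trait DefaultFetchSketch {
  val maxWait: Int = FetchDefaultsStandIn.MaxFetchWaitMs
  val minBytes: Int = FetchDefaultsStandIn.MinFetchBytes
  val fetchSize: Int

  // Pass maxWait through to the request instead of a hard-coded 1000 ms.
  def defaultFetch(topic: String, partition: Int, offset: Long): FetchRequestSketch =
    FetchRequestSketch(topic, partition, offset, fetchSize, maxWait, minBytes)
}
```

Because defaultFetch reads the field rather than a literal, a subclass that overrides maxWait changes the wait time of every request it builds.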

Finally, BrokerProxy has the fetchSize hard-coded:

{code}
  val fetchSize: Int = 256 * 1024
{code}

We should make this parameter configurable 
(systems.%s.consumer.fetch.message.max.bytes), and pass it from the 
KafkaSystemFactory to the KafkaSystem to the BrokerProxy, just like timeout and 
buffer size.
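One way the lookup could work is sketched here, assuming the job config is available as a plain Map[String, String]. FetchSizeConfigSketch and its members are hypothetical names, and falling back to 256 * 1024 is an assumption made for this example (the issue does not say what the configurable default should be).

```scala
object FetchSizeConfigSketch {
  // Key pattern from this issue; %s is replaced with the system name.
  val FetchSizeKey = "systems.%s.consumer.fetch.message.max.bytes"

  // Assumed fallback: the value currently hard-coded in BrokerProxy.
  val DefaultFetchSize: Int = 256 * 1024

  // Look up the per-system fetch size, falling back when the key is absent.
  def fetchSize(config: Map[String, String], systemName: String): Int =
    config
      .get(FetchSizeKey.format(systemName))
      .map(_.trim.toInt)
      .getOrElse(DefaultFetchSize)
}
```

The factory would resolve this value once and hand it down as a constructor argument, the same way timeout and buffer size are passed today.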

(ConsumerConfig is Kafka's config class)
