This is an automated email from the ASF dual-hosted git repository.

jagadish pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
     new c9083e8  SAMZA-2109: Reduce per-partition default buffer sizes
c9083e8 is described below

commit c9083e843bdd4c219c1736aa855f106cd8d0c154
Author: Jagadish <jvenkatra...@linkedin.com>
AuthorDate: Tue Feb 19 13:48:56 2019 -0800

    SAMZA-2109: Reduce per-partition default buffer sizes

    Author: Jagadish <jvenkatra...@linkedin.com>

    Reviewers: Shanthoosh <svenkatara...@linkedin.com>

    Closes #925 from vjagadish1989/samza-2109
---
 docs/learn/documentation/versioned/jobs/samza-configurations.md        | 2 +-
 .../java/org/apache/samza/system/kafka/KafkaSystemConsumer.java        | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/docs/learn/documentation/versioned/jobs/samza-configurations.md b/docs/learn/documentation/versioned/jobs/samza-configurations.md
index 54ec12c..757d552 100644
--- a/docs/learn/documentation/versioned/jobs/samza-configurations.md
+++ b/docs/learn/documentation/versioned/jobs/samza-configurations.md
@@ -158,7 +158,7 @@ Samples found [here](../../../../startup/hello-samza/versioned)
 |--- |--- |--- |
 |systems.**_system-name_**.<br>consumer.*| |Any [Kafka consumer configuration](http://kafka.apache.org/documentation.html#newconsumerconfigs) can be included here. For example, to change the socket timeout, you can set `systems.system-name.consumer.socket.timeout.ms`. (There is no need to configure `group.id` or `client.id`, as they are automatically configured by Samza. Also, there is no need to set `auto.commit.enable` because Samza has its own checkpointing mechanism.)|
 |systems.**_system-name_**.<br>producer.*| |Any [Kafka producer configuration](http://kafka.apache.org/documentation.html#producerconfigs) can be included here. For example, to change the request timeout, you can set `systems.system-name.producer.timeout.ms`. (There is no need to configure `client.id` as it is automatically configured by Samza.)|
-|systems.**_system-name_**.<br>samza.fetch.threshold|50000|When consuming streams from Kafka, a Samza container maintains an in-memory buffer for incoming messages in order to increase throughput (the stream task can continue processing buffered messages while new messages are fetched from Kafka). This parameter determines the number of messages we aim to buffer across all stream partitions consumed by a container. For example, if a container consumes 50 partitions, it will try to buffer [...]
+|systems.**_system-name_**.<br>samza.fetch.threshold|10000|When consuming streams from Kafka, a Samza container maintains an in-memory buffer for incoming messages in order to increase throughput (the stream task can continue processing buffered messages while new messages are fetched from Kafka). This parameter determines the number of messages we aim to buffer across all stream partitions consumed by a container. For example, if a container consumes 50 partitions, it will try to buffer [...]
 |systems.**_system-name_**.<br>samza.fetch.threshold.bytes|-1|When consuming streams from Kafka, a Samza container maintains an in-memory buffer for incoming messages in order to increase throughput (the stream task can continue processing buffered messages while new messages are fetched from Kafka). This parameter determines the total size of messages we aim to buffer across all stream partitions consumed by a container based on bytes. Defines how many bytes to use for the buffered pref [...]
 #### <a name="hdfs"></a>[3.3 HDFS](#hdfs)

diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java
index acef057..93ded8b 100644
--- a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java
+++ b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java
@@ -49,7 +49,7 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy

   private static final Logger LOG = LoggerFactory.getLogger(KafkaSystemConsumer.class);

-  private static final long FETCH_THRESHOLD = 50000;
+  private static final long FETCH_THRESHOLD = 10000;
   private static final long FETCH_THRESHOLD_BYTES = -1L;

   protected final Consumer<K, V> kafkaConsumer;
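As a rough illustration of what this default change means in practice: the documentation above says the container-wide `samza.fetch.threshold` is split across all partitions the container consumes. The sketch below is hypothetical (the class `FetchThresholdSketch` and method `perPartitionThreshold` are not Samza APIs) and simply shows the arithmetic behind the per-partition buffer limit under the new 10,000-message default, assuming an even split across partitions.

```java
// Hypothetical sketch, not Samza code: shows how a container-wide fetch
// threshold translates into a per-partition buffer limit when divided
// evenly across the partitions a container consumes.
public class FetchThresholdSketch {
  // New default from this commit: 10,000 messages across all partitions.
  static final long FETCH_THRESHOLD = 10000;

  // Messages to buffer per partition for a container consuming
  // numPartitions partitions (assumes an even split).
  static long perPartitionThreshold(int numPartitions) {
    return FETCH_THRESHOLD / numPartitions;
  }

  public static void main(String[] args) {
    // With the example from the docs (50 partitions), each partition
    // is limited to 10000 / 50 = 200 buffered messages.
    System.out.println(perPartitionThreshold(50));
  }
}
```

Under the old 50,000-message default the same 50-partition container would have buffered 1,000 messages per partition, so this commit cuts the default per-partition memory footprint by 5x.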