GitHub user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1131#discussion_r56402570
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java ---
    @@ -0,0 +1,298 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + *   or more contributor license agreements.  See the NOTICE file
    + *   distributed with this work for additional information
    + *   regarding copyright ownership.  The ASF licenses this file
    + *   to you under the Apache License, Version 2.0 (the
    + *   "License"); you may not use this file except in compliance
    + *   with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + *   Unless required by applicable law or agreed to in writing, software
    + *   distributed under the License is distributed on an "AS IS" BASIS,
    + *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + *   See the License for the specific language governing permissions and
    + *   limitations under the License.
    + */
    +
    +package org.apache.storm.kafka.spout;
    +
    +import org.apache.kafka.common.serialization.Deserializer;
    +
    +import java.io.Serializable;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Map;
    +
    +/**
    + * KafkaSpoutConfig defines the configuration required to connect a consumer to a consumer group, as well as the topics to subscribe to.
    + */
    +public class KafkaSpoutConfig<K, V> implements Serializable {
    +    public static final long DEFAULT_POLL_TIMEOUT_MS = 2_000;            // 2s
    +    public static final long DEFAULT_OFFSET_COMMIT_PERIOD_MS = 15_000;   // 15s
    +    public static final int DEFAULT_MAX_RETRIES = Integer.MAX_VALUE;     // Retry forever
    +    public static final int DEFAULT_MAX_UNCOMMITTED_OFFSETS = 10_000;    // 10,000 records
    +
    +    // Kafka property names
    +    public interface Consumer {
    +        String GROUP_ID = "group.id";
    +        String BOOTSTRAP_SERVERS = "bootstrap.servers";
    +        String ENABLE_AUTO_COMMIT = "enable.auto.commit";
    +        String AUTO_COMMIT_INTERVAL_MS = "auto.commit.interval.ms";
    +        String KEY_DESERIALIZER = "key.deserializer";
    +        String VALUE_DESERIALIZER = "value.deserializer";
    +    }
    +
    +    /**
    +     * The offset used by the Kafka spout in the first poll to the Kafka broker. The choice of this parameter will
    +     * affect the number of consumer records returned in the first poll. By default this parameter is set to UNCOMMITTED_EARLIEST. <br/><br/>
    +     * The allowed values are EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST. <br/>
    +     * <ul>
    +     * <li>EARLIEST means that the Kafka spout polls records starting at the first offset of the partition, regardless of previous commits</li>
    +     * <li>LATEST means that the Kafka spout polls records with offsets greater than the last offset in the partition, regardless of previous commits</li>
    +     * <li>UNCOMMITTED_EARLIEST means that the Kafka spout polls records from the last committed offset, if any.
    +     * If no offset has been committed, it behaves as EARLIEST.</li>
    +     * <li>UNCOMMITTED_LATEST means that the Kafka spout polls records from the last committed offset, if any.
    +     * If no offset has been committed, it behaves as LATEST.</li>
    +     * </ul>
    +     */
    +    public enum FirstPollOffsetStrategy {
    +        EARLIEST,
    +        LATEST,
    +        UNCOMMITTED_EARLIEST,
    +        UNCOMMITTED_LATEST
    +    }
    +
    +    /**
    +     * Defines when to poll the next batch of records from Kafka. The choice of this parameter will affect throughput and the memory
    +     * footprint of the Kafka spout. The allowed values are STREAM and BATCH. STREAM will likely have higher throughput and use more memory
    +     * (it stores the entire KafkaRecord in memory, including the data). BATCH will likely have lower throughput but also use less memory.
    +     * The BATCH behavior is similar to the behavior of the previous Kafka Spout. The default value is STREAM.
    +     * <ul>
    +     *     <li>STREAM Every periodic call to nextTuple polls a new batch of records from Kafka as long as the maxUncommittedOffsets
    +     *     threshold has not yet been reached. When the threshold is reached, no more records are polled until enough offsets have been
    +     *     committed, such that the number of pending offsets is less than maxUncommittedOffsets. See {@link Builder#setMaxUncommittedOffsets(int)}
    +     *     </li>
    +     *     <li>BATCH Only polls a new batch of records from Kafka once all the records that came in the previous poll have been acked.</li>
    +     * </ul>
    +     */
    +    public enum PollStrategy {
    +        STREAM,
    +        BATCH
    --- End diff --
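    
    Side note on the FirstPollOffsetStrategy javadoc above: the four values boil down to roughly the seek logic below. This is just a sketch against a recent kafka-clients consumer API to make the semantics concrete; the class and method names here are illustrative, not the code in this PR.
    
        import java.util.Collections;
        import org.apache.kafka.clients.consumer.KafkaConsumer;
        import org.apache.kafka.clients.consumer.OffsetAndMetadata;
        import org.apache.kafka.common.TopicPartition;
        import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;
        
        // Illustrative only: how a spout might position the consumer for its first poll.
        class FirstPollSeeker {
            static void seekForFirstPoll(KafkaConsumer<?, ?> consumer, TopicPartition tp,
                                         FirstPollOffsetStrategy strategy) {
                OffsetAndMetadata committed = consumer.committed(tp);  // null if nothing committed yet
                switch (strategy) {
                    case EARLIEST:
                        consumer.seekToBeginning(Collections.singletonList(tp));
                        break;
                    case LATEST:
                        consumer.seekToEnd(Collections.singletonList(tp));
                        break;
                    case UNCOMMITTED_EARLIEST:
                        if (committed == null) consumer.seekToBeginning(Collections.singletonList(tp));
                        else consumer.seek(tp, committed.offset());  // committed offset == next record to process
                        break;
                    case UNCOMMITTED_LATEST:
                        if (committed == null) consumer.seekToEnd(Collections.singletonList(tp));
                        else consumer.seek(tp, committed.offset());
                        break;
                }
            }
        }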
    
    @harshach You are combining two different features/issues here. The first one is limiting the maximum number of replays. +1, this is a great feature. The second one is how we do a replay on a failure when we have not yet hit the replay limit.
    
    This code keeps the entire tuple in memory along with its offset, so when a failure happens we can replay the message without talking to Kafka. The original KafkaSpout would only keep the offset in memory, and if it needed to replay the message it would go back to Kafka, fetch an entire batch of messages, and drop the ones that didn't need to be replayed.
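    
    To make the difference concrete, the memory-intensive approach amounts to a pending map like the sketch below. All names here are hypothetical, just to illustrate the tradeoff; this is not the PR's actual code.
    
        import java.util.Map;
        import java.util.concurrent.ConcurrentHashMap;
        import org.apache.kafka.clients.consumer.ConsumerRecord;
        
        // Hypothetical sketch: keep the whole record keyed by its message id so that
        // fail() can re-emit without another fetch from Kafka. The alternative (the
        // old spout's approach) stores only offsets and re-fetches a batch on failure.
        class InMemoryReplayBuffer<K, V> {
            private final Map<Object, ConsumerRecord<K, V>> pending = new ConcurrentHashMap<>();
        
            void onEmit(Object msgId, ConsumerRecord<K, V> record) {
                pending.put(msgId, record);    // record (key and value) stays in memory until acked
            }
        
            void onAck(Object msgId) {
                pending.remove(msgId);         // acked tuples free their memory
            }
        
            ConsumerRecord<K, V> onFail(Object msgId) {
                return pending.get(msgId);     // replay straight from memory, no Kafka fetch
            }
        }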
    
    My opinion is that failure is not the common case, so let's not optimize for it. The current code is much simpler, but also much more memory intensive, so let's keep the in-memory replay for now, but file a JIRA and come back to reduce the memory usage by adding the more complex code that uses less memory.
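    
    Note too that even the memory-hungry STREAM path is bounded: the maxUncommittedOffsets check described in the javadoc amounts to a guard like the one below. Again a hypothetical sketch with illustrative names, not the PR's code.
    
        import org.apache.kafka.clients.consumer.ConsumerRecords;
        import org.apache.kafka.clients.consumer.KafkaConsumer;
        
        // Hypothetical sketch of the STREAM gating described in the PollStrategy
        // javadoc: only poll again while the number of emitted-but-uncommitted
        // offsets stays under the cap, so the in-memory state is bounded.
        class StreamPollGate<K, V> {
            private final KafkaConsumer<K, V> consumer;
            private final int maxUncommittedOffsets;  // cf. Builder#setMaxUncommittedOffsets(int)
            private final long pollTimeoutMs;         // cf. DEFAULT_POLL_TIMEOUT_MS
            private long numUncommittedOffsets = 0;
        
            StreamPollGate(KafkaConsumer<K, V> consumer, int maxUncommittedOffsets, long pollTimeoutMs) {
                this.consumer = consumer;
                this.maxUncommittedOffsets = maxUncommittedOffsets;
                this.pollTimeoutMs = pollTimeoutMs;
            }
        
            // Called from nextTuple(): skip the poll while too many offsets are pending.
            ConsumerRecords<K, V> maybePoll() {
                if (numUncommittedOffsets >= maxUncommittedOffsets) {
                    return ConsumerRecords.empty();   // back off until commits catch up
                }
                ConsumerRecords<K, V> records = consumer.poll(pollTimeoutMs);
                numUncommittedOffsets += records.count();
                return records;
            }
        
            // Called after a successful commit of `count` offsets.
            void onCommitted(long count) {
                numUncommittedOffsets -= count;
            }
        }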
    
    If the entire point of the BATCH config is to reduce the memory footprint of the spout, and at the same time it makes the throughput really bad, let's not do it. Instead I would prefer to release a memory-hungry spout sooner and spend our efforts on that second JIRA, which would reduce the memory usage without impacting the common-case throughput. If there is something else about the BATCH config that I am missing, or that is going to be important in the future, please let me know.

