Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1131#discussion_r56517067
  
    --- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
 ---
    @@ -0,0 +1,298 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + *   or more contributor license agreements.  See the NOTICE file
    + *   distributed with this work for additional information
    + *   regarding copyright ownership.  The ASF licenses this file
    + *   to you under the Apache License, Version 2.0 (the
    + *   "License"); you may not use this file except in compliance
    + *   with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + *   Unless required by applicable law or agreed to in writing, software
    + *   distributed under the License is distributed on an "AS IS" BASIS,
    + *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + *   See the License for the specific language governing permissions and
    + *   limitations under the License.
    + */
    +
    +package org.apache.storm.kafka.spout;
    +
    +import org.apache.kafka.common.serialization.Deserializer;
    +
    +import java.io.Serializable;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Map;
    +
    +/**
    + * KafkaSpoutConfig defines the required configuration to connect a consumer to a consumer group, as well as the subscribed topics
    + */
    +public class KafkaSpoutConfig<K, V> implements Serializable {
    +    public static final long DEFAULT_POLL_TIMEOUT_MS = 2_000;            // 2s
    +    public static final long DEFAULT_OFFSET_COMMIT_PERIOD_MS = 15_000;   // 15s
    +    public static final int DEFAULT_MAX_RETRIES = Integer.MAX_VALUE;     // Retry forever
    +    public static final int DEFAULT_MAX_UNCOMMITTED_OFFSETS = 10_000;    // 10,000 records
    +
    +    // Kafka property names
    +    public interface Consumer {
    +        String GROUP_ID = "group.id";
    +        String BOOTSTRAP_SERVERS = "bootstrap.servers";
    +        String ENABLE_AUTO_COMMIT = "enable.auto.commit";
    +        String AUTO_COMMIT_INTERVAL_MS = "auto.commit.interval.ms";
    +        String KEY_DESERIALIZER = "key.deserializer";
    +        String VALUE_DESERIALIZER = "value.deserializer";
    +    }
    +
    +    /**
    +     * The offset used by the Kafka spout in the first poll to the Kafka broker. The choice of this parameter will
    +     * affect the number of consumer records returned in the first poll. By default this parameter is set to UNCOMMITTED_EARLIEST. <br/><br/>
    +     * The allowed values are EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST. <br/>
    +     * <ul>
    +     * <li>EARLIEST means that the Kafka spout polls records starting at the first offset of the partition, regardless of previous commits</li>
    +     * <li>LATEST means that the Kafka spout polls records with offsets greater than the last offset in the partition, regardless of previous commits</li>
    +     * <li>UNCOMMITTED_EARLIEST means that the Kafka spout polls records from the last committed offset, if any.
    +     * If no offset has been committed, it behaves as EARLIEST.</li>
    +     * <li>UNCOMMITTED_LATEST means that the Kafka spout polls records from the last committed offset, if any.
    +     * If no offset has been committed, it behaves as LATEST.</li>
    +     * </ul>
    +     */
    +    public enum FirstPollOffsetStrategy {
    +        EARLIEST,
    +        LATEST,
    +        UNCOMMITTED_EARLIEST,
    +        UNCOMMITTED_LATEST
    +    }
    +
    +    /**
    +     * Defines when to poll the next batch of records from Kafka. The choice of this parameter will affect throughput and the memory
    +     * footprint of the Kafka spout. The allowed values are STREAM and BATCH. STREAM will likely have higher throughput and use more memory
    +     * (it stores the entire KafkaRecord, including data, in memory). BATCH will likely have lower throughput but also use less memory.
    +     * The BATCH behavior is similar to the behavior of the previous Kafka spout. The default value is STREAM.
    +     * <ul>
    +     *     <li>STREAM Every periodic call to nextTuple polls a new batch of records from Kafka as long as the maxUncommittedOffsets
    +     *     threshold has not yet been reached. When the threshold is reached, no more records are polled until enough offsets have been
    +     *     committed, such that the number of pending offsets is less than maxUncommittedOffsets. See {@link Builder#setMaxUncommittedOffsets(int)}
    +     *     </li>
    +     *     <li>BATCH Only polls a new batch of records from Kafka once all the records from the previous poll have been acked.</li>
    +     * </ul>
    +     */
    +    public enum PollStrategy {
    +        STREAM,
    +        BATCH
    --- End diff --
    
    Now to answer your other question.  It seems like you have thought through this a lot.  Philosophically I think that modular code, with interfaces, is usually a good idea, especially for making the code unit testable.  Exposing those interfaces to end users and making them pluggable without a really good use case makes the code more difficult to maintain, because I have to maintain that interface for good or bad.
    
    I personally would like to reuse a lot of the design from the original 
spout because we know it works.
    
    **All of the following describes what the follow on spout should look like 
unless otherwise stated. My changes/comments are in bold**
    
    [0] - We fetch (poll) a new set of records from Kafka **when we don't have 
something to emit from our buffer.  The internal spout flow control assumes 
that we will emit around one tuple at a time when it calls `.nextTuple()`.  So 
we have a buffer of messages to emit and emit them one at a time**  
    
    [1] - If offsets 1 2 3 4 5 6 get fetched (polled) and the following occurs: 1 2 committed, 4 fails, 3 5 6 pending **and the exponential back-off for 4 has expired and 4 has been retried less than the maximum number of retries**.
    
      * [1.1] - We want to fetch again starting at **4, i.e. 4 5 6 ... and then filter out any messages that are not ready to be replayed.**
    
      * [1.2] - Let's say 3, which was pending, fails after the retry fetch described in [1.1] **(4 5 6 ... )** has happened. What do we do? **We fetch again, starting with 3 this time (3 4 5 ...)**
    
    [2] - **We want in the follow-on JIRA** to have some logic to avoid re-emitting **any tuples that have been emitted and not had a corresponding `fail()` called on them yet.  Preferably with some sort of back-off, like the original code.**
    
    [3] - **We want this patch** to support a limited number of retries for a particular offset**.** Let's say 4 keeps on failing. **W**e want to limit the number of fails (i.e. repeat what is done in [1.1]) to e.g. 3 times, and once the limit is reached, we consider 4 as committed**. In the follow-on patch, at a minimum, we need a metric to count how many of these we have dropped.**
    
      * [3.1] - In this scenario, **we are only replaying failed messages by placing them into the same buffer/queue that we emit from. We will continue to fetch new messages into that queue so long as we have not violated maxUncommittedOffsets, but we will never commit anything above 4 until it has failed the maximum number of times.  This means we will not violate the guarantee, even if the spout crashes and a new spout loses all of its state, unless we have Kafka auto-commit turned on.**
    
       * [3.2] - Do we want to stop retrying 4 (even if it has reached max retries) only if all the offsets less than 4 have been acked, or all have failed more than maxNumRetries**? maxNumRetries needs to be marked as best effort.  If the spout fails before we can persist the data to Kafka, we may retry the message more often.**
    
    [4] - We do NOT want to have the retry-from-memory feature at all. That means there is no need to ever keep tuples in memory, only offsets. **Ultimately I think so.  I still have not seen a use case that warrants it.**
    
       * [4.1] - Or do we want to extract the retry strategy into one interface, and have implementations for both? **Like I said above, having an interface that allows us to make it pluggable, especially for unit tests, is a great thing, but exposing it to the end user without a good use case is not.**
    
    [5] - Does it make sense to even support retrying an offset forever? This can potentially cause an infinite cycle of fetch (poll) calls to Kafka across the network... but it will guarantee delivery up to that point (offset).  **I don't have a use case for it, but to work around this not being there, I think this is why we have the FirstPollOffsetStrategy: so we can pick up in a different spot and skip something that is bad**
    
    
    The more I look at the original spout, the more I see a lot of good design/features in it.  Despite its complexity, I would suggest you look at how it handles replay.
    
    There is a manager that keeps track of what should/should not be replayed 
and when.
    
    The current implementation is https://github.com/apache/storm/blob/master/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java
    
    For it there is a pluggable interface, but it is not exposed to the end user.  Using that code, when a new batch is fetched it will only replay tuples that have failed and have met the criteria to be replayed.  
    
    
https://github.com/apache/storm/blob/master/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java#L231-L241
    
    A lot of that code could be reused with almost no changes at all.
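    The general idea behind that exponential back-off (a minimal sketch of the concept only; the actual `ExponentialBackoffMsgRetryManager` fields and formula may differ) is that the retry delay grows with the failure count, capped at a maximum:

```java
// Illustrative sketch: retry delay = initialDelay * ratio^retryCount, capped
// at maxDelay. Parameter names are hypothetical, not the real class's API.
class BackoffCalc {
    static long nextDelayMs(long initialDelayMs, double ratio, int retryCount, long maxDelayMs) {
        double delay = initialDelayMs * Math.pow(ratio, retryCount);
        return Math.min((long) delay, maxDelayMs);
    }
}
```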
    


