[ 
https://issues.apache.org/jira/browse/STORM-822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15198856#comment-15198856
 ] 

ASF GitHub Bot commented on STORM-822:
--------------------------------------

Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1131#discussion_r56462751
  
    --- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
 ---
    @@ -0,0 +1,298 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + *   or more contributor license agreements.  See the NOTICE file
    + *   distributed with this work for additional information
    + *   regarding copyright ownership.  The ASF licenses this file
    + *   to you under the Apache License, Version 2.0 (the
    + *   "License"); you may not use this file except in compliance
    + *   with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + *   Unless required by applicable law or agreed to in writing, software
    + *   distributed under the License is distributed on an "AS IS" BASIS,
    + *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
implied.
    + *   See the License for the specific language governing permissions and
    + *   limitations under the License.
    + */
    +
    +package org.apache.storm.kafka.spout;
    +
    +import org.apache.kafka.common.serialization.Deserializer;
    +
    +import java.io.Serializable;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Map;
    +
    +/**
    + * KafkaSpoutConfig defines the required configuration to connect a 
consumer to a consumer group, as well as the subscribing topics
    + */
    +public class KafkaSpoutConfig<K, V> implements Serializable {
    +    public static final long DEFAULT_POLL_TIMEOUT_MS = 2_000;            
// 2s
    +    public static final long DEFAULT_OFFSET_COMMIT_PERIOD_MS = 15_000;   
// 15s
    +    public static final int DEFAULT_MAX_RETRIES = Integer.MAX_VALUE;     
// Retry forever
    +    public static final int DEFAULT_MAX_UNCOMMITTED_OFFSETS = 10_000;    
// 10,000 records
    +
    +    // Kafka property names
    +    public interface Consumer {
    +        String GROUP_ID = "group.id";
    +        String BOOTSTRAP_SERVERS = "bootstrap.servers";
    +        String ENABLE_AUTO_COMMIT = "enable.auto.commit";
    +        String AUTO_COMMIT_INTERVAL_MS = "auto.commit.interval.ms";
    +        String KEY_DESERIALIZER = "key.deserializer";
    +        String VALUE_DESERIALIZER = "value.deserializer";
    +    }
    +
    +    /**
    +     * The offset used by the Kafka spout in the first poll to Kafka 
broker. The choice of this parameter will
    +     * affect the number of consumer records returned in the first poll. 
By default this parameter is set to UNCOMMITTED_EARLIEST. <br/><br/>
    +     * The allowed values are EARLIEST, LATEST, UNCOMMITTED_EARLIEST, 
UNCOMMITTED_LATEST. <br/>
    +     * <ul>
    +     * <li>EARLIEST means that the kafka spout polls records starting in 
the first offset of the partition, regardless of previous commits</li>
    +     * <li>LATEST means that the kafka spout polls records with offsets 
greater than the last offset in the partition, regardless of previous 
commits</li>
    +     * <li>UNCOMMITTED_EARLIEST means that the kafka spout polls records 
from the last committed offset, if any.
    +     * If no offset has been committed, it behaves as EARLIEST.</li>
    +     * <li>UNCOMMITTED_LATEST means that the kafka spout polls records 
from the last committed offset, if any.
    +     * If no offset has been committed, it behaves as LATEST.</li>
    +     * </ul>
    +     * */
    +    public enum FirstPollOffsetStrategy {
    +        EARLIEST,
    +        LATEST,
    +        UNCOMMITTED_EARLIEST,
    +        UNCOMMITTED_LATEST }
    +
    +    /**
    +     * Defines when to poll the next batch of records from Kafka. The 
choice of this parameter will affect throughput and the memory
    +     * footprint of the Kafka spout. The allowed values are STREAM and 
BATCH. STREAM will likely have higher throughput and use more memory
    +     * (it stores in memory the entire KafkaRecord,including data). BATCH 
will likely have less throughput but also use less memory.
    +     * The BATCH behavior is similar to the behavior of the previous Kafka 
Spout. De default value is STREAM.
    +     * <ul>
    +     *     <li>STREAM Every periodic call to nextTuple polls a new batch 
of records from Kafka as long as the maxUncommittedOffsets
    +     *     threshold has not yet been reached. When the threshold his 
reached, no more records are polled until enough offsets have been
    +     *     committed, such that the number of pending offsets is less than 
maxUncommittedOffsets. See {@link Builder#setMaxUncommittedOffsets(int)}
    +     *     </li>
    +     *     <li>BATCH Only polls a new batch of records from kafka once all 
the records that came in the previous poll have been acked.</li>
    +     * </ul>
    +     */
    +    public enum PollStrategy {
    +        STREAM,
    +        BATCH
    --- End diff --
    
    @revans2 @abhishekagarwal87 @harshach thanks a lot for your comments. I do 
agree that this code is optimized for fails as I was a bit concerned on the 
time spent fetching data across the network. I also agree that failure should 
be sporadic, and therefore it's not the best tradeoff to optimize for it. 
Therefore, I am going to change the implementation as follows (please let me 
know if this what you have in mind such that I can get this right). 
    
    [0] - We fetch (poll) a new set of records from Kafka as soon as the 
previous batch has been emitted. We fetch without worrying about acks or fails. 
 What I mean is, if offsets `1 2 3 4 5 6` get fetched, after I have done 
collector.emit(1) ... collector.emit(6), I can fetch `6 7 8 9`. At this point 
1-6 may have been acked, failed, both, or still be in the wire but we don't 
care, we simply fetch `6 7 8 9` and then deal with potencial fails through 
retry.
    
    [1] - If offsets `1 2 3 4 5 6` get fetched (polled)
 and the following 
occurs:
    `1 2` committed
      
    `4` fails
        
    `3 5 6` pending
      [1.1] - We want to fetch again starting on `3`, i.e `3 4 5 6 ...`
          This would result in `3 5 6` being duplicate unless [2]
      [1.2] - Let's say `3` that was pending now fails after the fetch `3 4 5 6 
...` described in [1.1] has happened. What do we do? Do we retry again `3 4 5 6 
...` and deal with dups as described in [2]? Otherwise we can have lots of dups.
    
    [2] - Do we want THIS PATCH to have some logic to avoid re-emitting `3 5 6` 
if they were polled again but have been acked in the meantime? Or do we want to 
support this later?
    
    [3] - Do we want THIS PATCH to support a limited number of retrials for a 
particular offset. Let's say `4` keeps on failing. Do we want to limit the 
number of fails (i.e repeat what is done in [1.1]) to e.g. `3` times, and once 
the limit is reached, we consider `4` has comitted, and fetch from offset `5` 
onwards, i.e. `5 6 7 8 ...`. 
    [3.1] - In this scenario, in case of spout failure, all the offsets that 
potentially fail before `4` will not be retried once we have moved past `4`. We 
may sacrifice guarantee of delivery in this case, or [3.2]
    [3.2] - Do we want to stop retrying `4` (even if it has reached max 
retries) only if all the offsets < `4` have been acked or all have failed more 
than `numMaxRetries`. In this case we won't miss any offsets before `4` unless 
they reached `numMaxRetries`, which is the contract the user says he agrees in 
terms of not having a certain record delivered if it failed more than 
`numMaxRetries`.
    
    [4] - We don't want at all to have the feature to retry from memory. That 
means that there is no need to ever keep tuples in memory, only offsets. 
      [4.1] - Or do we want extract the retry strategy into one interface, and 
have implementations for both?
    
    [5] - Does it make sense to even support retrying forever? This can 
potentially cause an infinite cycle of fetch (poll) calls to Kafka accross the 
network... but it will guarantee delievery up to that point.
    
    @revans2 I understand your urgency. I will make a push to have this done by 
today or tomorrow.


> As a storm developer I’d like to use the new kafka consumer API (0.8.3) to 
> reduce dependencies and use long term supported kafka apis 
> --------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: STORM-822
>                 URL: https://issues.apache.org/jira/browse/STORM-822
>             Project: Apache Storm
>          Issue Type: Story
>          Components: storm-kafka
>            Reporter: Thomas Becker
>            Assignee: Hugo Louro
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to