Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3001#discussion_r112122730
  
    --- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
 ---
    @@ -66,31 +80,48 @@
        //  Consumer properties
        // 
------------------------------------------------------------------------
     
    -   /** The names of the Kinesis streams that we will be consuming from */
    +   /**
    +    * The names of the Kinesis streams that we will be consuming from
    +    */
        private final List<String> streams;
     
    -   /** Properties to parametrize settings such as AWS service region, 
initial position in stream,
    -    * shard list retrieval behaviours, etc */
    +   /**
    +    * Properties to parametrize settings such as AWS service region, 
initial position in stream,
    +    * shard list retrieval behaviours, etc
    +    */
        private final Properties configProps;
     
    -   /** User supplied deseriliazation schema to convert Kinesis byte 
messages to Flink objects */
    +   /**
    +    * User supplied deseriliazation schema to convert Kinesis byte 
messages to Flink objects
    +    */
        private final KinesisDeserializationSchema<T> deserializer;
     
        // 
------------------------------------------------------------------------
        //  Runtime state
        // 
------------------------------------------------------------------------
     
    -   /** Per-task fetcher for Kinesis data records, where each fetcher pulls 
data from one or more Kinesis shards */
    +   /**
    +    * Per-task fetcher for Kinesis data records, where each fetcher pulls 
data from one or more Kinesis shards
    +    */
        private transient KinesisDataFetcher<T> fetcher;
     
    -   /** The sequence numbers in the last state snapshot of this subtask */
    +   /**
    +    * The sequence numbers in the last state snapshot of this subtask
    +    */
        private transient HashMap<KinesisStreamShard, SequenceNumber> 
lastStateSnapshot;
     
    -   /** The sequence numbers to restore to upon restore from failure */
    +   /**
    +    * The sequence numbers to restore to upon restore from failure
    +    */
        private transient HashMap<KinesisStreamShard, SequenceNumber> 
sequenceNumsToRestore;
     
        private volatile boolean running = true;
     
    +   // 
------------------------------------------------------------------------
    +   // State for Checkpoint
    +   // 
------------------------------------------------------------------------
    +
    +   private transient ListState<Tuple2<KinesisStreamShard, SequenceNumber>> 
offsetsStateForCheckpoint;
    --- End diff --
    
    "offset" is the Kafka term.
    I would try to rename this to use "sequence number" instead (or a likewise 
abbreviation).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to