[ https://issues.apache.org/jira/browse/FLINK-4514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15446086#comment-15446086 ]

ASF GitHub Bot commented on FLINK-4514:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2432#discussion_r76620194
  
    --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java ---
    @@ -219,19 +228,52 @@ private void deserializeRecordForCollectionAndUpdateState(UserRecord record)
                        subscribedShard.getStreamName(),
                        subscribedShard.getShard().getShardId());
     
    -           if (record.isAggregated()) {
    -                   fetcherRef.emitRecordAndUpdateState(
    -                           value,
    -                           approxArrivalTimestamp,
    -                           subscribedShardStateIndex,
    -                           new SequenceNumber(record.getSequenceNumber(), record.getSubSequenceNumber()));
    -           } else {
    -                   fetcherRef.emitRecordAndUpdateState(
    -                           value,
    -                           approxArrivalTimestamp,
    -                           subscribedShardStateIndex,
    -                           new SequenceNumber(record.getSequenceNumber()));
    +           SequenceNumber collectedSequenceNumber = (record.isAggregated())
    +                   ? new SequenceNumber(record.getSequenceNumber(), record.getSubSequenceNumber())
    +                   : new SequenceNumber(record.getSequenceNumber());
    +
    +           fetcherRef.emitRecordAndUpdateState(
    +                   value,
    +                   approxArrivalTimestamp,
    +                   subscribedShardStateIndex,
    +                   collectedSequenceNumber);
    +
    +           lastSequenceNum = collectedSequenceNumber;
    +   }
    +
    +   /**
    +    * Calls {@link KinesisProxyInterface#getRecords(String, int)}, while also handling unexpected
    +    * AWS {@link ExpiredIteratorException}s to assure that we get results and don't just fail on
    +    * such occasions. The returned shard iterator within the successful {@link GetRecordsResult} should
    +    * be used for the next call to this method.
    +    *
    +    * Note: it is important that this method is not called again before all the records from the last result have been
    +    * fully collected with {@link ShardConsumer#deserializeRecordForCollectionAndUpdateState(UserRecord)}, otherwise
    +    * {@link ShardConsumer#lastSequenceNum} may refer to a sub-record in the middle of an aggregated record, leading to
    +    * incorrect shard iteration if the iterator had to be refreshed.
    +    *
    +    * @param shardItr shard iterator to use
    +    * @param maxNumberOfRecords the maximum number of records to fetch for this getRecords attempt
    +    * @return get records result
    +    * @throws InterruptedException
    +    */
    +   private GetRecordsResult getRecords(String shardItr, int maxNumberOfRecords) throws InterruptedException {
    +           GetRecordsResult getRecordsResult = null;
    +           while (getRecordsResult == null) {
    +                   try {
    +                           getRecordsResult = kinesis.getRecords(shardItr, maxNumberOfRecords);
    +                   } catch (ExpiredIteratorException eiEx) {
    +                           LOG.warn("Encountered an unexpected expired iterator {} for shard {};" +
    +                                   " refreshing the iterator ...", shardItr, subscribedShard);
    +                           shardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), lastSequenceNum.getSequenceNumber());
    +
    +                           // sleep for the fetch interval before the next getRecords attempt with the refreshed iterator
    +                           if (fetchIntervalMillis != 0) {
    +                                   Thread.sleep(fetchIntervalMillis);
    +                           }
    --- End diff --
    
    Sorry for the race commit, didn't realize you were still reviewing.
    
    I agree. So, if we limit the fetch interval configuration to 5 minutes, then we'll likely get stuck in this loop indefinitely, right? I think that's why I had a stricter 4.5 minutes in mind, to make sure this doesn't happen :P But still, logically, we never know what the `n` will be.
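The concern above can be made concrete with a small sketch: Kinesis shard iterators expire 5 minutes after they are issued, so if the configured sleep between retries is at least that long, every refreshed iterator expires before it is used and the retry loop never makes progress. A guard like the following could reject such configurations up front. Note this is a hypothetical illustration; the names `ITERATOR_TTL_MILLIS` and `validateFetchInterval` are invented for this sketch and are not part of the connector.

```java
// Hypothetical guard illustrating the discussion above: a fetch interval
// at or beyond the shard iterator lifetime guarantees the refresh loop
// can never succeed, so it should be rejected at configuration time.
public class FetchIntervalCheck {
    // Kinesis shard iterators expire 5 minutes after they are issued.
    static final long ITERATOR_TTL_MILLIS = 5 * 60 * 1000;

    static void validateFetchInterval(long fetchIntervalMillis) {
        if (fetchIntervalMillis >= ITERATOR_TTL_MILLIS) {
            throw new IllegalArgumentException(
                "Fetch interval of " + fetchIntervalMillis + " ms is >= the "
                + ITERATOR_TTL_MILLIS + " ms shard iterator lifetime; "
                + "a refreshed iterator would always expire before use.");
        }
    }

    public static void main(String[] args) {
        validateFetchInterval(10_000L); // well under the TTL: accepted
        try {
            validateFetchInterval(ITERATOR_TTL_MILLIS); // would loop forever
            throw new AssertionError("expected rejection");
        } catch (IllegalArgumentException expected) {
            System.out.println("rejected over-long fetch interval");
        }
    }
}
```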


> ExpiredIteratorException in Kinesis Consumer on long catch-ups to head of stream
> --------------------------------------------------------------------------------
>
>                 Key: FLINK-4514
>                 URL: https://issues.apache.org/jira/browse/FLINK-4514
>             Project: Flink
>          Issue Type: Bug
>          Components: Kinesis Connector
>    Affects Versions: 1.1.0, 1.1.1
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>             Fix For: 1.2.0, 1.1.2
>
>
> Original mailing thread for the reported issue:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kinesis-connector-Iterator-expired-exception-td8711.html
>
> Normally, the exception is thrown when the consumer uses the same shard
> iterator more than 5 minutes after it was retrieved. I have yet to clarify and
> reproduce the root cause of the {{ExpiredIteratorException}}, because from
> the code this seems to be impossible. I'm leaning towards suspecting this is
> a Kinesis-side issue (from the description in the ML, the behaviour also
> seems indeterminate).
>
> Either way, the exception can be fairly easily handled so that the consumer
> doesn't just fail. When caught, we request a new shard iterator from Kinesis
> with the last sequence number.
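The handling described in the issue can be sketched in isolation: on an expired-iterator error, request a fresh iterator positioned after the last fully-processed sequence number and retry the fetch. The `Client` interface and `ExpiredIteratorException` below are simplified stand-ins invented for this sketch, not the real AWS SDK or connector types (the actual implementation uses `KinesisProxyInterface`, as shown in the diff earlier in this thread).

```java
// Minimal sketch of the retry pattern described in the issue. Types here
// are simplified stand-ins for the AWS SDK / connector classes.
import java.util.List;

public class ExpiredIteratorRetry {
    static class ExpiredIteratorException extends RuntimeException {}

    interface Client {
        List<String> getRecords(String iterator) throws ExpiredIteratorException;
        String getIteratorAfter(String lastSequenceNumber);
    }

    static List<String> getRecordsWithRetry(Client client, String iterator, String lastSeqNum) {
        while (true) {
            try {
                return client.getRecords(iterator);
            } catch (ExpiredIteratorException e) {
                // The iterator outlived its 5-minute lifetime; refresh it
                // from the last sequence number we fully processed.
                iterator = client.getIteratorAfter(lastSeqNum);
            }
        }
    }

    public static void main(String[] args) {
        // Stub client that fails once with an expired iterator, then succeeds.
        Client client = new Client() {
            boolean expiredOnce = false;
            public List<String> getRecords(String it) {
                if (!expiredOnce) { expiredOnce = true; throw new ExpiredIteratorException(); }
                return List.of("rec-1", "rec-2");
            }
            public String getIteratorAfter(String seq) { return "fresh-iterator"; }
        };
        List<String> records = getRecordsWithRetry(client, "stale-iterator", "seq-42");
        System.out.println(records); // [rec-1, rec-2]
    }
}
```

Resuming from the last *fully emitted* sequence number is what makes the docstring's warning matter: refreshing mid-way through an aggregated record would re-read or skip sub-records.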



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
