[ https://issues.apache.org/jira/browse/FLINK-4514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15446086#comment-15446086 ]
ASF GitHub Bot commented on FLINK-4514:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2432#discussion_r76620194

    --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java ---
    @@ -219,19 +228,52 @@ private void deserializeRecordForCollectionAndUpdateState(UserRecord record)
                     subscribedShard.getStreamName(),
                     subscribedShard.getShard().getShardId());

    -        if (record.isAggregated()) {
    -            fetcherRef.emitRecordAndUpdateState(
    -                value,
    -                approxArrivalTimestamp,
    -                subscribedShardStateIndex,
    -                new SequenceNumber(record.getSequenceNumber(), record.getSubSequenceNumber()));
    -        } else {
    -            fetcherRef.emitRecordAndUpdateState(
    -                value,
    -                approxArrivalTimestamp,
    -                subscribedShardStateIndex,
    -                new SequenceNumber(record.getSequenceNumber()));
    +        SequenceNumber collectedSequenceNumber = (record.isAggregated())
    +            ? new SequenceNumber(record.getSequenceNumber(), record.getSubSequenceNumber())
    +            : new SequenceNumber(record.getSequenceNumber());
    +
    +        fetcherRef.emitRecordAndUpdateState(
    +            value,
    +            approxArrivalTimestamp,
    +            subscribedShardStateIndex,
    +            collectedSequenceNumber);
    +
    +        lastSequenceNum = collectedSequenceNumber;
    +    }
    +
    +    /**
    +     * Calls {@link KinesisProxyInterface#getRecords(String, int)}, while also handling unexpected
    +     * AWS {@link ExpiredIteratorException}s to assure that we get results and don't just fail on
    +     * such occasions. The returned shard iterator within the successful {@link GetRecordsResult} should
    +     * be used for the next call to this method.
    +     *
    +     * Note: it is important that this method is not called again before all the records from the last result have been
    +     * fully collected with {@link ShardConsumer#deserializeRecordForCollectionAndUpdateState(UserRecord)}, otherwise
    +     * {@link ShardConsumer#lastSequenceNum} may refer to a sub-record in the middle of an aggregated record, leading to
    +     * incorrect shard iteration if the iterator had to be refreshed.
    +     *
    +     * @param shardItr shard iterator to use
    +     * @param maxNumberOfRecords the maximum number of records to fetch for this getRecords attempt
    +     * @return get records result
    +     * @throws InterruptedException
    +     */
    +    private GetRecordsResult getRecords(String shardItr, int maxNumberOfRecords) throws InterruptedException {
    +        GetRecordsResult getRecordsResult = null;
    +        while (getRecordsResult == null) {
    +            try {
    +                getRecordsResult = kinesis.getRecords(shardItr, maxNumberOfRecords);
    +            } catch (ExpiredIteratorException eiEx) {
    +                LOG.warn("Encountered an unexpected expired iterator {} for shard {};" +
    +                    " refreshing the iterator ...", shardItr, subscribedShard);
    +                shardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), lastSequenceNum.getSequenceNumber());
    +
    +                // sleep for the fetch interval before the next getRecords attempt with the refreshed iterator
    +                if (fetchIntervalMillis != 0) {
    +                    Thread.sleep(fetchIntervalMillis);
    +                }
    --- End diff --

    Sorry for the race commit, I didn't realize you were still reviewing.

    I agree. So, if we allow the fetch interval configuration to go up to 5 minutes, we could get stuck in this loop indefinitely, right? The refreshed iterator would already have expired by the time the sleep ends. I think that was what I had in mind with the stricter 4.5 min limit, to ensure this doesn't happen :P But still, logically, we never know what the `n` will be.

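    To make the fetch interval concern concrete, here is a minimal sketch of the kind of check being discussed. It is not code from the pull request: the class `FetchIntervalCheck`, both method names, and the 30 second margin are hypothetical, and the 5 minute iterator lifetime is taken from the issue description rather than measured.

```java
import java.time.Duration;

// Hypothetical helper, not part of the Flink Kinesis connector.
final class FetchIntervalCheck {

    // Assumption from the issue description: a shard iterator stops being
    // usable about 5 minutes after it was retrieved.
    static final Duration ITERATOR_LIFETIME = Duration.ofMinutes(5);

    // Hypothetical margin; 5 min minus 30 s gives the stricter 4.5 min bound.
    static final Duration SAFETY_MARGIN = Duration.ofSeconds(30);

    private FetchIntervalCheck() {}

    // True if sleeping for the fetch interval after a refresh would leave the
    // new iterator expired before the next getRecords attempt, so the retry
    // loop could never make progress.
    static boolean sleepOutlivesIterator(long fetchIntervalMillis) {
        return fetchIntervalMillis >= ITERATOR_LIFETIME.toMillis();
    }

    // Rejects negative intervals and intervals above lifetime minus margin;
    // returns the interval unchanged when it is acceptable.
    static long validateFetchIntervalMillis(long fetchIntervalMillis) {
        long maxAllowedMillis = ITERATOR_LIFETIME.minus(SAFETY_MARGIN).toMillis();
        if (fetchIntervalMillis < 0 || fetchIntervalMillis > maxAllowedMillis) {
            throw new IllegalArgumentException(
                "fetch interval must be between 0 and " + maxAllowedMillis
                    + " ms, but was " + fetchIntervalMillis);
        }
        return fetchIntervalMillis;
    }
}
```

    Under these assumed values a 270000 ms interval passes and a 300000 ms interval is rejected. A fixed margin only bounds the sleep itself; it says nothing about how long the getRecords call takes, which is the part we cannot know up front.
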
> ExpiredIteratorException in Kinesis Consumer on long catch-ups to head of
> stream
> --------------------------------------------------------------------------------
>
>                 Key: FLINK-4514
>                 URL: https://issues.apache.org/jira/browse/FLINK-4514
>             Project: Flink
>          Issue Type: Bug
>          Components: Kinesis Connector
>    Affects Versions: 1.1.0, 1.1.1
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>             Fix For: 1.2.0, 1.1.2
>
>
> Original mailing thread for the reported issue:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kinesis-connector-Iterator-expired-exception-td8711.html
> Normally, the exception is thrown when the consumer uses the same shard
> iterator after 5 minutes since it was retrieved. I've still yet to clarify &
> reproduce the root cause of the {{ExpiredIteratorException}}, because from
> the code this seems to be impossible. I'm leaning towards suspecting this is
> a Kinesis-side issue (from the description in the ML, the behaviour also
> seems indeterminate).
> Either way, the exception can be fairly easily handled so that the consumer
> doesn't just fail. When caught, we request a new shard iterator from Kinesis
> with the last sequence number.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)