[ https://issues.apache.org/jira/browse/FLINK-4574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15748589#comment-15748589 ]
ASF GitHub Bot commented on FLINK-4574:
---------------------------------------

Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/2925

    I'm definitely planning to look at this over the next few days :) I'm quite overwhelmed right now.
    Thanks for all your recent work on the Kinesis connector @tony810430, and very sorry for the late reviews.
    Please bear with me for a little while; I'll get back to the PRs soon ;)

> Strengthen fetch interval implementation in Kinesis consumer
> ------------------------------------------------------------
>
>                 Key: FLINK-4574
>                 URL: https://issues.apache.org/jira/browse/FLINK-4574
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kinesis Connector
>    Affects Versions: 1.1.0
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Wei-Che Wei
>             Fix For: 1.2.0
>
>
> As pointed out by [~rmetzger], the fetch interval implementation in the
> {{ShardConsumer}} class of the Kinesis consumer can currently lead to much longer
> intervals than the user specified. For example, if the specified fetch
> interval is {{f}}, a {{getRecords()}} call takes {{x}} to complete, and
> processing the fetched records for emitting takes {{y}}, then the actual
> interval between consecutive fetches is {{f+x+y}}.
> The main problem with this is that we can never guarantee how much time has
> passed since the last {{getRecords()}} call, and thus cannot guarantee that the
> returned shard iterators will not have expired by the next time we use them, even
> if we limit the user-given value of {{f}} to be no longer than the iterator
> expiry time.
> I propose to improve this by giving each {{ShardConsumer}} a
> {{ScheduledExecutorService}} / {{Timer}} that performs the fixed-interval fetching,
> and a separate blocking queue that collects the fetched records for emitting.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
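The fixed-interval fetching proposed in the issue can be illustrated with a small, self-contained Java sketch. It is hypothetical: the class FixedIntervalFetchSketch, its run method, and the fetch/emit stand-ins (a Supplier in place of getRecords(), a Consumer in place of emitting a record) are assumptions for illustration only, not Flink's actual ShardConsumer or the change made in the linked PR. It uses only the JDK's ScheduledExecutorService.scheduleAtFixedRate and LinkedBlockingQueue: one thread fetches on a fixed schedule, another drains the queue, so the processing time y no longer adds to the fetch interval.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;

/**
 * Hypothetical sketch, not Flink's ShardConsumer: a scheduler thread fetches a batch
 * every fetchIntervalMillis at a fixed rate and puts it on a blocking queue, while a
 * separate emitter thread drains that queue, so emit time no longer delays fetching.
 */
public class FixedIntervalFetchSketch {

    /**
     * Runs fetching and emitting concurrently for about runMillis and returns the number
     * of completed fetches. {@code fetch} stands in for getRecords(); {@code emit}
     * stands in for emitting one record downstream.
     */
    public static int run(Supplier<List<String>> fetch, Consumer<String> emit,
                          long fetchIntervalMillis, long runMillis) {
        BlockingQueue<List<String>> queue = new LinkedBlockingQueue<>(); // unbounded
        AtomicInteger fetches = new AtomicInteger();
        AtomicBoolean running = new AtomicBoolean(true);
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

        // Fixed rate: fetch n is scheduled at n * fetchIntervalMillis, independent of
        // how long emitting takes. Runs never overlap; if one fetch takes longer than
        // the interval, the next starts late. An exception here cancels later runs.
        scheduler.scheduleAtFixedRate(() -> {
            List<String> batch = fetch.get();
            fetches.incrementAndGet();
            queue.add(batch);
        }, 0, fetchIntervalMillis, TimeUnit.MILLISECONDS);

        // Emitter: polls the queue and emits batches in fetch order until stopped.
        Thread emitter = new Thread(() -> {
            try {
                while (running.get()) {
                    List<String> batch = queue.poll(10, TimeUnit.MILLISECONDS);
                    if (batch == null) {
                        continue; // nothing fetched yet
                    }
                    for (String rec : batch) {
                        emit.accept(rec);
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        emitter.start();

        try {
            Thread.sleep(runMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        scheduler.shutdownNow(); // stop fetching
        running.set(false);      // emitter exits after finishing its current batch
        try {
            scheduler.awaitTermination(1, TimeUnit.SECONDS);
            emitter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return fetches.get(); // batches still queued at shutdown are dropped in this sketch
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        // Emitting a 2-record batch takes ~50 ms, longer than the 20 ms fetch interval.
        int n = run(() -> Arrays.asList("r1", "r2"), rec -> sleepQuietly(25), 20, 300);
        System.out.println("completed fetches in ~300 ms: " + n);
    }
}
```

With f = 20 ms and a 300 ms run, fixed-rate scheduling should complete roughly 15 fetches even though each batch takes about 50 ms to emit. A loop that sleeps f after each fetch-and-emit cycle would manage only about 300 / (20 + 50) ≈ 4, assuming a negligible fetch time x. One design trade-off: the queue here is unbounded, so a consistently slow emitter lets it grow without limit. A bounded queue with a blocking put would apply back-pressure, but it would also block the scheduler thread and reintroduce the drift between fetches that the issue is trying to remove.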