Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2131#discussion_r68594061
  
    --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java ---
    @@ -86,32 +144,25 @@ public KinesisProxy(Properties configProps) {
         * @param maxRecordsToGet the maximum amount of records to retrieve for this batch
         * @return the batch of retrieved records
         */
    -   public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) {
    +   @Override
    +   public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) throws InterruptedException {
                final GetRecordsRequest getRecordsRequest = new GetRecordsRequest();
                getRecordsRequest.setShardIterator(shardIterator);
                getRecordsRequest.setLimit(maxRecordsToGet);
     
                GetRecordsResult getRecordsResult = null;
     
    -           int remainingRetryTimes = Integer.valueOf(
    -                   configProps.getProperty(KinesisConfigConstants.CONFIG_STREAM_DESCRIBE_RETRIES, Integer.toString(KinesisConfigConstants.DEFAULT_STREAM_DESCRIBE_RETRY_TIMES)));
    -           long describeStreamBackoffTimeInMillis = Long.valueOf(
    -                   configProps.getProperty(KinesisConfigConstants.CONFIG_STREAM_DESCRIBE_BACKOFF, Long.toString(KinesisConfigConstants.DEFAULT_STREAM_DESCRIBE_BACKOFF)));
    -
    -           int i=0;
    -           while (i <= remainingRetryTimes && getRecordsResult == null) {
    +           int attempt = 0;
    +           while (attempt <= getRecordsMaxAttempts && getRecordsResult == null) {
                        try {
                                getRecordsResult = kinesisClient.getRecords(getRecordsRequest);
                        } catch (ProvisionedThroughputExceededException ex) {
    +                           long backoffMillis = fullJitterBackoff(
    +                                   getRecordsBaseBackoffMillis, getRecordsMaxBackoffMillis, getRecordsExpConstant, attempt++);
                                LOG.warn("Got ProvisionedThroughputExceededException. Backing off for "
    -                                   + describeStreamBackoffTimeInMillis + " millis.");
    -                           try {
    -                                   Thread.sleep(describeStreamBackoffTimeInMillis);
    -                           } catch (InterruptedException interruptEx) {
    -                                   //
    -                           }
    +                                   + backoffMillis + " millis.");
    +                           Thread.sleep(backoffMillis);
                        }
    -                   i++;
                }
     
                if (getRecordsResult == null) {
    --- End diff --
    
    Not sure if throwing an exception here is a good idea.
    The javadoc of `getRecords()` says: "Note that if the shard has been closed, the shard iterator can't return more data and GetRecords returns `null`". Wouldn't that mean our code fails every time one of the shards is closed?
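    As an aside on the retry change itself: the `fullJitterBackoff` call in the diff presumably implements the standard full-jitter scheme, i.e. sleeping a uniformly random duration between zero and the capped exponential backoff. A minimal sketch under that assumption; the method body and the `main` driver here are illustrative, not Flink's actual implementation:

```java
import java.util.concurrent.ThreadLocalRandom;

public class FullJitterBackoff {

    /**
     * Full-jitter backoff: sleep = random(0, min(maxMillis, baseMillis * expConstant^attempt)).
     * Parameter names mirror the diff above, but this body is only a sketch.
     */
    public static long fullJitterBackoff(long baseMillis, long maxMillis, double expConstant, int attempt) {
        long exponentialBackoff = (long) Math.min(maxMillis, baseMillis * Math.pow(expConstant, attempt));
        // Uniformly random in [0, exponentialBackoff), so concurrent retriers spread out.
        return (long) (ThreadLocalRandom.current().nextDouble() * exponentialBackoff);
    }

    public static void main(String[] args) {
        for (int attempt = 0; attempt < 5; attempt++) {
            long backoff = fullJitterBackoff(100, 3000, 1.5, attempt);
            // Each value lies in [0, min(3000, 100 * 1.5^attempt)).
            System.out.println("attempt " + attempt + " -> backing off " + backoff + " ms");
        }
    }
}
```

    The jitter matters here because every parallel consumer that hits `ProvisionedThroughputExceededException` at the same moment would otherwise retry in lockstep and hit the limit again.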
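    If `null` really is what closed shards produce, one option is for the caller to treat a `null` result as shard closure rather than an error. A minimal, self-contained sketch with a stand-in `GetRecordsResult` (the class body, `consumeOnce`, and the supplier wiring are hypothetical, not code from this PR):

```java
import java.util.List;
import java.util.function.Supplier;

public class ClosedShardHandling {

    // Minimal stand-in for the AWS SDK's GetRecordsResult; illustrative only.
    static class GetRecordsResult {
        private final List<String> records;
        GetRecordsResult(List<String> records) { this.records = records; }
        List<String> getRecords() { return records; }
    }

    /**
     * Performs one fetch via the supplied proxy call and returns whether
     * the shard can still be consumed.
     */
    static boolean consumeOnce(Supplier<GetRecordsResult> proxyCall) {
        GetRecordsResult result = proxyCall.get();
        if (result == null) {
            // Per the getRecords() javadoc quoted above, a closed shard's
            // iterator returns no more data; stop consuming instead of failing.
            System.out.println("shard closed, stopping consumption");
            return false;
        }
        System.out.println("got " + result.getRecords().size() + " records");
        return true;
    }

    public static void main(String[] args) {
        consumeOnce(() -> new GetRecordsResult(List.of("a", "b")));
        consumeOnce(() -> null);
    }
}
```

    The point is only that shard closure is an expected lifecycle event (resharding), so it should end consumption of that shard gracefully rather than surface as a job failure.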

