[ 
https://issues.apache.org/jira/browse/FLINK-9897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16562695#comment-16562695
 ] 

ASF GitHub Bot commented on FLINK-9897:
---------------------------------------

glaksh100 commented on a change in pull request #6408: [FLINK-9897][Kinesis 
Connector] Make adaptive reads depend on run loop time instead of 
fetchintervalmillis
URL: https://github.com/apache/flink/pull/6408#discussion_r206362309
 
 

 ##########
 File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
 ##########
 @@ -233,26 +225,69 @@ public void run() {
                                                 subscribedShard.getShard().getHashKeyRange().getEndingHashKey());
 
                                         long recordBatchSizeBytes = 0L;
-                                       long averageRecordSizeBytes = 0L;
-
                                         for (UserRecord record : fetchedRecords) {
                                                 recordBatchSizeBytes += record.getData().remaining();
                                                 deserializeRecordForCollectionAndUpdateState(record);
                                         }
 
-                                       if (useAdaptiveReads && !fetchedRecords.isEmpty()) {
-                                               averageRecordSizeBytes = recordBatchSizeBytes / fetchedRecords.size();
-                                               maxNumberOfRecordsPerFetch = getAdaptiveMaxRecordsPerFetch(averageRecordSizeBytes);
-                                       }
-
                                         nextShardItr = getRecordsResult.getNextShardIterator();
+
+                                       long processingEndTimeNanos = System.nanoTime();
+
+                                       long adjustmentEndTimeNanos = adjustRunLoopFrequency(processingStartTimeNanos, processingEndTimeNanos);
 
 Review comment:
   Changed.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Further enhance adaptive reads in Kinesis Connector to depend on run loop time
> ------------------------------------------------------------------------------
>
>                 Key: FLINK-9897
>                 URL: https://issues.apache.org/jira/browse/FLINK-9897
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kinesis Connector
>    Affects Versions: 1.4.2, 1.5.1
>            Reporter: Lakshmi Rao
>            Assignee: Lakshmi Rao
>            Priority: Major
>              Labels: pull-request-available
>
> In FLINK-9692, we introduced the ability for the shardConsumer to adaptively 
> read more records based on the current average record size, in order to make 
> full use of the 2 MB/sec per-shard read limit. The feature sizes 
> maxNumberOfRecordsPerFetch for a read frequency of 5 reads/sec (as 
> prescribed by Kinesis limits). When an application takes more time to 
> process records in the run loop, it can no longer read at a frequency of 
> 5 reads/sec (even though its fetchIntervalMillis may be set to 200 ms). In 
> such a scenario, maxNumberOfRecordsPerFetch should be calculated based on the 
> time that the run loop actually takes, as opposed to fetchIntervalMillis.
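
The calculation described above can be sketched roughly as follows. This is a minimal illustration and not the code from the pull request: the class name AdaptiveReadSketch, the method adaptRecordsToRead, its parameters, and both constants are assumptions made for the example. The byte limit is taken as 2 * 1024 * 1024 bytes/sec, and the cap of 10,000 is the maximum number of records a single Kinesis GetRecords call can return.

```java
// Hypothetical sketch: derive the per-fetch record limit from the measured
// run loop time instead of the configured fetch interval.
final class AdaptiveReadSketch {

    // Assumed value for the per-shard read limit of 2 MB/sec.
    static final long SHARD_BYTES_PER_SECOND_LIMIT = 2L * 1024L * 1024L;

    // Maximum number of records a single GetRecords call can return.
    static final int MAX_RECORDS_PER_GET = 10_000;

    /**
     * Returns the number of records to request on the next fetch.
     *
     * @param runLoopTimeNanos     measured duration of the last run loop iteration
     * @param numRecords           number of records in the last batch
     * @param recordBatchSizeBytes total payload size of the last batch
     * @param currentMax           value to keep when there is nothing to measure
     */
    static int adaptRecordsToRead(
            long runLoopTimeNanos,
            int numRecords,
            long recordBatchSizeBytes,
            int currentMax) {

        // Without a batch or a measured loop time there is no basis for adjusting.
        if (numRecords <= 0 || runLoopTimeNanos <= 0) {
            return currentMax;
        }

        // Guard against a zero average for batches of very small records.
        long averageRecordSizeBytes = Math.max(1L, recordBatchSizeBytes / numRecords);

        // How many times per second the loop actually runs.
        double loopFrequencyHz = 1_000_000_000.0d / runLoopTimeNanos;

        // Share of the per-second byte budget available to each read.
        double bytesPerRead = SHARD_BYTES_PER_SECOND_LIMIT / loopFrequencyHz;

        int candidate = (int) (bytesPerRead / averageRecordSizeBytes);

        // Keep the result within [1, MAX_RECORDS_PER_GET].
        return Math.max(1, Math.min(candidate, MAX_RECORDS_PER_GET));
    }

    public static void main(String[] args) {
        // 100 records totalling 100,000 bytes, i.e. 1,000 bytes per record on average.
        System.out.println(adaptRecordsToRead(200_000_000L, 100, 100_000L, 10_000));
        System.out.println(adaptRecordsToRead(1_000_000_000L, 100, 100_000L, 10_000));
    }
}
```

With 1,000-byte records, a 200 ms loop yields 419 records per fetch, while a loop that takes a full second yields 2097, since a slower loop has a larger share of the per-second byte budget available to each read.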



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
