Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6408#discussion_r204996330
  
    --- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
 ---
    @@ -233,26 +225,69 @@ public void run() {
                                                
subscribedShard.getShard().getHashKeyRange().getEndingHashKey());
     
                                        long recordBatchSizeBytes = 0L;
    -                                   long averageRecordSizeBytes = 0L;
    -
                                        for (UserRecord record : 
fetchedRecords) {
                                                recordBatchSizeBytes += 
record.getData().remaining();
                                                
deserializeRecordForCollectionAndUpdateState(record);
                                        }
     
    -                                   if (useAdaptiveReads && 
!fetchedRecords.isEmpty()) {
    -                                           averageRecordSizeBytes = 
recordBatchSizeBytes / fetchedRecords.size();
    -                                           maxNumberOfRecordsPerFetch = 
getAdaptiveMaxRecordsPerFetch(averageRecordSizeBytes);
    -                                   }
    -
                                        nextShardItr = 
getRecordsResult.getNextShardIterator();
    +
    +                                   long processingEndTimeNanos = 
System.nanoTime();
    +
    +                                   long adjustmentEndTimeNanos = 
adjustRunLoopFrequency(processingStartTimeNanos, processingEndTimeNanos);
    --- End diff --
    
    just a minor nit pick here: `adjustmentEndTimeNanos` would be better named 
as `adjustedEndTimeNanos`


---

Reply via email to