Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/6408#discussion_r204996330 --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java --- @@ -233,26 +225,69 @@ public void run() { subscribedShard.getShard().getHashKeyRange().getEndingHashKey()); long recordBatchSizeBytes = 0L; - long averageRecordSizeBytes = 0L; - for (UserRecord record : fetchedRecords) { recordBatchSizeBytes += record.getData().remaining(); deserializeRecordForCollectionAndUpdateState(record); } - if (useAdaptiveReads && !fetchedRecords.isEmpty()) { - averageRecordSizeBytes = recordBatchSizeBytes / fetchedRecords.size(); - maxNumberOfRecordsPerFetch = getAdaptiveMaxRecordsPerFetch(averageRecordSizeBytes); - } - nextShardItr = getRecordsResult.getNextShardIterator(); + + long processingEndTimeNanos = System.nanoTime(); + + long adjustmentEndTimeNanos = adjustRunLoopFrequency(processingStartTimeNanos, processingEndTimeNanos); --- End diff -- just a minor nit pick here: `adjustmentEndTimeNanos` would be better named as `adjustedEndTimeNanos`
---