[ https://issues.apache.org/jira/browse/FLINK-9897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16555232#comment-16555232 ]
ASF GitHub Bot commented on FLINK-9897:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6408#discussion_r204998677

--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java ---
@@ -233,26 +225,69 @@ public void run() {
 					subscribedShard.getShard().getHashKeyRange().getEndingHashKey());

 				long recordBatchSizeBytes = 0L;
-				long averageRecordSizeBytes = 0L;
-
 				for (UserRecord record : fetchedRecords) {
 					recordBatchSizeBytes += record.getData().remaining();
 					deserializeRecordForCollectionAndUpdateState(record);
 				}

-				if (useAdaptiveReads && !fetchedRecords.isEmpty()) {
-					averageRecordSizeBytes = recordBatchSizeBytes / fetchedRecords.size();
-					maxNumberOfRecordsPerFetch = getAdaptiveMaxRecordsPerFetch(averageRecordSizeBytes);
-				}
-
 				nextShardItr = getRecordsResult.getNextShardIterator();
+
+				long processingEndTimeNanos = System.nanoTime();
+
+				long adjustmentEndTimeNanos = adjustRunLoopFrequency(processingStartTimeNanos, processingEndTimeNanos);
+				long runLoopTimeNanos = adjustmentEndTimeNanos - processingStartTimeNanos;
+				maxNumberOfRecordsPerFetch = adaptRecordsToRead(runLoopTimeNanos, fetchedRecords.size(), recordBatchSizeBytes);
+				processingStartTimeNanos = adjustmentEndTimeNanos; // for next time through the loop
 			}
 		}
 	} catch (Throwable t) {
 		fetcherRef.stopWithError(t);
 	}
 }

+	/**
+	 * Adjusts loop timing to match target frequency if specified.
+	 * @param processingStartTimeNanos The start time of the run loop "work"
+	 * @param processingEndTimeNanos The end time of the run loop "work"
+	 * @return The System.nanoTime() after the sleep (if any)
+	 * @throws InterruptedException
+	 */
+	protected long adjustRunLoopFrequency(long processingStartTimeNanos, long processingEndTimeNanos)
+		throws InterruptedException {
+		long endTimeNanos = processingEndTimeNanos;
+		if (fetchIntervalMillis != 0) {
+			long processingTimeNanos = processingEndTimeNanos - processingStartTimeNanos;
+			long sleepTimeMillis = fetchIntervalMillis - (processingTimeNanos / 1_000_000);
+			if (sleepTimeMillis > 0) {
+				Thread.sleep(sleepTimeMillis);
+				endTimeNanos = System.nanoTime();
+			}
+		}
+		return endTimeNanos;
+	}
+
+	/**
+	 * Calculates how many records to read each time through the loop based on a target throughput
+	 * and the measured frequency of the loop.
+	 * @param runLoopTimeNanos The total time of one pass through the loop
+	 * @param numRecords The number of records of the last read operation
+	 * @param recordBatchSizeBytes The total batch size of the last read operation
+	 */
+	protected int adaptRecordsToRead(long runLoopTimeNanos, int numRecords, long recordBatchSizeBytes) {
--- End diff --

If this is a method without side effects on the fields of the `ShardConsumer`, it might be better to make it static and pass in the current `maxNumberOfRecordsPerFetch` as an argument. That makes it clearer that it is only a utility method for calculating the number of records to read.
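The reviewer's suggestion can be sketched as follows. This is an illustrative, side-effect-free static version of `adaptRecordsToRead`, not the exact Flink code: the class name, the 2 MB/sec shard limit constant, and the 10,000-record `GetRecords` cap are assumptions drawn from documented Kinesis limits, and the current `maxNumberOfRecordsPerFetch` is passed in as an argument per the review comment.

```java
/** Sketch of a static, side-effect-free adaptRecordsToRead (illustrative names). */
class AdaptiveReadCalculator {

	// Kinesis per-shard read throughput limit: 2 MB/sec.
	private static final long SHARD_BYTES_PER_SECOND_READ_LIMIT = 2L * 1024 * 1024;

	// Kinesis cap on records returned by a single GetRecords call.
	private static final int MAX_RECORDS_PER_GET = 10_000;

	/**
	 * Returns the next maxNumberOfRecordsPerFetch, sized so that one fetch per
	 * measured run-loop pass stays under the per-shard throughput limit.
	 */
	static int adaptRecordsToRead(
			long runLoopTimeNanos,
			int numRecords,
			long recordBatchSizeBytes,
			int currentMaxRecordsPerFetch) {
		if (numRecords == 0 || runLoopTimeNanos == 0) {
			// Nothing was measured this pass; keep the previous value.
			return currentMaxRecordsPerFetch;
		}
		long averageRecordSizeBytes = recordBatchSizeBytes / numRecords;
		// Loop passes (and hence reads) per second at the measured pace.
		double loopFrequencyHz = 1_000_000_000.0d / runLoopTimeNanos;
		// Byte budget available to each read under the shard throughput limit.
		double bytesPerRead = SHARD_BYTES_PER_SECOND_READ_LIMIT / loopFrequencyHz;
		int records = (int) (bytesPerRead / averageRecordSizeBytes);
		// Respect the GetRecords per-call record cap.
		return Math.min(records, MAX_RECORDS_PER_GET);
	}
}
```

For example, a 200 ms loop pass (5 reads/sec) with 1 KB average records yields a budget of roughly 409 records per fetch, while very small records hit the 10,000-record cap instead.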
> Further enhance adaptive reads in Kinesis Connector to depend on run loop time
> ------------------------------------------------------------------------------
>
>                 Key: FLINK-9897
>                 URL: https://issues.apache.org/jira/browse/FLINK-9897
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kinesis Connector
>    Affects Versions: 1.4.2, 1.5.1
>            Reporter: Lakshmi Rao
>            Assignee: Lakshmi Rao
>            Priority: Major
>              Labels: pull-request-available
>
> In FLINK-9692, we introduced the ability for the ShardConsumer to adaptively
> read more records based on the current average record size, to optimize use of
> the 2 MB/sec shard limit. The feature maximizes maxNumberOfRecordsPerFetch
> assuming a read frequency of 5 reads/sec (as prescribed by Kinesis limits).
> When an application takes more time to process records in the run loop, it can
> no longer read at a frequency of 5 reads/sec (even though its
> fetchIntervalMillis may be set to 200 ms). In such a scenario,
> maxNumberOfRecordsPerFetch should be calculated based on the time the run loop
> actually takes, as opposed to fetchIntervalMillis.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
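The arithmetic behind the issue description can be made concrete with a small worked example. The helper below is hypothetical (not part of the Flink code); it only computes the per-read byte budget implied by the 2 MB/sec shard limit for a given measured loop-pass time.

```java
/** Worked example: why the per-read byte budget must use measured run-loop
 *  time rather than fetchIntervalMillis (hypothetical helper, not Flink code). */
class ReadBudgetExample {

	// Kinesis per-shard read throughput limit: 2 MB/sec.
	static final double SHARD_LIMIT_BYTES_PER_SEC = 2.0 * 1024 * 1024;

	/** Byte budget per GetRecords call, given the actual time of one loop pass. */
	static double bytesPerRead(long runLoopTimeMillis) {
		// Reads per second achievable at the measured loop pace.
		double readsPerSecond = 1000.0 / runLoopTimeMillis;
		return SHARD_LIMIT_BYTES_PER_SEC / readsPerSecond;
	}
}
```

At the intended 200 ms interval (5 reads/sec) each read may carry about 410 KB; if processing stretches a loop pass to 300 ms (about 3.3 reads/sec), each read can carry about 614 KB without exceeding the shard limit, so sizing fetches from fetchIntervalMillis alone would under-read.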