tweise commented on a change in pull request #6408: [FLINK-9897][Kinesis 
Connector] Make adaptive reads depend on run loop time instead of 
fetchintervalmillis
URL: https://github.com/apache/flink/pull/6408#discussion_r205551265
 
 

 ##########
 File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
 ##########
 @@ -233,26 +225,69 @@ public void run() {
                                                
subscribedShard.getShard().getHashKeyRange().getEndingHashKey());
 
                                        long recordBatchSizeBytes = 0L;
-                                       long averageRecordSizeBytes = 0L;
-
                                        for (UserRecord record : 
fetchedRecords) {
                                                recordBatchSizeBytes += 
record.getData().remaining();
                                                
deserializeRecordForCollectionAndUpdateState(record);
                                        }
 
-                                       if (useAdaptiveReads && 
!fetchedRecords.isEmpty()) {
-                                               averageRecordSizeBytes = 
recordBatchSizeBytes / fetchedRecords.size();
-                                               maxNumberOfRecordsPerFetch = 
getAdaptiveMaxRecordsPerFetch(averageRecordSizeBytes);
-                                       }
-
                                        nextShardItr = 
getRecordsResult.getNextShardIterator();
+
+                                       long processingEndTimeNanos = 
System.nanoTime();
+
+                                       long adjustmentEndTimeNanos = 
adjustRunLoopFrequency(processingStartTimeNanos, processingEndTimeNanos);
+                                       long runLoopTimeNanos = 
adjustmentEndTimeNanos - processingStartTimeNanos;
+                                       maxNumberOfRecordsPerFetch = 
adaptRecordsToRead(runLoopTimeNanos, fetchedRecords.size(), 
recordBatchSizeBytes);
+                                       processingStartTimeNanos = 
adjustmentEndTimeNanos; // for next time through the loop
                                }
                        }
                } catch (Throwable t) {
                        fetcherRef.stopWithError(t);
                }
        }
 
+       /**
+        * Adjusts loop timing to match target frequency if specified.
+        * @param processingStartTimeNanos The start time of the run loop "work"
+        * @param processingEndTimeNanos The end time of the run loop "work"
+        * @return The System.nanoTime() after the sleep (if any)
+        * @throws InterruptedException
+        */
+       protected long adjustRunLoopFrequency(long processingStartTimeNanos, 
long processingEndTimeNanos)
+               throws InterruptedException {
+               long endTimeNanos = processingEndTimeNanos;
+               if (fetchIntervalMillis != 0) {
+                       long processingTimeNanos = processingEndTimeNanos - 
processingStartTimeNanos;
+                       long sleepTimeMillis = fetchIntervalMillis - 
(processingTimeNanos / 1_000_000);
+                       if (sleepTimeMillis > 0) {
+                               Thread.sleep(sleepTimeMillis);
+                               endTimeNanos = System.nanoTime();
+                       }
+               }
+               return endTimeNanos;
+       }
+
+       /**
+        * Calculates how many records to read each time through the loop based 
on a target throughput
 +        * and the measured frequency of the loop.
+        * @param runLoopTimeNanos The total time of one pass through the loop
+        * @param numRecords The number of records of the last read operation
+        * @param recordBatchSizeBytes The total batch size of the last read 
operation
+        */
+       protected int adaptRecordsToRead(long runLoopTimeNanos, int numRecords, 
long recordBatchSizeBytes) {
 
 Review comment:
   +1, except that it shouldn't be static, so that a subclass can override it.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to