[GitHub] flink pull request #6408: [FLINK-9897][Kinesis Connector] Make adaptive read...
Github user glaksh100 commented on a diff in the pull request: https://github.com/apache/flink/pull/6408#discussion_r204942574

--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java ---

@@ -233,26 +225,68 @@ public void run() {
 			subscribedShard.getShard().getHashKeyRange().getEndingHashKey());

 		long recordBatchSizeBytes = 0L;
-		long averageRecordSizeBytes = 0L;
-
 		for (UserRecord record : fetchedRecords) {
 			recordBatchSizeBytes += record.getData().remaining();
 			deserializeRecordForCollectionAndUpdateState(record);
 		}
-		if (useAdaptiveReads && !fetchedRecords.isEmpty()) {
-			averageRecordSizeBytes = recordBatchSizeBytes / fetchedRecords.size();
-			maxNumberOfRecordsPerFetch = getAdaptiveMaxRecordsPerFetch(averageRecordSizeBytes);
-		}
 		nextShardItr = getRecordsResult.getNextShardIterator();
+
+		long processingEndTimeNanos = System.nanoTime();
+		long adjustmentEndTimeNanos = adjustRunLoopFrequency(processingStartTimeNanos, processingEndTimeNanos);
+		long runLoopTimeNanos = adjustmentEndTimeNanos - processingStartTimeNanos;
+		adaptRecordsToRead(runLoopTimeNanos, fetchedRecords.size(), recordBatchSizeBytes);
+		processingStartTimeNanos = adjustmentEndTimeNanos; // for next time through the loop
 	}
 }
} catch (Throwable t) {
	fetcherRef.stopWithError(t);
}
}

+/**
+ * Adjusts loop timing to match target frequency if specified.
+ * @param processingStartTimeNanos The start time of the run loop "work"
+ * @param processingEndTimeNanos The end time of the run loop "work"
+ * @return The System.nanoTime() after the sleep (if any)
+ * @throws InterruptedException
+ */
+protected long adjustRunLoopFrequency(long processingStartTimeNanos, long processingEndTimeNanos)
+		throws InterruptedException {
+	long endTimeNanos = processingEndTimeNanos;
+	if (fetchIntervalMillis != 0) {
+		long processingTimeNanos = processingEndTimeNanos - processingStartTimeNanos;
+		long sleepTimeMillis = fetchIntervalMillis - (processingTimeNanos / 1_000_000);
+		if (sleepTimeMillis > 0) {
+			Thread.sleep(sleepTimeMillis);
+			endTimeNanos = System.nanoTime();
+		}
+	}
+	return endTimeNanos;
+}
+
+/**
+ * Calculates how many records to read each time through the loop based on a target throughput
+ * and the measured frequency of the loop.
+ * @param runLoopTimeNanos The total time of one pass through the loop
+ * @param numRecords The number of records of the last read operation
+ * @param recordBatchSizeBytes The total batch size of the last read operation
+ */
+protected int adaptRecordsToRead(long runLoopTimeNanos, int numRecords, long recordBatchSizeBytes) {
+	if (useAdaptiveReads && numRecords != 0 && runLoopTimeNanos != 0) {
+		long averageRecordSizeBytes = recordBatchSizeBytes / numRecords;
+		// Adjust number of records to fetch from the shard depending on current average record size
+		// to optimize 2 MB/sec read limits
+		double loopFrequencyHz = 1000000000.0d / runLoopTimeNanos;
+		double bytesPerRead = KINESIS_SHARD_BYTES_PER_SECOND_LIMIT / loopFrequencyHz;
+		maxNumberOfRecordsPerFetch = (int) (bytesPerRead / averageRecordSizeBytes);
+		// Ensure the value does not exceed the default GetRecords maximum
+		maxNumberOfRecordsPerFetch = Math.min(maxNumberOfRecordsPerFetch, ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_MAX);
+	}
+	return maxNumberOfRecordsPerFetch;

--- End diff --

Oops, thanks for catching. Updated to use the return value and also to use a local variable in the
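Taken on its own, the arithmetic behind the adaptive sizing in this diff can be sketched as a standalone snippet. This is a minimal sketch with illustrative inputs, not the connector code itself: the constant values mirror the connector's defaults, and the 2 MB/sec figure is the documented Kinesis per-shard read limit.

```java
public class AdaptiveReadSketch {
    // Kinesis per-shard read limit: 2 MB/sec (from the GetRecords documentation).
    static final long KINESIS_SHARD_BYTES_PER_SECOND_LIMIT = 2 * 1024 * 1024;
    // Upper bound on records per GetRecords call (the connector's default is 10_000).
    static final int DEFAULT_SHARD_GETRECORDS_MAX = 10_000;

    static int adaptRecordsToRead(long runLoopTimeNanos, int numRecords, long recordBatchSizeBytes, int currentMax) {
        if (numRecords == 0 || runLoopTimeNanos == 0) {
            return currentMax; // nothing measured, keep the current setting
        }
        long averageRecordSizeBytes = recordBatchSizeBytes / numRecords;
        // One pass takes runLoopTimeNanos, so the loop runs 1e9 / runLoopTimeNanos times per second.
        double loopFrequencyHz = 1_000_000_000.0d / runLoopTimeNanos;
        // Bytes we may consume per read without exceeding the per-second shard limit.
        double bytesPerRead = KINESIS_SHARD_BYTES_PER_SECOND_LIMIT / loopFrequencyHz;
        int maxRecords = (int) (bytesPerRead / averageRecordSizeBytes);
        // Never exceed the GetRecords maximum.
        return Math.min(maxRecords, DEFAULT_SHARD_GETRECORDS_MAX);
    }

    public static void main(String[] args) {
        // One pass took 200 ms (5 Hz); 500 records averaging 1 KB each were read.
        // 2 MB/sec at 5 Hz allows ~419430 bytes per read -> 409 one-KB records.
        System.out.println(adaptRecordsToRead(200_000_000L, 500, 500 * 1024L, 10_000)); // prints 409
    }
}
```

The key difference from the earlier fetch-interval-based approach is that the frequency is measured from the actual loop duration rather than assumed to be `1/fetchIntervalMillis`.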
[GitHub] flink pull request #6409: Flink 9899.kinesis connector metrics
GitHub user glaksh100 opened a pull request: https://github.com/apache/flink/pull/6409 Flink 9899.kinesis connector metrics ## What is the purpose of the change The purpose of this change is to add metrics to the `ShardConsumer` to get more observability into the performance of the Kinesis connector, including the enhancements introduced in [FLINK-9897](https://issues.apache.org/jira/browse/FLINK-9897). **Important** - https://github.com/apache/flink/pull/6408 has to be merged **before** this change. ## Brief change log All metrics are added as gauges. The following per-shard metrics are added: - sleepTimeMillis - maxNumberOfRecordsPerFetch - numberOfAggregatedRecordsPerFetch - numberOfDeaggregatedRecordsPerFetch - bytesRequestedPerFetch - averageRecordSizeBytes - runLoopTimeNanos - loopFrequencyHz ## Verifying this change This change is already covered by existing tests, such as: `ShardConsumerTest`, `KinesisDataFetcherTest`. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented?
(**not applicable** / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/lyft/flink FLINK-9899.KinesisConnectorMetrics Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6409.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6409 commit f333781a7c4f1a10b6120a962ff211e023bafaab Author: Lakshmi Gururaja Rao Date: 2018-07-24T18:44:08Z [FLINK-9897] Make adaptive reads depend on run loop time instead of fetch interval millis Remove unused method commit f51703177df9afcdba3778909b1e9d8b7fa4bf46 Author: Lakshmi Gururaja Rao Date: 2018-07-24T18:44:08Z [FLINK-9897] Make adaptive reads depend on run loop time instead of fetch interval millis commit d493097d09c6223383282ed90648853715b197ce Author: Lakshmi Gururaja Rao Date: 2018-07-24T21:13:53Z [FLINK-9899] Add more ShardConsumer metrics Checkstyle fix ---
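Since all of the metrics above are exposed as gauges, the registration pattern is simple: a gauge is just a supplier that the metrics system polls lazily. A minimal dependency-free sketch — the `Gauge` interface below is a stand-in for Flink's `org.apache.flink.metrics.Gauge<T>`, and the metric name and variable are illustrative, not the exact ones from the PR:

```java
import java.util.concurrent.atomic.AtomicLong;

public class GaugeSketch {
    // Stand-in for Flink's org.apache.flink.metrics.Gauge<T>: a single getValue() method.
    interface Gauge<T> { T getValue(); }

    public static void main(String[] args) {
        // Mutable state that the consumer's run loop would update on every pass.
        AtomicLong maxNumberOfRecordsPerFetch = new AtomicLong(10_000);

        // In the connector this would look like:
        //   shardMetricGroup.gauge("maxNumberOfRecordsPerFetch", gauge);
        Gauge<Long> gauge = maxNumberOfRecordsPerFetch::get;

        maxNumberOfRecordsPerFetch.set(409);
        System.out.println(gauge.getValue()); // the gauge reads the current value at poll time
    }
}
```

Because the gauge closes over the live variable, the reported value always reflects the latest loop iteration without any extra bookkeeping in the hot path.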
[GitHub] flink pull request #6408: [FLINK-9897] Make adaptive reads depend on run loo...
GitHub user glaksh100 opened a pull request: https://github.com/apache/flink/pull/6408 [FLINK-9897] Make adaptive reads depend on run loop time instead of fetchIntervalMillis ## What is the purpose of the change [FLINK-9692](https://github.com/apache/flink/pull/6300) introduced the feature of adapting `maxNumberOfRecordsPerFetch` based on the average size of Kinesis records. The PR assumed a maximum of `1/fetchIntervalMillis` reads/second. However, in the case that the run loop of the `ShardConsumer` takes more than `fetchIntervalMillis` to process records, the `maxNumberOfRecordsPerFetch` is still sub-optimal. The purpose of this change is to make the adaptive reads more efficient by using the actual run loop frequency to determine the number of reads/second and `maxNumberOfRecordsPerFetch`. The change also refactors the run loop to be more modular. ## Brief change log - `processingStartTimeNanos` records start time of loop - `processingEndTimeNanos` records end time of loop - `adjustRunLoopFrequency()` adjusts end time depending on `sleepTimeMillis` (if any). - `runLoopTimeNanos` records actual run loop time. - `adaptRecordsToRead` calculates `maxNumberOfRecordsPerFetch` based on `runLoopTimeNanos` - Unused method `getAdaptiveMaxRecordsPerFetch` is removed.
## Verifying this change This change is already covered by existing tests, such as `ShardConsumerTest` This has also been tested against a stream with the following configuration ``` Number of Shards: 512 Parallelism: 128 ``` ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/lyft/flink FLINK-9897.AdaptiveReadsRunLoop Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6408.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6408 commit 786556b9a9a509051a14772fbbd282db73e65252 Author: Lakshmi Gururaja Rao Date: 2018-07-24T18:44:08Z [FLINK-9897] Make adaptive reads depend on run loop time instead of fetch interval millis ---
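The timing bookkeeping in the change log above can be sketched in isolation. This is a minimal stand-alone version assuming a 1000 ms `fetchIntervalMillis`; it mirrors the shape of `adjustRunLoopFrequency` rather than being the exact connector code:

```java
public class RunLoopTimingSketch {
    // Assumed target fetch interval for this sketch (the connector reads this from config).
    static long fetchIntervalMillis = 1000L;

    // Sleep only for the remainder of the fetch interval not already spent doing work,
    // and return the time after the sleep so the caller can measure the full loop duration.
    static long adjustRunLoopFrequency(long startNanos, long endNanos) throws InterruptedException {
        long adjustedEnd = endNanos;
        if (fetchIntervalMillis != 0) {
            long processingMillis = (endNanos - startNanos) / 1_000_000;
            long sleepMillis = fetchIntervalMillis - processingMillis;
            if (sleepMillis > 0) {
                Thread.sleep(sleepMillis);
                adjustedEnd = System.nanoTime();
            }
        }
        return adjustedEnd;
    }

    public static void main(String[] args) throws InterruptedException {
        long start = System.nanoTime();
        // Pretend the "work" took 900 ms of the 1000 ms interval:
        long end = start + 900_000_000L;
        // ...so the method should sleep roughly the remaining 100 ms before returning.
        long after = adjustRunLoopFrequency(start, end);
        System.out.println("elapsed ~" + (after - start) / 1_000_000 + " ms");
    }
}
```

Note the subtlety the refactor captures: when the work already took longer than `fetchIntervalMillis`, no sleep happens and the measured loop time exceeds the interval — which is exactly the case the old `1/fetchIntervalMillis` assumption got wrong.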
[GitHub] flink issue #6300: [FLINK-9692][Kinesis Connector] Adaptive reads from Kines...
Github user glaksh100 commented on the issue: https://github.com/apache/flink/pull/6300 @StephanEwen I have run this on the following setup: ``` Number of shards on Kinesis stream: 384 Number of task slots: 384 / 192 / 96 Throughput achieved per shard (with adaptive reads): 1.95 MB/sec / 1.75 MB/sec / 1.6 MB/sec ``` ---
[GitHub] flink issue #6300: [FLINK-9692][Kinesis Connector] Adaptive reads from Kines...
Github user glaksh100 commented on the issue: https://github.com/apache/flink/pull/6300 The idea here is that `maxNumberOfRecordsPerFetch` should never be set to a value that fetches records exceeding the read limit (2 MB/sec), per the math here. ``` 2 MB/sec / (averageRecordSizeBytes * # reads/sec) ``` At least that's the intent - let me know if that makes sense or if there is something amiss about the approach here. If there is a way in which `maxNumberOfRecordsPerFetch` is set such that the limit is exceeded, then yes, it will still be throttled by Kinesis. ---
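A worked instance of that math, with illustrative numbers (the 2 MB/sec value is the documented per-shard limit; the record size and read frequency are made up for the example):

```java
public class ThroughputMathSketch {
    public static void main(String[] args) {
        long shardLimitBytesPerSec = 2 * 1024 * 1024; // 2 MB/sec Kinesis read limit
        long averageRecordSizeBytes = 2048;           // measured from the last batch (assumed)
        double readsPerSec = 5.0;                     // e.g. one fetch every 200 ms (assumed)

        // maxNumberOfRecordsPerFetch = limit / (avgRecordSize * readsPerSec)
        int maxRecordsPerFetch = (int) (shardLimitBytesPerSec / (averageRecordSizeBytes * readsPerSec));
        System.out.println(maxRecordsPerFetch); // prints 204
    }
}
```

With 5 reads/sec, 204 records of ~2 KB per fetch keeps each second's reads just under the 2 MB budget, which is the sense in which the cap "should never exceed the limit".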
[GitHub] flink pull request #6300: [FLINK-9692][Kinesis Connector] Adaptive reads fro...
Github user glaksh100 commented on a diff in the pull request: https://github.com/apache/flink/pull/6300#discussion_r202227845

--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java ---

@@ -330,4 +347,24 @@ private GetRecordsResult getRecords(String shardItr, int maxNumberOfRecords) thr
 	protected static List<UserRecord> deaggregateRecords(List<Record> records, String startingHashKey, String endingHashKey) {
 		return UserRecord.deaggregate(records, new BigInteger(startingHashKey), new BigInteger(endingHashKey));
 	}
+
+	/**
+	 * Adapts the maxNumberOfRecordsPerFetch based on the current average record size
+	 * to optimize 2 MB/sec read limits.
+	 *
+	 * @param averageRecordSizeBytes
+	 * @return adaptedMaxRecordsPerFetch
+	 */
+	protected int getAdaptiveMaxRecordsPerFetch(long averageRecordSizeBytes) {
+		int adaptedMaxRecordsPerFetch = maxNumberOfRecordsPerFetch;
+		if (averageRecordSizeBytes != 0 && fetchIntervalMillis != 0) {
+			adaptedMaxRecordsPerFetch = (int) (KINESIS_SHARD_BYTES_PER_SECOND_LIMIT / (averageRecordSizeBytes * 1000L / fetchIntervalMillis));
+
+			// Ensure the value does not exceed the default GetRecords maximum
+			adaptedMaxRecordsPerFetch = adaptedMaxRecordsPerFetch <= ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_MAX ?

--- End diff --

Changed.

---
[GitHub] flink pull request #6300: [FLINK-9692][Kinesis Connector] Adaptive reads fro...
Github user glaksh100 commented on a diff in the pull request: https://github.com/apache/flink/pull/6300#discussion_r202227834

--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java ---

@@ -134,6 +134,10 @@ public SentinelSequenceNumber toSentinelSequenceNumber() {
 	/** The interval between each attempt to discover new shards. */
 	public static final String SHARD_DISCOVERY_INTERVAL_MILLIS = "flink.shard.discovery.intervalmillis";

+	/** The config to turn on adaptive reads from a shard. */
+	public static final String SHARD_USE_ADAPTIVE_READS = "flink.shard.use.adaptive.reads";

--- End diff --

Changed.

---
[GitHub] flink pull request #6300: [FLINK-9692][Kinesis Connector] Adaptive reads fro...
Github user glaksh100 commented on a diff in the pull request: https://github.com/apache/flink/pull/6300#discussion_r202156901

--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java ---

@@ -330,4 +347,24 @@ private GetRecordsResult getRecords(String shardItr, int maxNumberOfRecords) thr
 	protected static List<UserRecord> deaggregateRecords(List<Record> records, String startingHashKey, String endingHashKey) {
 		return UserRecord.deaggregate(records, new BigInteger(startingHashKey), new BigInteger(endingHashKey));
 	}
+
+	/**
+	 * Adapts the maxNumberOfRecordsPerFetch based on the current average record size
+	 * to optimize 2 MB/sec read limits.
+	 *
+	 * @param averageRecordSizeBytes
+	 * @return adaptedMaxRecordsPerFetch
+	 */
+	private int getAdaptiveMaxRecordsPerFetch(long averageRecordSizeBytes) {

--- End diff --

Makes sense. Done.

---
[GitHub] flink issue #6300: [FLINK-9692][Kinesis Connector] Adaptive reads from Kines...
Github user glaksh100 commented on the issue: https://github.com/apache/flink/pull/6300 @fhueske @tzulitai @tweise Can you please take a look when you have a chance? ---
[GitHub] flink pull request #6300: [FLINK-9692] Adaptive reads from Kinesis
GitHub user glaksh100 opened a pull request: https://github.com/apache/flink/pull/6300 [FLINK-9692] Adaptive reads from Kinesis ## What is the purpose of the change The purpose of this change is to provide an option to the Kinesis connector to optimize the amount of data (in bytes) read from Kinesis. The Kinesis connector currently has a [constant value](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java#L213) set for `maxNumberOfRecordsPerFetch` that it can fetch from a single Kinesis `getRecords` call. However, in most realtime scenarios, the average size of the Kinesis record (in bytes) is not constant. The idea here is to adapt the Kinesis connector to identify an average batch size prior to making the `getRecords` call, so that the `maxNumberOfRecordsPerFetch` parameter can be tuned to be as high as possible without exceeding the 2 Mb/sec [per shard limit](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html). This feature can be set using a [ConsumerConfigConstants](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java) flag that defaults to false. 
## Brief change log - With an initial value for `maxNumberOfRecordsPerFetch`, the average size of a record returned in the batch of records is calculated - `maxNumberOfRecordsPerFetch` is then set to `2 MB/sec / (averageRecordSizeBytes * 1000 / fetchIntervalMillis)` to maximize throughput in each `getRecords` call - This feature is turned on/off using a boolean in `ConsumerConfigConstants` - `SHARD_USE_ADAPTIVE_READS` - `DEFAULT_SHARD_USE_ADAPTIVE_READS` is set to `false` ## Verifying this change This change added tests and can be verified as follows: - Added a `testCorrectNumOfCollectedRecordsAndUpdatedStateWithAdaptiveReads` test method to `ShardConsumerTest` ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (**yes** / no) - If yes, how is the feature documented? (not applicable / docs / **JavaDocs** / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/glaksh100/flink FLINK-9692.adaptiveKinesisReads Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6300.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6300 commit 0c29017d6d1e98359d3093aaaecc54338324e57e Author: Lakshmi Gururaja Rao Date: 2018-07-10T18:40:02Z [FLINK-9692] Adaptive reads from Kinesis ---
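The fetch-interval-based formula described in the change log can be made concrete with integer arithmetic in the same shape as the connector's `getAdaptiveMaxRecordsPerFetch`. This is a sketch with illustrative inputs, not the exact connector code:

```java
public class AdaptiveFetchSizeSketch {
    static final long KINESIS_SHARD_BYTES_PER_SECOND_LIMIT = 2 * 1024 * 1024; // 2 MB/sec
    static final int DEFAULT_SHARD_GETRECORDS_MAX = 10_000;

    static int adaptiveMaxRecordsPerFetch(long averageRecordSizeBytes, long fetchIntervalMillis, int currentMax) {
        if (averageRecordSizeBytes == 0 || fetchIntervalMillis == 0) {
            return currentMax; // keep the configured value when nothing can be measured
        }
        // Bytes consumed per second per record-slot = avg record size * reads/sec (1000 / fetchIntervalMillis).
        int adapted = (int) (KINESIS_SHARD_BYTES_PER_SECOND_LIMIT / (averageRecordSizeBytes * 1000L / fetchIntervalMillis));
        // Cap at the GetRecords maximum.
        return Math.min(adapted, DEFAULT_SHARD_GETRECORDS_MAX);
    }

    public static void main(String[] args) {
        // 1 KB records fetched every 200 ms -> 5 reads/sec -> at most 409 records per fetch.
        System.out.println(adaptiveMaxRecordsPerFetch(1024, 200, 10_000)); // prints 409
    }
}
```

This version assumes the loop actually runs at `1/fetchIntervalMillis` — the limitation that the follow-up FLINK-9897 change addresses by measuring the real loop time instead.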
[GitHub] flink issue #5860: [FLINK-9138][filesystem-connectors] Implement time based ...
Github user glaksh100 commented on the issue: https://github.com/apache/flink/pull/5860 @fhueske I gave it some thought and your suggestion makes sense to me. I have extended `checkForInactiveBuckets` to include the rollover check. I have also updated Javadocs in a few places: - Added a note in the top-level Javadoc about the updated functionality of `checkForInactiveBuckets()` - Updated JavaDocs for both `setBatchRolloverInterval()` and `setInactiveBucketThreshold()` - Updated JavaDoc for `checkForInactiveBuckets()` Let me know if the updates make sense and thank you for reviewing! ---
[GitHub] flink issue #5860: [FLINK-9138][filesystem-connectors] Implement time based ...
Github user glaksh100 commented on the issue: https://github.com/apache/flink/pull/5860 Thanks for reviewing @fhueske @aljoscha and @kl0u ! I have addressed the latest review comments. Can you PTAL (again) ? ---
[GitHub] flink pull request #5860: [FLINK-9138][filesystem-connectors] Implement time...
Github user glaksh100 commented on a diff in the pull request: https://github.com/apache/flink/pull/5860#discussion_r186223237

--- Diff: flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java ---

@@ -473,6 +482,15 @@ private boolean shouldRoll(BucketState<T> bucketState) throws IOException {
 				subtaskIndex,
 				writePosition,
 				batchSize);
+		} else {
+			long currentProcessingTime = processingTimeService.getCurrentProcessingTime();

--- End diff --

Updated method signature for `shouldRoll` to include the `currentProcessingTime`

---
[GitHub] flink pull request #5860: [FLINK-9138][filesystem-connectors] Implement time...
Github user glaksh100 commented on a diff in the pull request: https://github.com/apache/flink/pull/5860#discussion_r186223099

--- Diff: flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java ---

@@ -908,6 +929,20 @@ private void handlePendingFilesForPreviousCheckpoints(Map<Long, List<String>> pe
 		return this;
 	}
+
+	/**
+	 * Sets the roll over interval in milliseconds.
+	 *
+	 * When a bucket part file is older than the roll over interval, a new bucket part file is
+	 * started and the old one is closed. The name of the bucket file depends on the {@link Bucketer}.
+	 *
+	 * @param batchRolloverInterval The roll over interval in milliseconds
+	 */
+	public BucketingSink<T> setBatchRolloverInterval(long batchRolloverInterval) {
+		this.batchRolloverInterval = batchRolloverInterval;
+		return this;

--- End diff --

Added a check for `batchRolloverInterval` to be a positive non-zero value.

---
[GitHub] flink issue #5860: [FLINK-9138][filesystem-connectors] Implement time based ...
Github user glaksh100 commented on the issue: https://github.com/apache/flink/pull/5860 @fhueske Can you PTAL and merge this PR? ---
[GitHub] flink issue #5860: [FLINK-9138][filesystem-connectors] Implement time based ...
Github user glaksh100 commented on the issue: https://github.com/apache/flink/pull/5860 @fhueske Thank you for reviewing. I have incorporated the changes, including an update to the [documentation](https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/filesystem_sink.html) on the website. Travis is failing on a test case that seems unrelated:
```
java.lang.AssertionError: This program execution should have failed.
	at org.junit.Assert.fail(Assert.java:88)
	at org.apache.flink.test.misc.SuccessAfterNetworkBuffersFailureITCase.testSuccessfulProgramAfterFailure(SuccessAfterNetworkBuffersFailureITCase.java:75)
```
---
[GitHub] flink issue #5860: [FLINK-9138][filesystem-connectors] Implement time based ...
Github user glaksh100 commented on the issue: https://github.com/apache/flink/pull/5860 @aljoscha @fhueske Can you please take a look? ---
[GitHub] flink pull request #5860: [FLINK-9138][filesystem-connectors] Implement time...
GitHub user glaksh100 opened a pull request: https://github.com/apache/flink/pull/5860 [FLINK-9138][filesystem-connectors] Implement time based rollover in BucketingSink ## What is the purpose of the change This pull request enables a time-based rollover of the part file in the BucketingSink. This is particularly applicable when write throughput is low, and it helps make data available for consumption at a fixed interval. ## Brief change log - Add a `batchRolloverInterval` field with a setter - Track a `firstWrittenToTime` for the bucket state - Check for `currentProcessingTime` - `firstWrittenToTime` > `batchRolloverInterval` and roll over if true ## Verifying this change This change added tests and can be verified as follows: - Added a `testRolloverInterval` test method to the `BucketingSinkTest` ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (**yes** / no) - If yes, how is the feature documented?
(not applicable / docs / **JavaDocs** / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/glaksh100/flink FLINK-9138.bucketingSinkRolloverInterval Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5860.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5860 commit fee3ba293f4db4ad2d39b4ac0f3993711da9bda6 Author: Lakshmi Gururaja Rao Date: 2018-04-16T23:31:49Z [FLINK-9138] Implement time based rollover of part file in BucketingSink ---
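The rollover condition from the change log (`currentProcessingTime - firstWrittenToTime > batchRolloverInterval`) can be sketched on its own. This is a minimal illustration with an assumed one-hour interval, not the BucketingSink implementation itself:

```java
public class RolloverCheckSketch {
    // Assumed rollover interval for this sketch: roll part files hourly.
    static final long BATCH_ROLLOVER_INTERVAL_MS = 60 * 60 * 1000L;

    // Roll over when the part file has been open longer than the configured interval,
    // regardless of how many bytes have been written to it.
    static boolean shouldRollOnTime(long firstWrittenToTime, long currentProcessingTime) {
        return currentProcessingTime - firstWrittenToTime > BATCH_ROLLOVER_INTERVAL_MS;
    }

    public static void main(String[] args) {
        long opened = 0L; // part file first written to at t=0
        System.out.println(shouldRollOnTime(opened, 30 * 60 * 1000L));     // false: only 30 min old
        System.out.println(shouldRollOnTime(opened, 2 * 60 * 60 * 1000L)); // true: 2 h old
    }
}
```

This is why the feature matters for low-throughput streams: the size-based check alone might never trigger, leaving data unavailable in an open part file indefinitely.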