This is an automated email from the ASF dual-hosted git repository. thw pushed a commit to branch release-1.9 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.9 by this push: new c905a4d [hotfix][kinesis] Update emit record javadoc and don't count max watermark as timeout c905a4d is described below commit c905a4d323c0ed4985cdb9e5764efe13bc6183d0 Author: Thomas Weise <t...@apache.org> AuthorDate: Mon Aug 26 15:02:40 2019 -0700 [hotfix][kinesis] Update emit record javadoc and don't count max watermark as timeout --- .../streaming/connectors/kinesis/internals/KinesisDataFetcher.java | 5 +++-- .../connectors/kinesis/util/JobManagerWatermarkTracker.java | 4 +++- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java index f38e6eb..80b724b 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java @@ -715,7 +715,7 @@ public class KinesisDataFetcher<T> { // ------------------------------------------------------------------------ /** - * Atomic operation to collect a record and update state to the sequence number of the record. + * Prepare a record and hand it over to the {@link RecordEmitter}, which may collect it asynchronously. * This method is called by {@link ShardConsumer}s. * * @param record the record to collect @@ -752,7 +752,8 @@ public class KinesisDataFetcher<T> { } /** - * Actual record emission called from the record emitter. + * Atomic operation to collect a record and update state to the sequence number of the record. + * This method is called from the record emitter. * * <p>Responsible for tracking per shard watermarks and emit timestamps extracted from * the record, when a watermark assigner was configured. diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/JobManagerWatermarkTracker.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/JobManagerWatermarkTracker.java index f150bb0..1581024 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/JobManagerWatermarkTracker.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/JobManagerWatermarkTracker.java @@ -144,7 +144,9 @@ public class JobManagerWatermarkTracker extends WatermarkTracker { WatermarkState ws = e.getValue(); if (ws.lastUpdated + updateTimeoutMillis < currentTime) { // ignore outdated entry - updateTimeoutCount++; + if (ws.watermark < Long.MAX_VALUE) { + updateTimeoutCount++; + } continue; } globalWatermark = Math.min(ws.watermark, globalWatermark);