This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new c905a4d  [hotfix][kinesis] Update emit record javadoc and don't count 
max watermark as timeout
c905a4d is described below

commit c905a4d323c0ed4985cdb9e5764efe13bc6183d0
Author: Thomas Weise <t...@apache.org>
AuthorDate: Mon Aug 26 15:02:40 2019 -0700

    [hotfix][kinesis] Update emit record javadoc and don't count max watermark 
as timeout
---
 .../streaming/connectors/kinesis/internals/KinesisDataFetcher.java   | 5 +++--
 .../connectors/kinesis/util/JobManagerWatermarkTracker.java          | 4 +++-
 2 files changed, 6 insertions(+), 3 deletions(-)

diff --git 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
index f38e6eb..80b724b 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
@@ -715,7 +715,7 @@ public class KinesisDataFetcher<T> {
        // 
------------------------------------------------------------------------
 
        /**
-        * Atomic operation to collect a record and update state to the 
sequence number of the record.
+        * Prepare a record and hand it over to the {@link RecordEmitter}, 
which may collect it asynchronously.
         * This method is called by {@link ShardConsumer}s.
         *
         * @param record the record to collect
@@ -752,7 +752,8 @@ public class KinesisDataFetcher<T> {
        }
 
        /**
-        * Actual record emission called from the record emitter.
+        * Atomic operation to collect a record and update state to the 
sequence number of the record.
+        * This method is called from the record emitter.
         *
         * <p>Responsible for tracking per shard watermarks and emit timestamps 
extracted from
         * the record, when a watermark assigner was configured.
diff --git 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/JobManagerWatermarkTracker.java
 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/JobManagerWatermarkTracker.java
index f150bb0..1581024 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/JobManagerWatermarkTracker.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/JobManagerWatermarkTracker.java
@@ -144,7 +144,9 @@ public class JobManagerWatermarkTracker extends 
WatermarkTracker {
                                WatermarkState ws = e.getValue();
                                if (ws.lastUpdated + updateTimeoutMillis < 
currentTime) {
                                        // ignore outdated entry
-                                       updateTimeoutCount++;
+                                       if (ws.watermark < Long.MAX_VALUE) {
+                                               updateTimeoutCount++;
+                                       }
                                        continue;
                                }
                                globalWatermark = Math.min(ws.watermark, 
globalWatermark);

Reply via email to