This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a475f571777017805bac8872f19ebe8160ad4cd6
Author: Stephan Ewen <[email protected]>
AuthorDate: Mon Nov 9 21:06:42 2020 +0100

    [FLINK-20051][connector kafka] Ensure KafkaPartitionSplitRecords returned 
on consumer wakeup is properly initializes
---
 .../flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java   | 1 +
 1 file changed, 1 insertion(+)

diff --git 
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java
 
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java
index 3313a2a..c120055 100644
--- 
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java
+++ 
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java
@@ -96,6 +96,7 @@ public class KafkaPartitionSplitReader<T> implements 
SplitReader<Tuple3<T, Long,
                try {
                        consumerRecords = 
consumer.poll(Duration.ofMillis(POLL_TIMEOUT));
                } catch (WakeupException we) {
+                       recordsBySplits.prepareForRead();
                        return recordsBySplits;
                }
 

Reply via email to