This is an automated email from the ASF dual-hosted git repository.
jqin pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new 78cd462 [FLINK-19750][connector/kafka] Fix bug of not opening DeserializationSchema when FlinkKafkaConsumerBase recovers from state (#13785)
78cd462 is described below
commit 78cd4628285c9dfdc0ada3f59cdefc4e8a7eb0bc
Author: Qingsheng Ren <[email protected]>
AuthorDate: Mon Oct 26 12:00:46 2020 +0800
[FLINK-19750][connector/kafka] Fix bug of not opening DeserializationSchema when FlinkKafkaConsumerBase recovers from state (#13785)
---
.../streaming/connectors/kafka/FlinkKafkaConsumerBase.java | 4 ++--
.../connectors/kafka/FlinkKafkaConsumerBaseTest.java | 12 ++++++++++++
2 files changed, 14 insertions(+), 2 deletions(-)
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index 67d65fb..7cc364e 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -690,9 +690,9 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
LOG.info("Consumer subtask {} initially has no partitions to read from.",
getRuntimeContext().getIndexOfThisSubtask());
}
-
- this.deserializer.open(() -> getRuntimeContext().getMetricGroup().addGroup("user"));
}
+
+ this.deserializer.open(() -> getRuntimeContext().getMetricGroup().addGroup("user"));
}
@Override
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
index 2501109..fc3f288 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -852,6 +852,18 @@ public class FlinkKafkaConsumerBaseTest extends TestLogger {
assertThat("Open method was not called", deserializationSchema.isOpenCalled(), is(true));
}
+ @Test
+ public void testOpenWithRestoreState() throws Exception {
+ MockDeserializationSchema<String> deserializationSchema = new MockDeserializationSchema<>();
+ final FlinkKafkaConsumerBase<String> consumer = new DummyFlinkKafkaConsumer<>(
+ new KafkaDeserializationSchemaWrapper<>(deserializationSchema));
+
+ final TestingListState<Tuple2<KafkaTopicPartition, Long>> restoredListState = new TestingListState<>();
+ setupConsumer(consumer, true, restoredListState, true, 0, 1);
+
+ assertThat("DeserializationSchema's open method was not invoked", deserializationSchema.isOpenCalled(), is(true));
+ }
+
// ------------------------------------------------------------------------
private static <T> AbstractStreamOperatorTestHarness<T> createTestHarness(