This is an automated email from the ASF dual-hosted git repository.

jqin pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 78cd462  [FLINK-19750][connector/kafka] Fix bug of not opening DeserializationSchema when FlinkKafkaConsumerBase recovers from state (#13785)
78cd462 is described below

commit 78cd4628285c9dfdc0ada3f59cdefc4e8a7eb0bc
Author: Qingsheng Ren <[email protected]>
AuthorDate: Mon Oct 26 12:00:46 2020 +0800

    [FLINK-19750][connector/kafka] Fix bug of not opening DeserializationSchema when FlinkKafkaConsumerBase recovers from state (#13785)
---
 .../streaming/connectors/kafka/FlinkKafkaConsumerBase.java   |  4 ++--
 .../connectors/kafka/FlinkKafkaConsumerBaseTest.java         | 12 ++++++++++++
 2 files changed, 14 insertions(+), 2 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index 67d65fb..7cc364e 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -690,9 +690,9 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
                                LOG.info("Consumer subtask {} initially has no 
partitions to read from.",
                                        
getRuntimeContext().getIndexOfThisSubtask());
                        }
-
-                       this.deserializer.open(() -> 
getRuntimeContext().getMetricGroup().addGroup("user"));
                }
+
+               this.deserializer.open(() -> 
getRuntimeContext().getMetricGroup().addGroup("user"));
        }
 
        @Override
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
index 2501109..fc3f288 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -852,6 +852,18 @@ public class FlinkKafkaConsumerBaseTest extends TestLogger {
                assertThat("Open method was not called", 
deserializationSchema.isOpenCalled(), is(true));
        }
 
+       @Test
+       public void testOpenWithRestoreState() throws Exception {
+               MockDeserializationSchema<String> deserializationSchema = new 
MockDeserializationSchema<>();
+               final FlinkKafkaConsumerBase<String> consumer = new 
DummyFlinkKafkaConsumer<>(
+                               new 
KafkaDeserializationSchemaWrapper<>(deserializationSchema));
+
+               final TestingListState<Tuple2<KafkaTopicPartition, Long>> 
restoredListState = new TestingListState<>();
+               setupConsumer(consumer, true, restoredListState, true, 0, 1);
+
+               assertThat("DeserializationSchema's open method was not 
invoked", deserializationSchema.isOpenCalled(), is(true));
+       }
+
        // 
------------------------------------------------------------------------
 
        private static <T> AbstractStreamOperatorTestHarness<T> 
createTestHarness(

Reply via email to