Arnaud Linz created FLINK-16509:
-----------------------------------
Summary: FlinkKafkaConsumerBase tries to log a context that may
not have been initialized and fails
Key: FLINK-16509
URL: https://issues.apache.org/jira/browse/FLINK-16509
Project: Flink
Issue Type: Bug
Affects Versions: 1.10.0
Environment: Unit test on a local cluster, calling a local unit-test
Kafka server.
Reporter: Arnaud Linz
The new code of FlinkKafkaConsumerBase#initializeState() logs the restored state with:
```
    LOG.info("Consumer subtask {} restored state: {}.",
        getRuntimeContext().getIndexOfThisSubtask(), restoredState);
} else {
    LOG.info("Consumer subtask {} has no restore state.",
        getRuntimeContext().getIndexOfThisSubtask());
}
```
whereas the old (1.8.0) class logged without calling getRuntimeContext():
```
    LOG.info("Setting restore state in the FlinkKafkaConsumer: {}", restoredState);
} else {
    LOG.info("No restore state for FlinkKafkaConsumer.");
}
```
This causes a regression in my Kafka source unit test, which now fails with the exception:
```
java.lang.IllegalStateException: The runtime context has not been initialized.
    at org.apache.flink.api.common.functions.AbstractRichFunction.getRuntimeContext(AbstractRichFunction.java:53)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.initializeState(FlinkKafkaConsumerBase.java:886)
```
The context is not always available at that point (I guess initializeState()
is called before open()).
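
To illustrate the failure mode and one possible defensive approach, here is a minimal sketch with no Flink dependency. RichFunctionStandIn and RestoreLogSketch are hypothetical names invented for this example; the stand-in only imitates the "context not initialized" check seen in the stack trace above, and this is not necessarily how the issue should be fixed in FlinkKafkaConsumerBase.

```java
// Hypothetical stand-in for a rich function whose runtime context may be unset.
// It is NOT Flink's AbstractRichFunction; it only imitates the failing check.
class RichFunctionStandIn {
    // null until the context is set, mimicking an uninitialized runtime context
    private Integer subtaskIndex;

    void setSubtaskIndex(int index) {
        this.subtaskIndex = index;
    }

    // Throws the same kind of exception as in the reported stack trace.
    int getIndexOfThisSubtask() {
        if (subtaskIndex == null) {
            throw new IllegalStateException("The runtime context has not been initialized.");
        }
        return subtaskIndex;
    }
}

class RestoreLogSketch {
    // Builds the log message without assuming the context is available.
    static String restoreMessage(RichFunctionStandIn fn, Object restoredState) {
        String subtask;
        try {
            subtask = String.valueOf(fn.getIndexOfThisSubtask());
        } catch (IllegalStateException e) {
            // Context not set yet: fall back to a placeholder instead of failing.
            subtask = "(unknown)";
        }
        return restoredState != null
            ? "Consumer subtask " + subtask + " restored state: " + restoredState + "."
            : "Consumer subtask " + subtask + " has no restore state.";
    }

    public static void main(String[] args) {
        RichFunctionStandIn fn = new RichFunctionStandIn();
        // Called before the context is set, as in the failing unit test.
        System.out.println(restoreMessage(fn, null));
        fn.setSubtaskIndex(0);
        System.out.println(restoreMessage(fn, "some-state"));
    }
}
```

The alternative is simply to log without the subtask index, as the 1.8.0 code did; the sketch only shows that the log statement does not have to be the thing that fails.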