Wouldnt it be enough that Kafka sources store some empty container for there state if it is empty, compared to null when it should be bootstrapped again?
Gyula Aljoscha Krettek <aljos...@apache.org> ezt írta (időpont: 2017. szept. 6., Sze, 14:31): > The problem here is that context.isRestored() is a global flag and not > local to each operator. It says "yes this job was restored" but the source > would need to know that it is actually brand new and never had any state. > This is quite tricky to do, since there is currently no way (if I'm > correct) to differentiate between "I got empty state but others maybe got > state" and "this source never had state and neither had other parallel > instances". > > Best, > Aljoscha > > On 6. Sep 2017, at 13:56, Stefan Richter <s.rich...@data-artisans.com> > wrote: > > Thanks for the report, I will take a look. > > Am 06.09.2017 um 11:48 schrieb Gyula Fóra <gyf...@apache.org>: > > Hi all, > > We are running into some problems with the kafka source after changing the > uid and restoring from the savepoint. > What we are expecting is to clear the partition state, and set it up all > over again, but what seems to happen is that the consumer thinks that it > doesnt have any partitions assigned. > > This was supposed to be fixed in > https://github.com/apache/flink/commit/0ecb5d0050b84ba48105836288d43ce4c4749459#diff-06bf4a7f73d98ef91309154654563475 > but appears to be reworked/reverted in the latest release : > https://github.com/apache/flink/blob/0399beed1ea3e04d332b42cc506041d75a6148b4/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L547 > > What is the expected behaviour here? > > Thanks! > Gyula > > > >