The problem here is that context.isRestored() is a global flag and not local to 
each operator. It says "yes this job was restored" but the source would need to 
know that it is actually brand new and never had any state. This is quite 
tricky to do, since there is currently no way (if I'm correct) to differentiate 
between "I got empty state but others maybe got state" and "this source never 
had state and neither had other parallel instances".

Best,
Aljoscha

> On 6. Sep 2017, at 13:56, Stefan Richter <s.rich...@data-artisans.com> wrote:
> 
> Thanks for the report, I will take a look.
> 
>> Am 06.09.2017 um 11:48 schrieb Gyula Fóra <gyf...@apache.org 
>> <mailto:gyf...@apache.org>>:
>> 
>> Hi all,
>> 
>> We are running into some problems with the kafka source after changing the 
>> uid and restoring from the savepoint.
>> What we are expecting is to clear the partition state, and set it up all 
>> over again, but what seems to happen is that the consumer thinks that it 
>> doesnt have any partitions assigned.
>> 
>> This was supposed to be fixed in 
>> https://github.com/apache/flink/commit/0ecb5d0050b84ba48105836288d43ce4c4749459#diff-06bf4a7f73d98ef91309154654563475
>>  
>> <https://github.com/apache/flink/commit/0ecb5d0050b84ba48105836288d43ce4c4749459#diff-06bf4a7f73d98ef91309154654563475>
>> but appears to be reworked/reverted in the latest release : 
>> https://github.com/apache/flink/blob/0399beed1ea3e04d332b42cc506041d75a6148b4/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L547
>>  
>> <https://github.com/apache/flink/blob/0399beed1ea3e04d332b42cc506041d75a6148b4/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L547>
>> 
>> What is the expected behaviour here?
>> 
>> Thanks!
>> Gyula
> 

Reply via email to