Wouldn't it be enough for the Kafka sources to store an empty container for
their state when it is empty, as opposed to null when the state should be
bootstrapped again?
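
To make the idea concrete, here is a rough sketch in plain Java. The class,
enum and method names are made up for illustration; this is not the actual
FlinkKafkaConsumerBase code. It also assumes the restore path could hand the
source null when nothing was ever stored under its uid, which may not be
possible today:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of the proposal; not Flink code.
final class RestoredOffsetsSketch {

    enum StartupMode { BOOTSTRAP, RESTORED_EMPTY, RESTORED_WITH_OFFSETS }

    // null      -> nothing was ever stored for this operator (e.g. new uid)
    // empty map -> state was restored, but this subtask owned no partitions
    // non-empty -> state was restored with offsets to resume from
    static StartupMode classify(Map<String, Long> restoredOffsets) {
        if (restoredOffsets == null) {
            return StartupMode.BOOTSTRAP;
        }
        if (restoredOffsets.isEmpty()) {
            return StartupMode.RESTORED_EMPTY;
        }
        return StartupMode.RESTORED_WITH_OFFSETS;
    }

    // On snapshot, always write a container (never null), so that an idle
    // subtask is distinguishable from an operator that never had state.
    static Map<String, Long> snapshot(Map<String, Long> currentOffsets) {
        return currentOffsets == null
                ? new HashMap<>()
                : new HashMap<>(currentOffsets);
    }
}
```

With this, only the BOOTSTRAP case would trigger a fresh partition
assignment; RESTORED_EMPTY would keep the subtask idle as before.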

Gyula

Aljoscha Krettek <aljos...@apache.org> wrote on Wed, Sep 6, 2017, 14:31:

> The problem here is that context.isRestored() is a global flag and not
> local to each operator. It says "yes this job was restored" but the source
> would need to know that it is actually brand new and never had any state.
> This is quite tricky to do, since there is currently no way (if I'm
> correct) to differentiate between "I got empty state but others maybe got
> state" and "this source never had state and neither had other parallel
> instances".
>
> Best,
> Aljoscha
>
> On 6. Sep 2017, at 13:56, Stefan Richter <s.rich...@data-artisans.com>
> wrote:
>
> Thanks for the report, I will take a look.
>
> On 06.09.2017, at 11:48, Gyula Fóra <gyf...@apache.org> wrote:
>
> Hi all,
>
> We are running into some problems with the Kafka source after changing the
> uid and restoring from a savepoint.
> We expect the partition state to be cleared and set up all over again,
> but what seems to happen is that the consumer thinks it doesn't have any
> partitions assigned.
>
> This was supposed to be fixed in
> https://github.com/apache/flink/commit/0ecb5d0050b84ba48105836288d43ce4c4749459#diff-06bf4a7f73d98ef91309154654563475
> but appears to have been reworked/reverted in the latest release:
> https://github.com/apache/flink/blob/0399beed1ea3e04d332b42cc506041d75a6148b4/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L547
>
> What is the expected behaviour here?
>
> Thanks!
> Gyula
>
>
>
>
