Hey, I know it's old discussion but there also seems to be a problem with the logic in the kafka source alone regarding new topics added after a checkpoint.
Maybe there is a ticket for this already and I just missed it. Cheers, Gyula Gyula Fóra <gyf...@apache.org> ezt írta (időpont: 2017. szept. 14., Cs, 14:53): > Good job for figuring this out! > This certainly seems to explain our problems. > > Thanks! > Gyula > > Aljoscha Krettek <aljos...@apache.org> ezt írta (időpont: 2017. szept. > 14., Cs, 14:46): > >> After a bit more digging I found that the "isRestored" flag doesn't work >> correctly if there are operators chained to the sink that have state: >> https://issues.apache.org/jira/browse/FLINK-7623 >> >> Blocker issue for 1.3.3 and 1.4.0. >> >> Best, >> Aljoscha >> >> On 6. Sep 2017, at 16:05, Aljoscha Krettek <aljos...@apache.org> wrote: >> >> After discussing this between Stefan and me we think that this should >> actually work. >> >> Do you have the log output from restoring the Kafka Consumer? It would be >> interesting to see whether any of those print: >> - >> https://github.com/apache/flink/blob/f1a173addd99e5df00921b924352a39810d8d180/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L611 >> - >> https://github.com/apache/flink/blob/f1a173addd99e5df00921b924352a39810d8d180/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L554 >> >> On 6. Sep 2017, at 14:45, Aljoscha Krettek <aljos...@apache.org> wrote: >> >> Yes, and that's essentially what's happening in the 1.4-SNAPSHOT consumer >> which also has discovery of new partitions. Starting from 1.4-SNAPSHOT we >> store state in a union state, i.e. all sources get all partition on restore >> and if they didn't get any they know that they are new. There is no >> specific logic for detecting this situation, it's just that the partition >> discoverer will be seeded with this information and it will know if it >> discovers a new partition whether it can take ownership of that partition. >> >> I'm sure Gordon (cc'ed) could explain it better than I did. >> >> On 6. Sep 2017, at 14:36, Gyula Fóra <gyf...@apache.org> wrote: >> >> Wouldnt it be enough that Kafka sources store some empty container for >> there state if it is empty, compared to null when it should be bootstrapped >> again? >> >> Gyula >> >> Aljoscha Krettek <aljos...@apache.org> ezt írta (időpont: 2017. szept. >> 6., Sze, 14:31): >> >>> The problem here is that context.isRestored() is a global flag and not >>> local to each operator. It says "yes this job was restored" but the source >>> would need to know that it is actually brand new and never had any state. >>> This is quite tricky to do, since there is currently no way (if I'm >>> correct) to differentiate between "I got empty state but others maybe got >>> state" and "this source never had state and neither had other parallel >>> instances". >>> >>> Best, >>> Aljoscha >>> >>> On 6. Sep 2017, at 13:56, Stefan Richter <s.rich...@data-artisans.com> >>> wrote: >>> >>> Thanks for the report, I will take a look. >>> >>> Am 06.09.2017 um 11:48 schrieb Gyula Fóra <gyf...@apache.org>: >>> >>> Hi all, >>> >>> We are running into some problems with the kafka source after changing >>> the uid and restoring from the savepoint. >>> What we are expecting is to clear the partition state, and set it up all >>> over again, but what seems to happen is that the consumer thinks that it >>> doesnt have any partitions assigned. >>> >>> This was supposed to be fixed in >>> https://github.com/apache/flink/commit/0ecb5d0050b84ba48105836288d43ce4c4749459#diff-06bf4a7f73d98ef91309154654563475 >>> but appears to be reworked/reverted in the latest release : >>> https://github.com/apache/flink/blob/0399beed1ea3e04d332b42cc506041d75a6148b4/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L547 >>> >>> What is the expected behaviour here? >>> >>> Thanks! >>> Gyula >>> >>> >>> >>> >> >> >>