Still not nice, though, and it took a while to finalise discovery for 1.4. ;-)
If you need that now, you might be able to backport the 1.4 consumer to 1.3.

> On 12. Oct 2017, at 17:05, Gyula Fóra <gyf...@apache.org> wrote:
>
> Ok, thanks for the clarification. :)
>
> Gyula
>
>
> On Thu, Oct 12, 2017, 17:04 Aljoscha Krettek <aljos...@apache.org> wrote:
> It might be old but it's not forgotten: the issue I created is actually
> marked as a blocker, so we won't forget it when releasing 1.3.3 and 1.4.0.
>
> Is the issue in Kafka about new topics/partitions not being discovered, or
> something else? That would be the expected behaviour in Flink < 1.4.0.
>
> Best,
> Aljoscha
>
>> On 12. Oct 2017, at 16:40, Gyula Fóra <gyf...@apache.org> wrote:
>>
>> Hey,
>>
>> I know it's an old discussion, but there also seems to be a problem with the
>> logic in the Kafka source alone regarding new topics added after a
>> checkpoint.
>>
>> Maybe there is a ticket for this already and I just missed it.
>>
>> Cheers,
>> Gyula
>>
>> Gyula Fóra <gyf...@apache.org> wrote (on Thu, 14 Sep 2017, 14:53):
>> Good job figuring this out!
>> This certainly seems to explain our problems.
>>
>> Thanks!
>> Gyula
>>
>> Aljoscha Krettek <aljos...@apache.org> wrote (on Thu, 14 Sep 2017, 14:46):
>> After a bit more digging I found that the "isRestored" flag doesn't work
>> correctly if there are operators chained to the sink that have state:
>> https://issues.apache.org/jira/browse/FLINK-7623
>>
>> Blocker issue for 1.3.3 and 1.4.0.
>>
>> Best,
>> Aljoscha
>>
>>> On 6. Sep 2017, at 16:05, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>
>>> After discussing this, Stefan and I think that this should
>>> actually work.
>>>
>>> Do you have the log output from restoring the Kafka Consumer?
>>> It would be interesting to see whether either of these log statements prints:
>>> - https://github.com/apache/flink/blob/f1a173addd99e5df00921b924352a39810d8d180/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L611
>>> - https://github.com/apache/flink/blob/f1a173addd99e5df00921b924352a39810d8d180/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L554
>>>
>>>> On 6. Sep 2017, at 14:45, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>
>>>> Yes, and that's essentially what's happening in the 1.4-SNAPSHOT consumer,
>>>> which also has discovery of new partitions. Starting from 1.4-SNAPSHOT we
>>>> store state in a union state, i.e. all sources get all partitions on
>>>> restore, and if they didn't get any they know that they are new. There is
>>>> no specific logic for detecting this situation; it's just that the
>>>> partition discoverer will be seeded with this information, and when it
>>>> discovers a new partition it will know whether it can take ownership of
>>>> that partition.
>>>>
>>>> I'm sure Gordon (cc'ed) could explain it better than I did.
>>>>
>>>>> On 6. Sep 2017, at 14:36, Gyula Fóra <gyf...@apache.org> wrote:
>>>>>
>>>>> Wouldn't it be enough for the Kafka sources to store some empty container
>>>>> for their state if it is empty, as opposed to null when it should be
>>>>> bootstrapped again?
>>>>>
>>>>> Gyula
>>>>>
>>>>> Aljoscha Krettek <aljos...@apache.org> wrote (on Wed, 6 Sep 2017, 14:31):
>>>>> The problem here is that context.isRestored() is a global flag and not
>>>>> local to each operator. It says "yes, this job was restored", but the
>>>>> source would need to know that it is actually brand new and never had any
>>>>> state. This is quite tricky to do, since there is currently no way (if
>>>>> I'm correct) to differentiate between "I got empty state, but others may
>>>>> have got state" and "this source never had state, and neither did the
>>>>> other parallel instances".
>>>>>
>>>>> Best,
>>>>> Aljoscha
>>>>>
>>>>>> On 6. Sep 2017, at 13:56, Stefan Richter <s.rich...@data-artisans.com> wrote:
>>>>>>
>>>>>> Thanks for the report, I will take a look.
>>>>>>
>>>>>>> On 6. Sep 2017, at 11:48, Gyula Fóra <gyf...@apache.org> wrote:
>>>>>>>
>>>>>>> Hi all,
>>>>>>>
>>>>>>> We are running into some problems with the Kafka source after changing
>>>>>>> the uid and restoring from the savepoint.
>>>>>>> What we expect is for the partition state to be cleared and set up
>>>>>>> all over again, but what seems to happen is that the consumer thinks
>>>>>>> that it doesn't have any partitions assigned.
>>>>>>>
>>>>>>> This was supposed to be fixed in
>>>>>>> https://github.com/apache/flink/commit/0ecb5d0050b84ba48105836288d43ce4c4749459#diff-06bf4a7f73d98ef91309154654563475
>>>>>>> but appears to have been reworked/reverted in the latest release:
>>>>>>> https://github.com/apache/flink/blob/0399beed1ea3e04d332b42cc506041d75a6148b4/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L547
>>>>>>>
>>>>>>> What is the expected behaviour here?
>>>>>>>
>>>>>>> Thanks!
>>>>>>> Gyula
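The difference between the two restore strategies discussed in this thread (a job-wide isRestored flag plus an evenly split share of the state, versus union state where every source instance receives all partitions) can be illustrated with a small, Flink-free Java simulation. This is only a sketch under stated assumptions, not FlinkKafkaConsumerBase code: the class, its methods, the EARLIEST sentinel and the hash-based owner() function are all hypothetical, and the "split state" path is a simplified stand-in for the pre-1.4 behaviour Gyula reported. In Flink itself, the "everyone gets everything" redistribution is what OperatorStateStore#getUnionListState requests.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical simulation (no Flink dependency) of why union list state lets a restored
 * Kafka source tell "my share is empty" apart from "this operator never had state".
 */
final class UnionStateRestoreSketch {

    /** Hypothetical sentinel: no checkpointed offset, so start from the earliest record. */
    static final long EARLIEST = -1L;

    /** Hypothetical deterministic owner function: every subtask computes the same answer. */
    static int owner(String partitionId, int parallelism) {
        return Math.floorMod(partitionId.hashCode(), parallelism);
    }

    /**
     * Split list state: a subtask only sees its own slice, plus the job-wide isRestored flag.
     * After a restore, an empty slice is read as "I own nothing", which is also what happens
     * when the uid changed and no state matches this operator at all.
     */
    static Map<String, Long> restoreFromSplitState(Map<String, Long> mySlice, boolean isRestored,
                                                   List<String> discovered,
                                                   int subtask, int parallelism) {
        if (!isRestored) {
            return bootstrap(discovered, subtask, parallelism);
        }
        return new HashMap<>(mySlice);
    }

    /**
     * Union list state: every subtask receives ALL entries written by ALL subtasks, so it can
     * check each discovered partition against the full checkpoint and claim the ones it owns.
     */
    static Map<String, Long> restoreFromUnionState(Map<String, Long> unionState,
                                                   List<String> discovered,
                                                   int subtask, int parallelism) {
        Map<String, Long> mine = new HashMap<>();
        for (String p : discovered) {
            if (owner(p, parallelism) != subtask) {
                continue; // another subtask claims this partition
            }
            // Resume from the checkpointed offset if present; otherwise the partition is unknown
            // to the checkpoint (newly added, or the operator had no state) and starts fresh.
            mine.put(p, unionState.getOrDefault(p, EARLIEST));
        }
        return mine;
    }

    /** Fresh start: claim the owned partitions, all from the earliest offset. */
    static Map<String, Long> bootstrap(List<String> discovered, int subtask, int parallelism) {
        return restoreFromUnionState(Map.of(), discovered, subtask, parallelism);
    }

    public static void main(String[] args) {
        List<String> partitions = List.of("events-0", "events-1", "events-2", "events-3");
        int parallelism = 2;
        // Scenario from the thread: the uid changed, so isRestored is true for the job, but no
        // state matches this source and every subtask gets an empty slice / empty union.
        int splitTotal = 0;
        int unionTotal = 0;
        for (int subtask = 0; subtask < parallelism; subtask++) {
            splitTotal += restoreFromSplitState(Map.of(), true, partitions, subtask, parallelism).size();
            unionTotal += restoreFromUnionState(Map.of(), partitions, subtask, parallelism).size();
        }
        System.out.println("split state: " + splitTotal + " partitions assigned"); // prints 0
        System.out.println("union state: " + unionTotal + " partitions assigned"); // prints 4
    }
}
```

The design choice that makes the union approach work without any coordination is that owner() is deterministic: every subtask sees the same checkpoint contents and computes the same owner for each partition, so each partition is claimed exactly once, whether it was restored, newly added to Kafka, or the operator had no state at all.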