Still not nice, though, and it took a while to finalise discovery for 1.4. ;-)

If you need that now, you might be able to backport the 1.4 consumer to 1.3.

> On 12. Oct 2017, at 17:05, Gyula Fóra <gyf...@apache.org> wrote:
> 
> Ok, thanks for the clarification. :)
> 
> Gyula
> 
> 
> On Thu, Oct 12, 2017, 17:04 Aljoscha Krettek <aljos...@apache.org> wrote:
> It might be old but it's not forgotten, the issue I created is actually 
> marked as a blocker so we won't forget it when releasing 1.3.3 and 1.4.0.
> 
> Is the issue in Kafka about new topics/partitions not being discovered, or 
> something else? That would be the expected behaviour in Flink < 1.4.0.
> 
> Best,
> Aljoscha
> 
>> On 12. Oct 2017, at 16:40, Gyula Fóra <gyf...@apache.org> wrote:
>> 
>> Hey,
>> 
>> I know it's an old discussion, but there also seems to be a problem with the 
>> logic in the Kafka source alone regarding new topics added after a 
>> checkpoint. 
>> 
>> Maybe there is a ticket for this already and I just missed it.
>> 
>> Cheers,
>> Gyula
>> 
>> Gyula Fóra <gyf...@apache.org> wrote (on Thu, 14 Sep 2017, 14:53):
>> Good job for figuring this out!
>> This certainly seems to explain our problems.
>> 
>> Thanks!
>> Gyula
>> 
>> Aljoscha Krettek <aljos...@apache.org> wrote (on Thu, 14 Sep 2017, 14:46):
>> After a bit more digging I found that the "isRestored" flag doesn't work 
>> correctly if there are operators chained to the sink that have state: 
>> https://issues.apache.org/jira/browse/FLINK-7623 
>> 
>> Blocker issue for 1.3.3 and 1.4.0.
>> 
>> Best,
>> Aljoscha
>> 
>>> On 6. Sep 2017, at 16:05, Aljoscha Krettek <aljos...@apache.org> wrote:
>>> 
>>> After discussing this between Stefan and me we think that this should 
>>> actually work.
>>> 
>>> Do you have the log output from restoring the Kafka Consumer? It would be 
>>> interesting to see whether either of these lines prints:
>>>  - https://github.com/apache/flink/blob/f1a173addd99e5df00921b924352a39810d8d180/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L611
>>>  - https://github.com/apache/flink/blob/f1a173addd99e5df00921b924352a39810d8d180/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L554
>>> 
>>>> On 6. Sep 2017, at 14:45, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>> 
>>>> Yes, and that's essentially what's happening in the 1.4-SNAPSHOT consumer, 
>>>> which also has discovery of new partitions. Starting from 1.4-SNAPSHOT we 
>>>> store state in a union state, i.e. all sources get all partitions on 
>>>> restore, and if they didn't get any they know that they are new. There is 
>>>> no specific logic for detecting this situation; the partition discoverer 
>>>> is simply seeded with this information, so when it discovers a new 
>>>> partition it knows whether it can take ownership of that partition.
>>>> 
>>>> I'm sure Gordon (cc'ed) could explain it better than I did.
>>>> 
>>>>> On 6. Sep 2017, at 14:36, Gyula Fóra <gyf...@apache.org> wrote:
>>>>> 
>>>>> Wouldn't it be enough for the Kafka sources to store an empty container 
>>>>> for their state when it is empty, as opposed to null when it should be 
>>>>> bootstrapped again?
>>>>> 
>>>>> Gyula
>>>>> 
>>>>> Aljoscha Krettek <aljos...@apache.org> wrote (on Wed, 6 Sep 2017, 14:31):
>>>>> The problem here is that context.isRestored() is a global flag and not 
>>>>> local to each operator. It says "yes this job was restored" but the 
>>>>> source would need to know that it is actually brand new and never had any 
>>>>> state. This is quite tricky to do, since there is currently no way (if 
>>>>> I'm correct) to differentiate between "I got empty state but others maybe 
>>>>> got state" and "this source never had state and neither had other 
>>>>> parallel instances".
>>>>> 
>>>>> Best,
>>>>> Aljoscha
>>>>> 
>>>>>> On 6. Sep 2017, at 13:56, Stefan Richter <s.rich...@data-artisans.com> wrote:
>>>>>> 
>>>>>> Thanks for the report, I will take a look.
>>>>>> 
>>>>>>> On 06.09.2017 at 11:48, Gyula Fóra <gyf...@apache.org> wrote:
>>>>>>> 
>>>>>>> Hi all,
>>>>>>> 
>>>>>>> We are running into some problems with the Kafka source after changing 
>>>>>>> the uid and restoring from the savepoint.
>>>>>>> We expect the partition state to be cleared and set up all over again, 
>>>>>>> but what seems to happen is that the consumer thinks that it doesn't 
>>>>>>> have any partitions assigned.
>>>>>>> 
>>>>>>> This was supposed to be fixed in 
>>>>>>> https://github.com/apache/flink/commit/0ecb5d0050b84ba48105836288d43ce4c4749459#diff-06bf4a7f73d98ef91309154654563475
>>>>>>> but appears to be reworked/reverted in the latest release: 
>>>>>>> https://github.com/apache/flink/blob/0399beed1ea3e04d332b42cc506041d75a6148b4/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L547
>>>>>>> 
>>>>>>> What is the expected behaviour here?
>>>>>>> 
>>>>>>> Thanks!
>>>>>>> Gyula
>>>>>> 
>>>>> 
>>>> 
>>> 
>> 
> 
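
The union-state restore described in the thread can be modelled roughly as follows. This is only a sketch in plain Java, not code from the Flink consumer: the class name, the method names and the hash-based ownership rule are all assumptions made for illustration, and the real connector may assign partitions differently. The idea it shows is that every subtask sees the complete restored partition list, so an empty list means no parallel instance ever had state, and the discoverer, seeded with that list, can tell which partitions are new and whether this subtask may take them.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Simplified model, NOT Flink code: every name here is hypothetical.
final class UnionStateSketch {

    // Assumed deterministic ownership rule; Flink's real assignment may differ.
    static int ownerOf(String partition, int parallelism) {
        return Math.floorMod(partition.hashCode(), parallelism);
    }

    // With union state every subtask receives the full restored list, so an
    // empty list means that no parallel instance had any state.
    static boolean isFreshStart(List<String> restoredUnionState) {
        return restoredUnionState.isEmpty();
    }

    // Partitions one subtask should read: the restored partitions it owns,
    // plus discovered partitions that are new and that it owns.
    static List<String> partitionsFor(int subtask, int parallelism,
                                      List<String> restoredUnionState,
                                      List<String> discovered) {
        // Seed the "discoverer" with everything that was restored.
        Set<String> known = new HashSet<>(restoredUnionState);
        List<String> result = new ArrayList<>();
        for (String p : restoredUnionState) {
            if (ownerOf(p, parallelism) == subtask) {
                result.add(p);
            }
        }
        for (String p : discovered) {
            // add() returns true only for partitions not seen before.
            if (known.add(p) && ownerOf(p, parallelism) == subtask) {
                result.add(p);
            }
        }
        return result;
    }
}
```

Because ownership is a pure function of the partition and the parallelism in this model, each restored or newly discovered partition ends up with exactly one subtask, without the subtasks having to coordinate.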
