curcur edited a comment on pull request #11725:
URL: https://github.com/apache/flink/pull/11725#issuecomment-618879789


   > So I had an offline discussion with Stephan to clarify the scope. Indeed, 
`KafkaShuffle` has been requested by users and serves as a bridge until we get 
fully persistent channels.
   > 
   > We both agree that it would have been nice to also support reading from 
non-Flink shuffles (a.k.a. from any partitioned Kafka topic) by making the 
serializer pluggable at the composition level. Please have a look at 
`StreamElementSerializer` and see if we can use it. If that doesn't work for 
some reason, then I can live with a pure `KafkaShuffle` in the first iteration.
   > 
   > Implementation-wise, we are both a bit skeptical that an API change (to 
`SinkFunction`) is the best course as that requires more coordination and 
should have probably been triggered already if you want this feature in 1.11. 
Using custom operators would give you all the freedom without the need for 
coordination. It would also avoid the changes to `KafkaProducer`/`KafkaConsumer` 
at the cost of replicating some logic.
   > 
   > Lastly, I have strong concerns about how checkpoints and savepoints 
work with `KafkaShuffle`. I think for storing checkpoints and recovery in 
terms of fault tolerance, the approach is good as-is. However, for savepoints, 
we should probably ensure that no unconsumed data is still lingering in the 
shuffle topic as that would translate to in-flight data. Hence, restoring from 
an old savepoint would completely screw up the data. At this point, we also 
need to ensure that the topic is purged (probably with some assertion). Not 
supporting going back in checkpoints should be safe given current guarantees. 
Alternatively, we would need to implement some recovery logic for older 
check/savepoints that ignores "future" data somehow (so some higher level Kafka 
offset management).
   
   Hey Arvid, thanks so much for the quick response!
   I think you have several concerns:
   
   1. why `StreamElementSerializer` cannot be reused
   2. why I need a different `KafkaProducer`/`KafkaConsumer`
   3. whether there is a better way than changing `SinkFunction` (that's exactly my 
concern as well, and why I wanted early feedback; I am hesitating too)
   4. savepoints, which I do not completely understand
   
   For the first two, I have reasons; for the third one, I have concerns as 
well. For the fourth, I am not completely sure I understand it correctly.
   So, do you have time to chat a bit about these four points on Monday?
   
   BTW, I am not insisting on getting this into 1.11 if the scope is bigger than 
expected. Instead, I really want to do it the right way. 
   
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

