curcur edited a comment on pull request #11725:
URL: https://github.com/apache/flink/pull/11725#issuecomment-619703591


   Sorry to write this much again. These are all good questions that I think are 
clearer to answer in a systematic way :-). Let's chat in detail this afternoon.
   
   #### 1. Change to `SinkFunction`
   By "coordination", do you mean that other changes are also being made to 
`SinkFunction` and may need coordination?
   
   I expected reviewers to have strong reactions to the API change, and that's fine. 
But I am a bit confused about what is agreed or disagreed on, and what the suggested 
better way is, so let me try to clarify some of my thoughts and explain why the API is 
changed this way.
   
   As suggested by Stephan, in the PR I do have a custom operator `StreamShuffleSink` 
and a custom transformation in `SinkTransformation` for the custom operator. As Arvid 
mentioned in the previous reviews, there is a lot of code duplication between 
`StreamShuffleSink` and `StreamSink`.
   - That's true because they are very similar LOL, but we do want to provide a 
different operator to minimize the impact of changes on existing operators (see the 
sketch after this list).
   - Personally, I prefer not to have multiple levels of extension/subclassing, 
especially if the superclass is not abstract. Multi-level extension makes code very 
difficult to read: you cannot easily track which methods/members a class contains, 
especially without a good IDE.
   - Coming back to the duplication: it is about 100 lines of code in total, with very 
simple logic. So personally I would prefer to trade these `100` lines of code for 
`readability`.
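
   To make the duplication concrete, here is a rough sketch, written for this comment 
and not copied from the PR, of what such a standalone sink operator looks like. The 
alternative, `StreamShuffleSink extends StreamSink`, would save these lines but adds 
another inheritance level on a non-abstract class.

```java
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

/**
 * Illustrative sketch only, not the actual StreamShuffleSink from this PR: a
 * standalone sink operator that hands each record to its SinkFunction, much like
 * StreamSink does.
 */
public class StreamShuffleSinkSketch<IN>
        extends AbstractUdfStreamOperator<Object, SinkFunction<IN>>
        implements OneInputStreamOperator<IN, Object> {

    public StreamShuffleSinkSketch(SinkFunction<IN> sinkFunction) {
        super(sinkFunction);
        chainingStrategy = ChainingStrategy.ALWAYS;
    }

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        // Hand the record to the user function; a real implementation would also
        // build a proper SinkFunction.Context and persist watermarks (see point 2).
        userFunction.invoke(element.getValue(), null);
    }
}
```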
   
   `SinkFunction`, as its name suggests, is the function invoked in the sink operator; 
it provides an `invoke` method to handle each record. `FlinkKafkaProducer` itself is a 
`TwoPhaseCommitSinkFunction`, which implements `SinkFunction`.
   If we really want to avoid changing `SinkFunction`, I can introduce a new interface 
and have the current `TwoPhaseCommitSinkFunction` implement it. That should be safer 
than the current way, and it also avoids conflicts if that's the concern.
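
   To make this proposal concrete, below is a minimal sketch of what such a new 
interface could look like. The names `ShuffleSinkFunction` and `persistWatermark` are 
made up for illustration; they are not from the PR.

```java
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.watermark.Watermark;

/**
 * Hypothetical alternative to changing SinkFunction itself: a separate interface that
 * the shuffle sink operator requires, which TwoPhaseCommitSinkFunction (or any other
 * interested sink) can opt into.
 */
public interface ShuffleSinkFunction<IN> extends SinkFunction<IN> {

    /** Called by the shuffle sink operator so watermarks can be persisted alongside records. */
    void persistWatermark(Watermark watermark) throws Exception;
}
```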
    
   Please let me know what you think of this proposal.
   
   #### 2. `StreamElementSerializer`
   I cannot simply use `StreamElementSerializer` because the way watermarks are stored 
and handled is different. In short, if multiple sink subtasks write to the same 
partition (sink), we need a way to decide the watermark on the source side (the 
downstream operator, from the shuffle perspective).
   In the current netty shuffle service, we keep N channels and a watermark per 
channel; in this case, data and watermarks have already been merged when they are 
written to partitions.
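
   To illustrate the difference, here is a small sketch of the bookkeeping the reading 
side would need once data and watermarks from several writers are merged into one 
partition: keep the latest watermark per writer and only advance to the minimum across 
all writers. The class and method names are mine for illustration, not from the PR.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Illustrative sketch: the per-channel watermark tracking that the netty shuffle does
 * with N channels has to be rebuilt by the reader of a merged, persisted partition,
 * because the channels no longer exist there.
 */
public class MergedPartitionWatermarkTracker {

    private final Map<Integer, Long> latestWatermarkPerWriter = new HashMap<>();
    private final int numberOfWriters;

    public MergedPartitionWatermarkTracker(int numberOfWriters) {
        this.numberOfWriters = numberOfWriters;
    }

    /** Records a watermark read from the merged partition, tagged with the writer subtask that produced it. */
    public long onWatermark(int writerSubtaskIndex, long watermark) {
        latestWatermarkPerWriter.merge(writerSubtaskIndex, watermark, Math::max);
        return currentWatermark();
    }

    /** The watermark the reading operator may emit: the minimum over all writers, once every writer has reported. */
    public long currentWatermark() {
        if (latestWatermarkPerWriter.size() < numberOfWriters) {
            return Long.MIN_VALUE; // not every writer has reported a watermark yet
        }
        long min = Long.MAX_VALUE;
        for (long watermark : latestWatermarkPerWriter.values()) {
            min = Math.min(min, watermark);
        }
        return min;
    }
}
```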
   
   Please refer to Jira FLINK-15670 for the discussion about watermarks: 
https://issues.apache.org/jira/browse/FLINK-15670
   
   You can start from 
[here](https://issues.apache.org/jira/browse/FLINK-15670?focusedCommentId=17053232&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17053232)
   
   It includes my original thoughts and proposals, as well as Stephan's enhanced version.
   
   #### 3. "Checkpoints" and "Savepoints"
   As far as I know, savepoints are very similar to checkpoints except that savepoints 
are more or less user-facing. That is, a user can trigger a replay based on savepoints. 
I can kind of understand why you are saying "restoring from an old savepoint would 
completely screw up the data". It is true if you think of this problem from a global 
snapshotting and global failover perspective.
   
   However, let's step back and think about why we want to have the persistent shuffle 
in the first place. If data is persisted, you do not really need to replay the 
computation again. Persistence is meant to remove the tight coupling between upstream 
and downstream.
   
   As for your concern, we do not need to do a global replay either; we can simply do 
a regional replay. If there are any constraints in the implementation, we can disable 
it for now. In the long term, I do not see it as a problem.
   
   But maybe I misunderstand you :-)
   

