curcur edited a comment on pull request #11725: URL: https://github.com/apache/flink/pull/11725#issuecomment-619703591
### 1. Change to `SinkFunction`

By "coordination", do you mean that other changes are also being made to `SinkFunction` and may need coordination? I expected reviewers to have strong reactions to the API change, and that's fine. But I am a bit confused about what is agreed/disagreed on and what the suggested better way is, so let me try to clarify some of my thoughts and explain why the API is changed in this way.

As suggested by Stephan, the PR does have a custom operator, `StreamShuffleSink`, and a custom transformation in `SinkTransformation` for that operator. As Arvid mentioned in the previous reviews, there is a lot of code duplication between `StreamShuffleSink` and `StreamSink`.

- That's true, because they are very similar LOL, but we do want to provide a separate operator to minimize the impact of the changes on existing operators.
- Personally, I prefer not to have multiple levels of extends/subclasses, especially if the superclass is not abstract. Multi-level extension makes code very difficult to read: you cannot easily track what functions/members a class contains in a straightforward way, especially without a good IDE.
- Coming back to the duplication: it is about 100 lines of code in total, with very simple logic. So personally I would prefer to trade these `100` lines of code for `readability`.

`SinkFunction`, as its name suggests, is the function invoked in the sink operator; it provides an `invoke` function to handle each record. `FlinkKafkaProducer` itself is a `TwoPhaseCommitSinkFunction`, which implements `SinkFunction`. If we really want to avoid changing `SinkFunction`, I can add a new interface and have the current `TwoPhaseCommitSinkFunction` implement it. That should be safer than the current approach, and it also avoids conflicts, if that is the concern. Please let me know what you think of this proposal.

### 2. `StreamElementSerializer`

I cannot simply use `StreamElementSerializer`, because the way watermarks are stored/handled is different.
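To make the difference concrete, here is a minimal sketch in plain Java (not Flink code; the class and method names are hypothetical) of what a reader of a shared partition has to do once several writers interleave their data and watermarks into one stream: track the latest watermark per writer and emit the minimum, mirroring the per-channel watermark tracking the netty shuffle does today.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only, not Flink API: merges watermarks from
// numWriters sink subtasks that all append to one partition.
public class WatermarkMerger {
    private final int numWriters;
    private final Map<Integer, Long> latestPerWriter = new HashMap<>();

    public WatermarkMerger(int numWriters) {
        this.numWriters = numWriters;
    }

    /**
     * Records a watermark from one writer and returns the merged
     * watermark (the min over the latest watermark of every writer),
     * or Long.MIN_VALUE until every writer has reported at least once.
     */
    public long onWatermark(int writerIndex, long watermark) {
        // Watermarks are monotonic per writer; keep the max seen so far.
        latestPerWriter.merge(writerIndex, watermark, Math::max);
        if (latestPerWriter.size() < numWriters) {
            return Long.MIN_VALUE; // not all writers have reported yet
        }
        long min = Long.MAX_VALUE;
        for (long w : latestPerWriter.values()) {
            min = Math.min(min, w);
        }
        return min;
    }

    public static void main(String[] args) {
        WatermarkMerger merger = new WatermarkMerger(2);
        System.out.println(merger.onWatermark(0, 10)); // Long.MIN_VALUE: writer 1 not seen yet
        System.out.println(merger.onWatermark(1, 5));  // 5
        System.out.println(merger.onWatermark(0, 20)); // 5: writer 1 is still at 5
        System.out.println(merger.onWatermark(1, 30)); // 20
    }
}
```

With N separate channels, the receiver gets this per-channel bookkeeping for free; with one merged partition, the serialized stream has to carry enough information (e.g., the writer's index alongside each watermark) for the reader to reconstruct it, which is why the plain `StreamElementSerializer` is not sufficient here.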
In short, if multiple sink subtasks write to the same partition (sink), we need a way to decide the watermark in the source (the downstream operator, from the shuffle perspective). In the current netty shuffle service, we keep N channels and watermarks in each channel; in this case, however, data and watermarks have already been merged when writing to partitions.

Please refer to Jira FLINK-15670 for the discussion about watermarks: https://issues.apache.org/jira/browse/FLINK-15670. You can start from [here](https://issues.apache.org/jira/browse/FLINK-15670?focusedCommentId=17053232&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17053232); it includes my original thoughts and proposals, and Stephan's enhanced version.

### 3. "Checkpoints" and "Savepoints"

As far as I know, savepoints are very similar to checkpoints, except that savepoints are more or less user-facing; that is, a user can trigger a replay based on savepoints. I can kind of understand why you are saying "restoring from an old savepoint would completely screw up the data". It is true if you think of this problem from a global-snapshotting and global-failover perspective.

However, let's step back and think about why we want to have the persistent shuffle in the first place. If data is persisted, you do not really need to replay the calculation again; persistence is meant to remove the constraints between upstream and downstream. As for your concern, we do not need to do a global replay either; we can simply do a regional replay. If there are any constraints in the implementation, we can disable it for now. In the long term, I do not see it as a problem. But maybe I misunderstand you :-)

Sorry to write this much again. These are all good questions that I think are clearer to answer in a systematic way :-). Let's chat in detail this afternoon.