[jira] [Commented] (FLINK-24207) Add support of KeyedState in TwoPhaseCommitSinkFunction
[ https://issues.apache.org/jira/browse/FLINK-24207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17475155#comment-17475155 ]

zlzhang0122 commented on FLINK-24207:
-------------------------------------

[~roman] ok, I will check this, thx!!

> Add support of KeyedState in TwoPhaseCommitSinkFunction
> -------------------------------------------------------
>
>                 Key: FLINK-24207
>                 URL: https://issues.apache.org/jira/browse/FLINK-24207
>             Project: Flink
>          Issue Type: New Feature
>          Components: API / DataStream
>    Affects Versions: 1.12.2, 1.13.1
>            Reporter: zlzhang0122
>            Priority: Major
>
> Currently, TwoPhaseCommitSinkFunction is implemented on top of operator
> state, but operator state is deep-copied when a checkpoint is taken, so
> large operator state may cause an OOM error. Adding KeyedState support to
> TwoPhaseCommitSinkFunction may be a good way to avoid the OOM error and
> would be more convenient for users.

--
This message was sent by Atlassian Jira
(v8.20.1#820001)
[jira] [Commented] (FLINK-24207) Add support of KeyedState in TwoPhaseCommitSinkFunction
[ https://issues.apache.org/jira/browse/FLINK-24207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17474629#comment-17474629 ]

Roman Khachatryan commented on FLINK-24207:
-------------------------------------------

Thanks for the clarification [~zlzhang0122].

To me, the approach you described looks more like a write-ahead log than 2PC. In TwoPhaseCommitSinkFunction terms, the external system is a participant in the commit, while with a WAL committing is the sink's responsibility. So maybe you should take a look at GenericWriteAheadSink instead?

Another option to consider is the [new sink API|https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API], which allows distributing committables, IIUC.
[jira] [Commented] (FLINK-24207) Add support of KeyedState in TwoPhaseCommitSinkFunction
[ https://issues.apache.org/jira/browse/FLINK-24207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17474472#comment-17474472 ]

zlzhang0122 commented on FLINK-24207:
-------------------------------------

[~roman] Sorry for the late reply.

In our use case, although the third-party component doesn't support transactions or idempotent writes, we still want to ensure end-to-end exactly-once using 2PC, so we use ListState to cache the data. When checkpointing, we snapshot the ListState, and when notifyCheckpointComplete is triggered by the CheckpointCoordinator, we commit the ListState to the third-party downstream system and delete it from the state.

Of course, the commit or the delete may fail; we handle this by retrying or by failing the whole job. For a repeated commit of the same batch of data, the third-party component can ensure idempotence.
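For readers following the thread, the flow described in the comment above can be sketched roughly as below. This is only an illustration in plain Java: it does not use any Flink API, and the class and method names (IdempotentTarget, BufferingSink, commitBatch, pendingBatches) are hypothetical. It assumes the external system can deduplicate a re-sent batch by a batch id, and it uses the checkpoint id as that batch id.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Hypothetical stand-in for the third-party system; it ignores a batch id it has already seen. */
class IdempotentTarget {
    private final Set<Long> seenBatches = new HashSet<>();
    final List<String> rows = new ArrayList<>();

    void commitBatch(long batchId, List<String> batch) {
        // Only the first commit of a given batch id has an effect.
        if (seenBatches.add(batchId)) {
            rows.addAll(batch);
        }
    }
}

/** Hypothetical sink that buffers records and writes them out only after a checkpoint completes. */
class BufferingSink {
    private final IdempotentTarget target;
    // Records received since the last snapshot.
    private List<String> current = new ArrayList<>();
    // Snapshotted batches waiting for their checkpoint to complete, keyed by checkpoint id.
    private final TreeMap<Long, List<String>> pending = new TreeMap<>();

    BufferingSink(IdempotentTarget target) {
        this.target = target;
    }

    void invoke(String record) {
        current.add(record);
    }

    void snapshotState(long checkpointId) {
        // Freeze the current buffer as the batch that belongs to this checkpoint.
        pending.put(checkpointId, current);
        current = new ArrayList<>();
    }

    void notifyCheckpointComplete(long checkpointId) {
        // Commit every batch up to and including the completed checkpoint, then drop it.
        Map<Long, List<String>> ready = pending.headMap(checkpointId, true);
        for (Map.Entry<Long, List<String>> entry : ready.entrySet()) {
            target.commitBatch(entry.getKey(), entry.getValue());
        }
        ready.clear(); // headMap is a view, so this removes the entries from pending
    }

    int pendingBatches() {
        return pending.size();
    }
}
```

In this sketch, if the job fails after the commit but before the batch is deleted, the restored batch would be committed again with the same id and the target would ignore it, which is the retry behaviour the comment relies on.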
[jira] [Commented] (FLINK-24207) Add support of KeyedState in TwoPhaseCommitSinkFunction
[ https://issues.apache.org/jira/browse/FLINK-24207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17420597#comment-17420597 ]

Roman Khachatryan commented on FLINK-24207:
-------------------------------------------

That's an interesting idea; however, I have some concerns after an offline discussion with [~arvid].

[~zlzhang0122] could you please clarify:
# For 2PC sinks, the state is usually only the metadata (such as transaction IDs), which shouldn't be large. Do you have a specific case where it is a problem?
# For non-keyed streams, the keyed backend is not initialized. Do you propose to initialize it anyway, or maybe on demand? What would the key be, and how would we obtain the key serializer?
# Isn't it logically more straightforward to implement an operator backend capable of storing large state? Though it might be more difficult, it would benefit other cases.

> Add support of KeyedState in TwoPhaseCommitSinkFunction
> -------------------------------------------------------
>
>                 Key: FLINK-24207
>                 URL: https://issues.apache.org/jira/browse/FLINK-24207
>             Project: Flink
>          Issue Type: New Feature
>          Components: API / DataStream
>    Affects Versions: 1.12.2, 1.13.1
>            Reporter: zlzhang0122
>            Priority: Major
>             Fix For: 1.14.1
>
> Currently, TwoPhaseCommitSinkFunction is implemented on top of operator
> state, but operator state is deep-copied when a checkpoint is taken, so
> large operator state may cause an OOM error. Adding KeyedState support to
> TwoPhaseCommitSinkFunction may be a good way to avoid the OOM error and
> would be more convenient for users.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[jira] [Commented] (FLINK-24207) Add support of KeyedState in TwoPhaseCommitSinkFunction
[ https://issues.apache.org/jira/browse/FLINK-24207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17420493#comment-17420493 ]

Arvid Heise commented on FLINK-24207:
-------------------------------------

I double checked and the new {{SinkOperator}} actually also uses the operator state store, so we could also rewrite the ticket to cover both interfaces.
[jira] [Commented] (FLINK-24207) Add support of KeyedState in TwoPhaseCommitSinkFunction
[ https://issues.apache.org/jira/browse/FLINK-24207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17420492#comment-17420492 ]

Arvid Heise commented on FLINK-24207:
-------------------------------------

I'm a bit torn on this one. It sounds like a reasonable idea. However, we are in the process of phasing out the old {{SinkFunction}} and I wouldn't want to invest too much into the respective implementations.