[jira] [Commented] (FLINK-24207) Add support of KeyedState in TwoPhaseCommitSinkFunction

2022-01-12 Thread zlzhang0122 (Jira)


[ https://issues.apache.org/jira/browse/FLINK-24207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17475155#comment-17475155 ]

zlzhang0122 commented on FLINK-24207:
-

[~roman] OK, I will look into it, thanks!

> Add support of KeyedState in TwoPhaseCommitSinkFunction
> ---
>
> Key: FLINK-24207
> URL: https://issues.apache.org/jira/browse/FLINK-24207
> Project: Flink
> Issue Type: New Feature
> Components: API / DataStream
> Affects Versions: 1.12.2, 1.13.1
> Reporter: zlzhang0122
> Priority: Major
>
> Currently, TwoPhaseCommitSinkFunction is implemented on top of operator
> state, but operator state is deep-copied when a checkpoint is taken, so
> large operator state may cause an OOM error. Adding support for KeyedState
> in TwoPhaseCommitSinkFunction may be a good way to avoid the OOM error and
> would also be more convenient for users.
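The reporter's concern can be made concrete with a plain-Java simulation that has no Flink dependency. It models a heap-resident list state whose snapshot deep-copies every entry, which is how the description says operator state behaves at checkpoint time. `HeapListStateSim` and `DeepCopySnapshotDemo` are hypothetical names, not Flink classes, and the sizes are arbitrary. The sketch shows only one thing: while such a copy is alive, the state occupies roughly twice its own size on the heap.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a heap-backed operator list state (not a Flink class).
final class HeapListStateSim {
    private final List<byte[]> entries = new ArrayList<>();

    void add(byte[] value) {
        entries.add(value);
    }

    // Models a synchronous deep copy at checkpoint time: every element is cloned,
    // so until the snapshot is written out and released, the data exists twice.
    List<byte[]> deepCopySnapshot() {
        List<byte[]> copy = new ArrayList<>(entries.size());
        for (byte[] entry : entries) {
            copy.add(entry.clone());
        }
        return copy;
    }

    List<byte[]> entries() {
        return entries;
    }

    static long totalBytes(List<byte[]> list) {
        long total = 0;
        for (byte[] entry : list) {
            total += entry.length;
        }
        return total;
    }
}

final class DeepCopySnapshotDemo {
    public static void main(String[] args) {
        HeapListStateSim state = new HeapListStateSim();
        for (int i = 0; i < 1_000; i++) {
            state.add(new byte[1_024]); // 1,000 buffered records of 1 KiB each
        }
        List<byte[]> snapshot = state.deepCopySnapshot();
        long live = HeapListStateSim.totalBytes(state.entries());
        long copied = HeapListStateSim.totalBytes(snapshot);
        // Heap held by this state while the snapshot is alive is live + copied.
        System.out.println("live=" + live + " copied=" + copied + " peak=" + (live + copied));
    }
}
```

Running `DeepCopySnapshotDemo` prints `live=1024000 copied=1024000 peak=2048000`.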



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-24207) Add support of KeyedState in TwoPhaseCommitSinkFunction

2022-01-12 Thread Roman Khachatryan (Jira)


[ https://issues.apache.org/jira/browse/FLINK-24207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17474629#comment-17474629 ]

Roman Khachatryan commented on FLINK-24207:
---

Thanks for the clarification, [~zlzhang0122].

To me, the approach you described looks more like a write-ahead log (WAL) than
2PC. In TwoPhaseCommitSinkFunction terms, the external system is a participant,
while in a WAL that role is the sink's own responsibility. So maybe you should
take a look at GenericWriteAheadSink instead?

Another option to consider is the [new sink API|https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API],
which, IIUC, allows committables to be distributed.
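The distinction Roman draws can be sketched in plain Java. In the 2PC model, records are written into a transaction owned by the external system (the participant), so the sink only has to remember a transaction id per checkpoint. The classes below are hypothetical simulations. The comments point to the corresponding hooks of Flink's `TwoPhaseCommitSinkFunction` (`beginTransaction`, `invoke`, `commit`), but this is not its implementation and it leaves out `preCommit`, `abort`, and recovery.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical external system acting as the 2PC participant: it keeps
// uncommitted writes inside its own transactions until told to commit.
final class TransactionalStoreSim {
    private final Map<String, List<String>> openTxns = new HashMap<>();
    final List<String> committed = new ArrayList<>();
    private int nextId = 0;

    String begin() {
        String txnId = "txn-" + nextId++;
        openTxns.put(txnId, new ArrayList<>());
        return txnId;
    }

    void write(String txnId, String record) {
        openTxns.get(txnId).add(record);
    }

    // Committing an id that is no longer open is a no-op, so a retried commit is safe.
    void commit(String txnId) {
        List<String> data = openTxns.remove(txnId);
        if (data != null) {
            committed.addAll(data);
        }
    }
}

// Sink side: only transaction ids would be checkpointed; records stay in the participant.
final class TwoPhaseSinkSim {
    private final TransactionalStoreSim store;
    private String currentTxn;
    final TreeMap<Long, String> pendingCommits = new TreeMap<>(); // checkpointId -> txnId

    TwoPhaseSinkSim(TransactionalStoreSim store) {
        this.store = store;
        this.currentTxn = store.begin(); // cf. beginTransaction()
    }

    void invoke(String record) {
        store.write(currentTxn, record); // cf. invoke(txn, value, context)
    }

    // On snapshot: the current transaction becomes pending and a fresh one is opened.
    void snapshotState(long checkpointId) {
        pendingCommits.put(checkpointId, currentTxn);
        currentTxn = store.begin();
    }

    // Once the checkpoint is complete, commit every pending transaction up to it.
    void notifyCheckpointComplete(long checkpointId) {
        Iterator<Map.Entry<Long, String>> it = pendingCommits.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, String> pending = it.next();
            if (pending.getKey() > checkpointId) {
                break; // TreeMap iterates in ascending checkpoint order
            }
            store.commit(pending.getValue()); // cf. commit(txn)
            it.remove();
        }
    }
}

final class TwoPhaseDemo {
    public static void main(String[] args) {
        TransactionalStoreSim store = new TransactionalStoreSim();
        TwoPhaseSinkSim sink = new TwoPhaseSinkSim(store);
        sink.invoke("a");
        sink.invoke("b");
        sink.snapshotState(1L);
        sink.invoke("c"); // lands in the next transaction
        sink.notifyCheckpointComplete(1L);
        System.out.println(store.committed);
    }
}
```

Running `TwoPhaseDemo` prints `[a, b]`. Record `c` stays in the next, still-open transaction, and the sink never holds the records in its own state.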


[jira] [Commented] (FLINK-24207) Add support of KeyedState in TwoPhaseCommitSinkFunction

2022-01-12 Thread zlzhang0122 (Jira)


[ https://issues.apache.org/jira/browse/FLINK-24207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17474472#comment-17474472 ]

zlzhang0122 commented on FLINK-24207:
-

[~roman] Sorry for the late reply. In our use case, the third-party component
supports neither transactions nor idempotent writes, but we still want
end-to-end exactly-once semantics via 2PC, so we use ListState to cache the
data. When a checkpoint is taken, we snapshot the ListState, and when the
CheckpointCoordinator sends notifyCheckpointComplete, we commit the cached data
to the third-party downstream system and delete it from the state. Of course,
the commit or the delete may fail; we handle that by retrying or by failing the
whole job. For repeated commits of the same batch, the third-party component
can guarantee idempotence.
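The flow described above can be simulated in plain Java to show where the data lives. Records are buffered in sink state (a `List` standing in for `ListState`) and sealed into one batch per checkpoint. They are pushed downstream only after that checkpoint completes, with bounded retries. Exactly-once then rests entirely on the downstream ignoring a batch it has already applied. The sketch models this with a hypothetical batch id of the form `subtask-checkpointId`. All classes are illustrative, not Flink or third-party APIs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical downstream: no transactions, but a batch id that was already
// applied is ignored, so re-sending the same batch is harmless.
final class IdempotentBatchStoreSim {
    final Map<String, List<String>> applied = new HashMap<>();
    int failuresToInject = 0; // lets the sketch simulate transient commit failures

    void commitBatch(String batchId, List<String> records) {
        if (failuresToInject > 0) {
            failuresToInject--;
            throw new IllegalStateException("simulated commit failure");
        }
        applied.putIfAbsent(batchId, new ArrayList<>(records));
    }

    int totalRecords() {
        int n = 0;
        for (List<String> batch : applied.values()) {
            n += batch.size();
        }
        return n;
    }
}

// Write-ahead-log style sink: the records themselves are kept in sink state
// (both `buffer` and `sealed` would be checkpointed) until the checkpoint completes.
final class WalSinkSim {
    private final IdempotentBatchStoreSim store;
    private final int subtask;
    private final int maxRetries;
    private List<String> buffer = new ArrayList<>();             // stands in for ListState
    final TreeMap<Long, List<String>> sealed = new TreeMap<>();  // checkpointId -> batch

    WalSinkSim(IdempotentBatchStoreSim store, int subtask, int maxRetries) {
        this.store = store;
        this.subtask = subtask;
        this.maxRetries = maxRetries;
    }

    void invoke(String record) {
        buffer.add(record);
    }

    // On snapshot: seal the current buffer as the batch belonging to this checkpoint.
    void snapshotState(long checkpointId) {
        sealed.put(checkpointId, buffer);
        buffer = new ArrayList<>();
    }

    // On checkpoint completion: push sealed batches downstream, then drop them from state.
    void notifyCheckpointComplete(long checkpointId) {
        Iterator<Map.Entry<Long, List<String>>> it = sealed.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, List<String>> batch = it.next();
            if (batch.getKey() > checkpointId) {
                break; // later checkpoints are not confirmed yet
            }
            commitWithRetry(subtask + "-" + batch.getKey(), batch.getValue());
            it.remove(); // only reached after a successful commit
        }
    }

    private void commitWithRetry(String batchId, List<String> records) {
        for (int attempt = 0; ; attempt++) {
            try {
                store.commitBatch(batchId, records);
                return;
            } catch (IllegalStateException e) {
                if (attempt >= maxRetries) {
                    throw e; // give up: fail the job; the batch is still in state for recovery
                }
            }
        }
    }
}
```

The sealed batches live in checkpointed state, so with a slow or unavailable downstream this is the kind of sink whose operator state grows large. Re-sending a batch after a restore is safe only because the downstream applies each batch id at most once.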


[jira] [Commented] (FLINK-24207) Add support of KeyedState in TwoPhaseCommitSinkFunction

2021-09-27 Thread Roman Khachatryan (Jira)


[ https://issues.apache.org/jira/browse/FLINK-24207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17420597#comment-17420597 ]

Roman Khachatryan commented on FLINK-24207:
---

That's an interesting idea; however, I have some concerns after an offline
discussion with [~arvid].

[~zlzhang0122] could you please clarify:
 # For 2PC sinks, the state is usually only metadata (such as transaction
IDs), which shouldn't be large. Do you have a specific case where it is a
problem?
 # For non-keyed streams, the keyed backend is not initialized. Do you propose
to initialize it anyway, or perhaps on demand? What would the key be, and how
would we obtain the key serializer?
 # Isn't it logically more straightforward to implement an operator state
backend capable of storing large state? Though it might be more difficult, it
would benefit other cases as well.

> Add support of KeyedState in TwoPhaseCommitSinkFunction
> ---
>
> Key: FLINK-24207
> URL: https://issues.apache.org/jira/browse/FLINK-24207
> Project: Flink
>  Issue Type: New Feature
>  Components: API / DataStream
>Affects Versions: 1.12.2, 1.13.1
>Reporter: zlzhang0122
>Priority: Major
> Fix For: 1.14.1
>
>
> Now, the implementation of TwoPhaseCommitSinkFunction is based on operator 
> state, but operator state will do a deep copy when taking checkpoint, so 
> large operator state may produce a OOM error. Add support of KeyedState in 
> TwoPhaseCommitSinkFunction maybe a good choice to avoid the OOM error and 
> give users more convenience.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-24207) Add support of KeyedState in TwoPhaseCommitSinkFunction

2021-09-26 Thread Arvid Heise (Jira)


[ https://issues.apache.org/jira/browse/FLINK-24207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17420493#comment-17420493 ]

Arvid Heise commented on FLINK-24207:
-

I double-checked, and the new {{SinkOperator}} also uses the operator state
store, so we could rewrite the ticket to cover both interfaces.


[jira] [Commented] (FLINK-24207) Add support of KeyedState in TwoPhaseCommitSinkFunction

2021-09-26 Thread Arvid Heise (Jira)


[ https://issues.apache.org/jira/browse/FLINK-24207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17420492#comment-17420492 ]

Arvid Heise commented on FLINK-24207:
-

I'm a bit torn on this one. It sounds like a reasonable idea; however, we are
in the process of phasing out the old {{SinkFunction}}, and I wouldn't want to
invest too much in the respective implementations.
