[ 
https://issues.apache.org/jira/browse/FLINK-17918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17121031#comment-17121031
 ] 

Piotr Nowojski edited comment on FLINK-17918 at 6/1/20, 1:56 PM:
-----------------------------------------------------------------

I think [~AHeise] is right. We are mutating the list that's used in the state 
field {{AppendOnlyTopNFunction#dataState}} 
[here, at line 113|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/rank/AppendOnlyTopNFunction.java#L113].

Someone please correct me if I'm wrong, as I do not know the guarantees of our 
state backends very well, but judging from the relevant code, the assumption 
is that the sync part of the checkpoint makes all of the defensive 
copies/protections needed to "freeze" the state snapshot for the later IO 
operations. This seems to be confirmed by the Javadoc above 
{{org.apache.flink.runtime.state.heap.CopyOnWriteStateMap#stateSnapshot}} (it 
is invoked in the sync part of the checkpoint for the 
{{AppendOnlyTopNFunction#dataState}} field):
{code:java}
        /**
         * Creates a snapshot of this {@link CopyOnWriteStateMap}, to be written in checkpointing. The snapshot integrity
         * is protected through copy-on-write from the {@link CopyOnWriteStateMap}. Users should call
         * {@link #releaseSnapshot(StateMapSnapshot)} after using the returned object.
         *
         * @return a snapshot from this {@link CopyOnWriteStateMap}, for checkpointing.
         */
{code}
but in 
{{org.apache.flink.runtime.state.heap.CopyOnWriteStateMap#snapshotMapArrays}} 
we are only marking the whole {{StateMapEntry}} (the type of 
{{AppendOnlyTopNFunction#dataState}} is {{MapState<RowData, List<RowData>>}}), 
nothing more. So the defensive copy-on-write works only as long as nobody 
else is updating the referenced structures (like the {{List<RowData>}}) in 
the background.
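
To illustrate the hazard, here is a minimal, self-contained sketch. It does not use Flink's classes: a plain {{HashMap}} copy stands in for an entry-level snapshot, and {{ShallowSnapshotDemo}}, {{shallowSnapshot}} and {{deepSnapshot}} are hypothetical names made up for this example. The real {{CopyOnWriteStateMap}} is more involved, but the effect of sharing a mutable value object between the live state and the snapshot should be the same in spirit.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the scenario above, NOT Flink code.
public class ShallowSnapshotDemo {

    /** Entry-level snapshot: copies the key -> value references, but not the values. */
    public static Map<String, List<String>> shallowSnapshot(Map<String, List<String>> state) {
        return new HashMap<>(state);
    }

    /** Deep snapshot: additionally copies every list, so later mutations cannot leak in. */
    public static Map<String, List<String>> deepSnapshot(Map<String, List<String>> state) {
        Map<String, List<String>> copy = new HashMap<>();
        for (Map.Entry<String, List<String>> entry : state.entrySet()) {
            copy.put(entry.getKey(), new ArrayList<>(entry.getValue()));
        }
        return copy;
    }

    public static void main(String[] args) {
        Map<String, List<String>> state = new HashMap<>();
        state.put("key", new ArrayList<>(Arrays.asList("a", "b")));

        // "Sync part": take the snapshots.
        Map<String, List<String>> shallow = shallowSnapshot(state);
        Map<String, List<String>> deep = deepSnapshot(state);

        // Processing continues and mutates the list in place,
        // before the "async part" has written the snapshot out.
        state.get("key").remove("a");

        System.out.println("shallow snapshot sees: " + shallow.get("key")); // [b]
        System.out.println("deep snapshot sees:    " + deep.get("key"));    // [a, b]
    }
}
```

In this sketch the shallow snapshot loses the record {{a}} even though it was taken before the removal. One way to avoid that, under the same assumptions, is to never mutate a list that is already stored in state and to write a fresh copy back instead.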


> Blink Jobs are loosing data on recovery
> ---------------------------------------
>
>                 Key: FLINK-17918
>                 URL: https://issues.apache.org/jira/browse/FLINK-17918
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing, Table SQL / Runtime
>    Affects Versions: 1.11.0
>            Reporter: Piotr Nowojski
>            Assignee: Arvid Heise
>            Priority: Blocker
>             Fix For: 1.11.0
>
>
> After trying to enable unaligned checkpoints by default, a lot of Blink 
> streaming SQL/Table API tests containing joins or set operations are throwing 
> errors indicating that we are losing some data (full records, without 
> deserialisation errors). Example errors:
> {noformat}
> [ERROR] Failures: 
> [ERROR]   JoinITCase.testFullJoinWithEqualPk:775 expected:<List(1,1, 2,2, 
> 3,3, null,4, null,5)> but was:<List(2,2, 3,3, null,1, null,4, null,5)>
> [ERROR]   JoinITCase.testStreamJoinWithSameRecord:391 expected:<List(1,1,1,1, 
> 1,1,1,1, 2,2,2,2, 2,2,2,2, 3,3,3,3, 3,3,3,3, 4,4,4,4, 4,4,4,4, 5,5,5,5, 
> 5,5,5,5)> but was:<List()>
> [ERROR]   SemiAntiJoinStreamITCase.testAntiJoin:352 expected:<0> but was:<1>
> [ERROR]   SetOperatorsITCase.testIntersect:55 expected:<MutableList(1,1,Hi, 
> 2,2,Hello, 3,2,Hello world)> but was:<List()>
> [ERROR]   JoinITCase.testJoinPushThroughJoin:1272 expected:<List(1,0,Hi, 
> 2,1,Hello, 2,1,Hello world)> but was:<List(2,1,Hello, 2,1,Hello world)>
> {noformat}
>  



