[jira] [Updated] (FLINK-18452) Flaws in RetractableTopNFunction.ComparatorWrapper#equals method prevent state access after restoration

2020-11-19 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18452?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-18452:
---
Labels: pull-request-available  (was: )

> Flaws in RetractableTopNFunction.ComparatorWrapper#equals method prevent 
> state access after restoration
> ---
>
> Key: FLINK-18452
> URL: https://issues.apache.org/jira/browse/FLINK-18452
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Affects Versions: 1.10.0, 1.10.1, 1.11.0
> Reporter: Weike Dong
> Assignee: Jark Wu
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.12.0
>
> Attachments: 
> c2ebeac8aadebad0dffa5cc255d45190594c5b2a84bda020dd30bf24b9169702.png
>
>
> We found that in SQL jobs using the "Top-N" functionality provided by the 
> blink planner, the job state cannot be retrieved because the state 
> serializers are reported as "incompatible" (in fact they are compatible).
> The error log is shown below:
> {panel:title=taskmanager.log}
> 2020-06-30 09:19:32.089 [Rank(strategy=[RetractStrategy], 
> rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=100], 
> partitionBy=[appkey, serverid], orderBy=[quantity DESC], select=[appkey, 
> serverid,  quantity]) (1/1)] INFO  org.apache.flink.runtime.taskmanager.Task  
> - Rank(strategy=[RetractStrategy], rankType=[ROW_NUMBER], 
> rankRange=[rankStart=1, rankEnd=100], partitionBy=[appkey, serverid], 
> orderBy=[quantity DESC], select=[appkey, serverid, oid, quantity]) (1/1) 
> (bd4d2e4327efac57dc70e220b8de460b) switched from RUNNING to FAILED.
> java.lang.RuntimeException: Error while getting state
> at 
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
> at 
> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:144)
> at 
> org.apache.flink.table.runtime.operators.rank.RetractableTopNFunction.open(RetractableTopNFunction.java:115)
> at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
> at 
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:57)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:990)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:453)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:448)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.StateMigrationException: The new state 
> serializer cannot be incompatible.
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:543)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:491)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:652)
> at 
> org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
> at 
> org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:72)
> at 
> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:279)
> at 
> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:328)
> at 
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
> at 
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
> ... 13 more{panel}
>  
> After careful debugging, we found this to be an issue with the 
> compatibility check of type serializers.
>  
> In short, during checkpointing, Flink serializes _SortedMapSerializer_ by 
> creating a _SortedMapSerializerSnapshot_ object, and the original comparator 
> is encapsulated within that object (here we call it 
> _StreamExecSortComparator$579_).
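
The issue title points at an `equals` method on a comparator wrapper, and the description says the original comparator is captured inside the serializer snapshot. The following is only a minimal sketch of how such an `equals` could make two equivalent comparators look different after a restore; it is not the Flink implementation. All class names (`GeneratedComparator`, `NameBasedWrapper`, `SpecBasedWrapper`) and the second generated name `StreamExecSortComparator$626` are hypothetical, and the assumption that equality was tied to a generated name is an illustration, not something this thread confirms.

```java
import java.util.Arrays;
import java.util.Comparator;

public class ComparatorEqualsSketch {

    // Hypothetical stand-in for a code-generated comparator. The name mimics a
    // generated class whose numeric suffix may differ between job submissions.
    static final class GeneratedComparator implements Comparator<Long> {
        final String generatedName;
        final boolean ascending;

        GeneratedComparator(String generatedName, boolean ascending) {
            this.generatedName = generatedName;
            this.ascending = ascending;
        }

        @Override
        public int compare(Long a, Long b) {
            return ascending ? Long.compare(a, b) : Long.compare(b, a);
        }
    }

    // Fragile variant (assumption): equality depends on the generated name,
    // so two comparators with identical ordering can compare as unequal.
    static final class NameBasedWrapper {
        final GeneratedComparator delegate;

        NameBasedWrapper(GeneratedComparator delegate) {
            this.delegate = delegate;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof NameBasedWrapper
                    && delegate.generatedName.equals(((NameBasedWrapper) o).delegate.generatedName);
        }

        @Override
        public int hashCode() {
            return delegate.generatedName.hashCode();
        }
    }

    // More robust variant (assumption): equality depends only on the sort
    // specification, which is stable across regenerated code.
    static final class SpecBasedWrapper {
        final int[] keyPositions;
        final boolean[] ascending;

        SpecBasedWrapper(int[] keyPositions, boolean[] ascending) {
            this.keyPositions = keyPositions.clone();
            this.ascending = ascending.clone();
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof SpecBasedWrapper)) {
                return false;
            }
            SpecBasedWrapper other = (SpecBasedWrapper) o;
            return Arrays.equals(keyPositions, other.keyPositions)
                    && Arrays.equals(ascending, other.ascending);
        }

        @Override
        public int hashCode() {
            return 31 * Arrays.hashCode(keyPositions) + Arrays.hashCode(ascending);
        }
    }

    public static void main(String[] args) {
        // Same ordering (descending), different generated names.
        NameBasedWrapper fromSnapshot =
                new NameBasedWrapper(new GeneratedComparator("StreamExecSortComparator$579", false));
        NameBasedWrapper afterRestore =
                new NameBasedWrapper(new GeneratedComparator("StreamExecSortComparator$626", false));
        System.out.println("name-based equals: " + fromSnapshot.equals(afterRestore));

        SpecBasedWrapper specSnapshot = new SpecBasedWrapper(new int[] {2}, new boolean[] {false});
        SpecBasedWrapper specRestore = new SpecBasedWrapper(new int[] {2}, new boolean[] {false});
        System.out.println("spec-based equals: " + specSnapshot.equals(specRestore));
    }
}
```

In this sketch, a compatibility check that relies on `equals` would reject the name-based pair even though both comparators sort identically, while the spec-based pair compares as equal.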

[jira] [Updated] (FLINK-18452) Flaws in RetractableTopNFunction.ComparatorWrapper#equals method prevent state access after restoration

2020-11-16 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18452?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu updated FLINK-18452:

Fix Version/s: (was: 1.13.0)
   1.12.0

> Flaws in RetractableTopNFunction.ComparatorWrapper#equals method prevent 
> state access after restoration
> ---
>
> Key: FLINK-18452
> URL: https://issues.apache.org/jira/browse/FLINK-18452
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Affects Versions: 1.10.0, 1.10.1, 1.11.0
> Reporter: Weike Dong
> Assignee: Weike Dong
> Priority: Major
> Fix For: 1.12.0
>
> Attachments: 
> c2ebeac8aadebad0dffa5cc255d45190594c5b2a84bda020dd30bf24b9169702.png

[jira] [Updated] (FLINK-18452) Flaws in RetractableTopNFunction.ComparatorWrapper#equals method prevent state access after restoration

2020-11-16 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18452?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu updated FLINK-18452:

Fix Version/s: (was: 1.12.0)
   1.13.0

> Flaws in RetractableTopNFunction.ComparatorWrapper#equals method prevent 
> state access after restoration
> ---
>
> Key: FLINK-18452
> URL: https://issues.apache.org/jira/browse/FLINK-18452
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Affects Versions: 1.10.0, 1.10.1, 1.11.0
> Reporter: Weike Dong
> Assignee: Weike Dong
> Priority: Major
> Fix For: 1.13.0
>
> Attachments: 
> c2ebeac8aadebad0dffa5cc255d45190594c5b2a84bda020dd30bf24b9169702.png

[jira] [Updated] (FLINK-18452) Flaws in RetractableTopNFunction.ComparatorWrapper#equals method prevent state access after restoration

2020-07-13 Thread Weike Dong (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18452?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weike Dong updated FLINK-18452:
---
Fix Version/s: 1.12.0

> Flaws in RetractableTopNFunction.ComparatorWrapper#equals method prevent 
> state access after restoration
> ---
>
> Key: FLINK-18452
> URL: https://issues.apache.org/jira/browse/FLINK-18452
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Affects Versions: 1.10.0, 1.10.1, 1.11.0
> Reporter: Weike Dong
> Assignee: Weike Dong
> Priority: Major
> Fix For: 1.12.0
>
> Attachments: 
> c2ebeac8aadebad0dffa5cc255d45190594c5b2a84bda020dd30bf24b9169702.png
>
>
> After careful debugging, we found this to be an issue with the 
> compatibility check of type serializers.
>  
> In short, during checkpointing, Flink serializes _SortedMapSerializer_ by 
> creating a _SortedMapSerializerSnapshot_ object, and the original comparator 
> is encapsulated within the object (here we call it 
> _StreamExecSortComparator$579_).
>  
> At restoration, the objec