[jira] [Updated] (FLINK-18452) Flaws in RetractableTopNFunction.ComparatorWrapper#equals method prevent state access after restoration
[ https://issues.apache.org/jira/browse/FLINK-18452?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-18452:
---
Labels: pull-request-available (was: )

> Flaws in RetractableTopNFunction.ComparatorWrapper#equals method prevent state access after restoration
> ---
>
> Key: FLINK-18452
> URL: https://issues.apache.org/jira/browse/FLINK-18452
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Affects Versions: 1.10.0, 1.10.1, 1.11.0
> Reporter: Weike Dong
> Assignee: Jark Wu
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.12.0
>
> Attachments: c2ebeac8aadebad0dffa5cc255d45190594c5b2a84bda020dd30bf24b9169702.png
>
>
> We found that in SQL jobs using the "Top-N" functionality provided by the Blink planner, the job state cannot be retrieved because of "incompatible" state serializers (in fact they are compatible).
> The error log is shown below:
> {panel:title=taskmanager.log}
> 2020-06-30 09:19:32.089 [Rank(strategy=[RetractStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=100], partitionBy=[appkey, serverid], orderBy=[quantity DESC], select=[appkey, serverid, quantity]) (1/1)] INFO org.apache.flink.runtime.taskmanager.Task - Rank(strategy=[RetractStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=100], partitionBy=[appkey, serverid], orderBy=[quantity DESC], select=[appkey, serverid, oid, quantity]) (1/1) (bd4d2e4327efac57dc70e220b8de460b) switched from RUNNING to FAILED.
> java.lang.RuntimeException: Error while getting state
>     at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
>     at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:144)
>     at org.apache.flink.table.runtime.operators.rank.RetractableTopNFunction.open(RetractableTopNFunction.java:115)
>     at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>     at org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:57)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:990)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:453)
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:448)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.StateMigrationException: The new state serializer cannot be incompatible.
>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:543)
>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:491)
>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:652)
>     at org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
>     at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:72)
>     at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:279)
>     at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:328)
>     at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
>     at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
>     ... 13 more
> {panel}
>
> After careful debugging, we found it to be an issue with the compatibility check of type serializers.
>
> In short, during checkpointing, Flink serializes _SortedMapSerializer_ by creating a _SortedMapSerializerSnapshot_ object, and the original comparator is encapsulated within that object (here we call it _StreamExecSortComparator$579_).
>
> At restoration, the objec
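The following simplified Java sketch illustrates the failure mode described above. It is hypothetical and is not Flink code. Every name in it is an assumption made for illustration: `SortSpec`, `NameEqualityWrapper`, `SpecEqualityWrapper`, `isCompatible`, the `long[]` row model, and the restored class name `StreamExecSortComparator$612`.

The sketch makes two assumptions:

- The comparator kept in the snapshot is compared via `equals` with a wrapper around a freshly generated comparator.
- The generated class name carries a counter suffix, as in _StreamExecSortComparator$579_, and that suffix can differ between the original run and the restored run.

Under these assumptions, an `equals` that depends on the generated class name reports two comparators as different even though they order rows identically. An `equals` based on a stable sort specification does not have this problem. Flink's real check goes through serializer snapshots, not a single `equals` call like `isCompatible` here.

```java
import java.util.Comparator;
import java.util.Objects;

// Hypothetical illustration only; none of these classes exist in Flink.
public class ComparatorEqualitySketch {

    /** Stable description of an ORDER BY: which field, and in which direction. */
    static final class SortSpec {
        final int fieldIndex;
        final boolean ascending;

        SortSpec(int fieldIndex, boolean ascending) {
            this.fieldIndex = fieldIndex;
            this.ascending = ascending;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof SortSpec)) {
                return false;
            }
            SortSpec other = (SortSpec) o;
            return fieldIndex == other.fieldIndex && ascending == other.ascending;
        }

        @Override
        public int hashCode() {
            return Objects.hash(fieldIndex, ascending);
        }
    }

    /** Orders rows (modelled as long[]) by one field, as a generated comparator might. */
    abstract static class WrapperBase implements Comparator<long[]> {
        final String generatedClassName; // per-compilation name, e.g. "StreamExecSortComparator$579"
        final SortSpec spec;

        WrapperBase(String generatedClassName, SortSpec spec) {
            this.generatedClassName = generatedClassName;
            this.spec = spec;
        }

        @Override
        public int compare(long[] a, long[] b) {
            int c = Long.compare(a[spec.fieldIndex], b[spec.fieldIndex]);
            return spec.ascending ? c : -c;
        }
    }

    /** Flawed variant: equality depends on the generated class name. */
    static final class NameEqualityWrapper extends WrapperBase {
        NameEqualityWrapper(String name, SortSpec spec) { super(name, spec); }

        @Override
        public boolean equals(Object o) {
            return o instanceof NameEqualityWrapper
                    && generatedClassName.equals(((NameEqualityWrapper) o).generatedClassName);
        }

        @Override
        public int hashCode() { return generatedClassName.hashCode(); }
    }

    /** Variant whose equality depends only on the stable sort specification. */
    static final class SpecEqualityWrapper extends WrapperBase {
        SpecEqualityWrapper(String name, SortSpec spec) { super(name, spec); }

        @Override
        public boolean equals(Object o) {
            return o instanceof SpecEqualityWrapper && spec.equals(((SpecEqualityWrapper) o).spec);
        }

        @Override
        public int hashCode() { return spec.hashCode(); }
    }

    /** Simplified compatibility rule: restored and new comparators must be equal. */
    static boolean isCompatible(Comparator<long[]> restored, Comparator<long[]> current) {
        return restored.equals(current);
    }

    public static void main(String[] args) {
        SortSpec quantityDesc = new SortSpec(2, false); // like ORDER BY quantity DESC
        String beforeRestore = "StreamExecSortComparator$579";
        String afterRestore = "StreamExecSortComparator$612"; // hypothetical new counter value

        System.out.println(isCompatible(new NameEqualityWrapper(beforeRestore, quantityDesc),
                new NameEqualityWrapper(afterRestore, quantityDesc)));
        System.out.println(isCompatible(new SpecEqualityWrapper(beforeRestore, quantityDesc),
                new SpecEqualityWrapper(afterRestore, quantityDesc)));
    }
}
```

Running `main` prints `false` for the class-name-based wrapper and `true` for the specification-based one. Keying equality on a stable description of the ordering, rather than on per-compilation identity, is one way to keep such a check from rejecting a logically identical comparator.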
[ https://issues.apache.org/jira/browse/FLINK-18452?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jark Wu updated FLINK-18452:
Fix Version/s: (was: 1.13.0) 1.12.0

> Flaws in RetractableTopNFunction.ComparatorWrapper#equals method prevent state access after restoration
> ---
>
> Key: FLINK-18452
> URL: https://issues.apache.org/jira/browse/FLINK-18452
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Affects Versions: 1.10.0, 1.10.1, 1.11.0
> Reporter: Weike Dong
> Assignee: Weike Dong
> Priority: Major
> Fix For: 1.12.0
>
> Attachments: c2ebeac8aadebad0dffa5cc255d45190594c5b2a84bda020dd30bf24b9169702.png
[ https://issues.apache.org/jira/browse/FLINK-18452?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jark Wu updated FLINK-18452:
Fix Version/s: (was: 1.12.0) 1.13.0

> Flaws in RetractableTopNFunction.ComparatorWrapper#equals method prevent state access after restoration
> ---
>
> Key: FLINK-18452
> URL: https://issues.apache.org/jira/browse/FLINK-18452
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Affects Versions: 1.10.0, 1.10.1, 1.11.0
> Reporter: Weike Dong
> Assignee: Weike Dong
> Priority: Major
> Fix For: 1.13.0
>
> Attachments: c2ebeac8aadebad0dffa5cc255d45190594c5b2a84bda020dd30bf24b9169702.png
[ https://issues.apache.org/jira/browse/FLINK-18452?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Weike Dong updated FLINK-18452:
---
Fix Version/s: 1.12.0

> Flaws in RetractableTopNFunction.ComparatorWrapper#equals method prevent state access after restoration
> ---
>
> Key: FLINK-18452
> URL: https://issues.apache.org/jira/browse/FLINK-18452
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Affects Versions: 1.10.0, 1.10.1, 1.11.0
> Reporter: Weike Dong
> Assignee: Weike Dong
> Priority: Major
> Fix For: 1.12.0
>
> Attachments: c2ebeac8aadebad0dffa5cc255d45190594c5b2a84bda020dd30bf24b9169702.png