[ https://issues.apache.org/jira/browse/FLINK-18452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17148289#comment-17148289 ]
Weike Dong commented on FLINK-18452:
------------------------------------

Hi [~jinyu.zj], would you please be so kind as to look at this issue, since you are the original author of the code? Thank you very much.

> Flaws in RetractableTopNFunction.ComparatorWrapper#equals method prevent state access after restoration
> -------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-18452
>                 URL: https://issues.apache.org/jira/browse/FLINK-18452
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.10.0, 1.10.1, 1.11.0
>            Reporter: Weike Dong
>            Priority: Major
>         Attachments: c2ebeac8aadebad0dffa5cc255d45190594c5b2a84bda020dd30bf24b9169702.png
>
>
> We found that in SQL jobs using the "Top-N" functionality provided by the Blink planner, the job state cannot be retrieved because the state serializers are reported as "incompatible" (in fact they are compatible).
> The error log is shown below:
> {panel:title=taskmanager.log}
> 2020-06-30 09:19:32.089 [Rank(strategy=[RetractStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=100], partitionBy=[appkey, serverid], orderBy=[quantity DESC], select=[appkey, serverid, quantity]) (1/1)] INFO org.apache.flink.runtime.taskmanager.Task - Rank(strategy=[RetractStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=100], partitionBy=[appkey, serverid], orderBy=[quantity DESC], select=[appkey, serverid, oid, quantity]) (1/1) (bd4d2e4327efac57dc70e220b8de460b) switched from RUNNING to FAILED.
> java.lang.RuntimeException: Error while getting state
> 	at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
> 	at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:144)
> 	at org.apache.flink.table.runtime.operators.rank.RetractableTopNFunction.open(RetractableTopNFunction.java:115)
> 	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> 	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
> 	at org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:57)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:990)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:453)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:448)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460)
> 	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
> 	at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.StateMigrationException: The new state serializer cannot be incompatible.
> 	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:543)
> 	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:491)
> 	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:652)
> 	at org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
> 	at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:72)
> 	at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:279)
> 	at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:328)
> 	at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
> 	at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
> 	... 13 more
> {panel}
>
> After careful debugging, we found that the problem lies in the compatibility check of the type serializers.
>
> In short, during checkpointing Flink serializes the _SortedMapSerializer_ by creating a _SortedMapSerializerSnapshot_ object, and the original comparator is encapsulated within that object (here we call it _StreamExecSortComparator$579_).
>
> At restoration, the object is read and restored as normal. However, when the RetractableTopNFunction instance is constructed, Flink passes in another comparator as an argument (we call it _StreamExecSortComparator$626_), and that comparator is later used in the _ValueStateDescriptor_, which acts as the key for accessing the state store.
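
To make the naming mismatch concrete, here is a small hypothetical Java model; it is not Flink code, and `GeneratedComparatorDemo`, `Generated`, `newName`, `generateSortComparator`, `strictEquals` and `normalizedEquals` are illustrative names only. It assumes, consistent with the _$579_ / _$626_ suffixes in the report, that generated class and variable names take their numeric suffix from a process-wide counter. The same "quantity DESC" comparator is generated once for the job that wrote the snapshot and once for the restored job, and the two results are compared with a check shaped like the one in _ComparatorWrapper#equals_.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical model of FLINK-18452; none of these classes are Flink APIs.
class GeneratedComparatorDemo {

    // Assumption: generated identifiers get a numeric suffix from a
    // process-wide counter (matching the $579 / $626 suffixes in the report).
    static final AtomicLong NAME_COUNTER = new AtomicLong(579);

    static String newName(String prefix) {
        return prefix + "$" + NAME_COUNTER.getAndIncrement();
    }

    // Stand-in for a generated comparator: class name, source code, references.
    static final class Generated {
        final String className;
        final String code;
        final Object[] references;

        Generated(String className, String code, Object[] references) {
            this.className = className;
            this.code = code;
            this.references = references;
        }
    }

    // Hypothetical generator: emits the same "quantity DESC" comparator every
    // time, but with freshly numbered class and variable names.
    static Generated generateSortComparator() {
        String className = newName("StreamExecSortComparator");
        String isNullA = newName("isNullA");
        String code = "public class " + className + " { boolean " + isNullA
                + "; /* compare quantity DESC */ }";
        return new Generated(className, code, new Object[0]);
    }

    // Same shape as the check quoted from ComparatorWrapper#equals.
    static boolean strictEquals(Generated a, Generated b) {
        return a.className.equals(b.className)
                && a.code.equals(b.code)
                && Arrays.equals(a.references, b.references);
    }

    // Illustration only: rewrite every "$<digits>" suffix to "$N".
    static String normalize(String s) {
        return s.replaceAll("\\$\\d+", "\\$N");
    }

    static boolean normalizedEquals(Generated a, Generated b) {
        return normalize(a.className).equals(normalize(b.className))
                && normalize(a.code).equals(normalize(b.code))
                && Arrays.equals(a.references, b.references);
    }

    public static void main(String[] args) {
        Generated inSnapshot = generateSortComparator(); // job that wrote the checkpoint
        NAME_COUNTER.set(626);                           // simulate a different counter value in the restored job
        Generated inNewJob = generateSortComparator();   // comparator regenerated for the restored job
        System.out.println(inSnapshot.className);                   // StreamExecSortComparator$579
        System.out.println(inNewJob.className);                     // StreamExecSortComparator$626
        System.out.println(strictEquals(inSnapshot, inNewJob));     // false
        System.out.println(normalizedEquals(inSnapshot, inNewJob)); // true
    }
}
```

The strict check returns false even though the two generated sources differ only in their numeric suffixes; `normalizedEquals` merely shows that they are otherwise identical. It is an illustration, not a proposed fix: stripping suffixes would also hide a real difference that is expressed only through such names (for example, two variables swapped).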
>
> Here comes the problem: when the restored Flink program tries to access state (_getState_) through the _ValueStateDescriptor_ mentioned above, the state backend first checks whether the comparator in the state descriptor is compatible with the one in the snapshot, and this check eventually reaches the _equals_ method of the _RetractableTopNFunction.ComparatorWrapper_ class.
>
> The equals method contains this snippet:
> {code:java}
> return generatedRecordComparator.getClassName().equals(oGeneratedComparator.getClassName()) &&
>     generatedRecordComparator.getCode().equals(oGeneratedComparator.getCode()) &&
>     Arrays.equals(generatedRecordComparator.getReferences(), oGeneratedComparator.getReferences());
> {code}
> After debugging, we found that the class name of the comparator in the snapshot is _StreamExecSortComparator$579_, while the class name of the comparator provided by the new job is _StreamExecSortComparator$626_, so this method always returns false even though the two comparators are compatible (they behave the same). Moreover, because the code for each comparator is generated independently, the corresponding variables within the two comparators are very likely to have different names as well (_isNullA$581_ vs _isNullA$682_), so the _getCode()_ comparison fails too.
>
> Hence we believe that the implementation of the equals method is seriously flawed and should be fixed in a later release.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)