[ https://issues.apache.org/jira/browse/FLINK-11774?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Nico Kruber updated FLINK-11774:
--------------------------------
    Comment:     was deleted

(was: There are actually two bugs here:
 * enums not being viable sources for keys - [FLINK-16555|https://issues.apache.org/jira/browse/FLINK-16555] proposes to add warnings for this and other cases
 * something in the heap state back-end that causes wrong keygroup mappings with the exceptions I posted

I would propose to create a separate (improvement) ticket for allowing enums (with the simple workaround of using their ordinal in the key extractor) and stick to the second issue here?

As for that, I actually have it reproducible in a test that I quickly hacked together in [https://github.com/NicoK/flink/blob/state.corruption.debug/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/windowing/TopSpeedWindowingSavepointRestoreITCase.java] (please checkout the whole repository since I had to change some dependencies).

You can actually find all sorts of different exceptions associated with it, the two I mentioned but also this one:
{code:java}
10:23:12,046 WARN  org.apache.flink.streaming.api.operators.BackendRestorerProcedure - Exception while restoring keyed state backend for EvictingWindowOperator_90bea66de1c231edf33913ecd54406c1_(3/4) from alternative (1/1), will retry while more alternatives are available.
org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore heap backend
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:116)
    at org.apache.flink.runtime.state.filesystem.FsStateBackend.createKeyedStateBackend(FsStateBackend.java:529)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Corrupt stream, found tag: 8
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:217)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
    at org.apache.flink.api.common.typeutils.base.ListSerializer.deserialize(ListSerializer.java:133)
    at org.apache.flink.api.common.typeutils.base.ListSerializer.deserialize(ListSerializer.java:42)
    at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders.lambda$createV2PlusReader$0(StateTableByKeyGroupReaders.java:77)
    at org.apache.flink.runtime.state.KeyGroupPartitioner$PartitioningResultKeyGroupReader.readMappingsInKeyGroup(KeyGroupPartitioner.java:297)
    at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readKeyGroupStateData(HeapRestoreOperation.java:295)
    at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readStateHandleStateData(HeapRestoreOperation.java:256)
    at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:155)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:114)
    ... 15 more
{code})

> IllegalArgumentException in HeapPriorityQueueSet
> ------------------------------------------------
>
>                 Key: FLINK-11774
>                 URL: https://issues.apache.org/jira/browse/FLINK-11774
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / State Backends
>    Affects Versions: 1.7.2, 1.9.2, 1.10.0
>         Environment: Can reproduce on the following configurations:
>
> OS: macOS 10.14.3
> Java: 1.8.0_202
>
> OS: CentOS 7.2.1511
> Java: 1.8.0_102
>            Reporter: Kirill Vainer
>            Priority: Blocker
>             Fix For: 1.9.3, 1.10.1, 1.11.0
>
>         Attachments: flink-bug-dist.zip, flink-bug-src.zip
>
>
> Hi,
> I encountered the following exception:
> {code}
> Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>     at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>     at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:647)
>     at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1510)
>     at flink.bug.App.main(App.java:21)
> Caused by: java.lang.IllegalArgumentException
>     at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:123)
>     at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.globalKeyGroupToLocalIndex(HeapPriorityQueueSet.java:158)
>     at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.getDedupMapForKeyGroup(HeapPriorityQueueSet.java:147)
>     at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.getDedupMapForElement(HeapPriorityQueueSet.java:154)
>     at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.add(HeapPriorityQueueSet.java:121)
>     at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.add(HeapPriorityQueueSet.java:49)
>     at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.registerProcessingTimeTimer(InternalTimerServiceImpl.java:197)
>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.registerProcessingTimeTimer(WindowOperator.java:876)
>     at org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger.onElement(ProcessingTimeTrigger.java:36)
>     at org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger.onElement(ProcessingTimeTrigger.java:28)
>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.onElement(WindowOperator.java:895)
>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:396)
>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>     at java.lang.Thread.run(Thread.java:748)
> {code}
> Code that reproduces the problem:
> {code:java}
> package flink.bug;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.sink.SinkFunction;
> import org.apache.flink.streaming.api.windowing.time.Time;
> public class App {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         env.setParallelism(2);
>         env.fromElements(1, 2)
>             .map(Aggregate::new)
>             .keyBy(Aggregate::getKey)
>             .timeWindow(Time.seconds(2))
>             .reduce(Aggregate::reduce)
>             .addSink(new CollectSink());
>         env.execute();
>     }
>     private static class Aggregate {
>         private Key key = new Key();
>         public Aggregate(long number) {
>         }
>         public static Aggregate reduce(Aggregate a, Aggregate b) {
>             return new Aggregate(0);
>         }
>         public Key getKey() {
>             return key;
>         }
>     }
>     public static class Key {
>     }
>     private static class CollectSink implements SinkFunction<Aggregate> {
>         private static final long serialVersionUID = 1;
>         @SuppressWarnings("rawtypes")
>         @Override
>         public void invoke(Aggregate value, Context ctx) throws Exception {
>         }
>     }
> }
> {code}
> Attached is the project that can be executed with {{./gradlew run}} showing the problem, or you can run the attached {{flink-bug-dist.zip}} which is prepackaged with the dependencies.
> Thanks in advance



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
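
For readers of this thread, the key-stability point raised in the deleted comment (enum keys, and the workaround of using the enum's ordinal in the key extractor) can be sketched in plain Java without any Flink dependency. This is only an illustration under stated assumptions: `StableKeyDemo`, `IdentityKey`, `ValueKey`, `stableKey` and `keyGroupFor` are hypothetical names, and `keyGroupFor` is a deliberately simplified stand-in for a hash-based key-group assignment, not Flink's actual implementation. What the sketch relies on is standard Java behaviour: `Enum.hashCode()` and the default `Object.hashCode()` are identity-based and may differ between JVM runs, while an ordinal or a value-based `hashCode()` is stable.

```java
// Plain-Java sketch; all names here are hypothetical and not part of Flink.
class StableKeyDemo {

    enum Color { RED, GREEN }

    // Like the empty Key class in the reproducer: it inherits the identity-based
    // Object.hashCode()/equals(), so two instances are never equal to each other.
    static class IdentityKey { }

    // Value-based key: equals() and hashCode() depend only on the id field.
    static final class ValueKey {
        private final long id;

        ValueKey(long id) {
            this.id = id;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof ValueKey && ((ValueKey) o).id == id;
        }

        @Override
        public int hashCode() {
            return Long.hashCode(id);
        }
    }

    // Simplified assumption: map the key's hashCode() onto maxParallelism groups.
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // The workaround from the comment: key by the enum's ordinal, not the enum itself.
    static Integer stableKey(Color c) {
        return c.ordinal();
    }

    public static void main(String[] args) {
        // Identity-based hashes: the printed groups may change from run to run.
        System.out.println("enum key group:     " + keyGroupFor(Color.GREEN, 128));
        System.out.println("identity key group: " + keyGroupFor(new IdentityKey(), 128));

        // Stable hashes: the same group in every JVM.
        System.out.println("ordinal key group:  " + keyGroupFor(stableKey(Color.GREEN), 128));
        System.out.println("value key group:    " + keyGroupFor(new ValueKey(42L), 128));
    }
}
```

In a job like the reproducer, the analogous change would be either to give `Key` value-based `equals()`/`hashCode()` methods or to key by a stable primitive derived from it. Whether that alone resolves the heap state back-end issue discussed in the comment is not something this sketch shows.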