[ https://issues.apache.org/jira/browse/BEAM-7144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16837382#comment-16837382 ]
Jozef Vilcek commented on BEAM-7144:
------------------------------------

Hm, I was afraid of this. The pipeline I was testing looks like this:
KafkaRead -> Filter -> WriteFiles -> stateful map of written files (accumulating until the required size is reached) -> map of the file-name list per key.
The list of files is taken from WriteFilesResult.getPerDestinationOutputFilenames(). The pipeline is written in Scio.

> Job re-scale fails on Flink 1.7
> -------------------------------
>
>                 Key: BEAM-7144
>                 URL: https://issues.apache.org/jira/browse/BEAM-7144
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>    Affects Versions: 2.11.0
>            Reporter: Jozef Vilcek
>            Assignee: Maximilian Michels
>            Priority: Major
>             Fix For: 2.13.0
>
>
> I am unable to rescale the job after moving it to Flink runner 1.7. What I am
> doing is:
> # Recompile the job code with only the Flink runner version swapped (1.5 -> 1.7)
> # Run the streaming job with parallelism 112 and maxParallelism 448
> # Wait until a checkpoint is taken
> # Stop the job
> # Run the job again with parallelism 224 and the checkpoint path to restore from
> # The job fails
> The same happens if I try to increase parallelism. This procedure works for
> the same job compiled with Flink runner 1.5 and run on Flink 1.5.0. It fails with
> runner 1.7 on Flink 1.7.2.
> The exception is:
> {noformat}
> java.lang.Exception: Exception while creating StreamOperatorStateContext.
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:250)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for WindowDoFnOperator_2b6af61dc418f10e82551367a7e7f78e_(83/224) from any of the 1 provided restore options.
>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:284)
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135)
>     ... 5 more
> Caused by: java.lang.IndexOutOfBoundsException: Index: 101, Size: 0
>     at java.util.ArrayList.rangeCheck(ArrayList.java:653)
>     at java.util.ArrayList.get(ArrayList.java:429)
>     at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>     at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:315)
>     at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders.lambda$createV2PlusReader$0(StateTableByKeyGroupReaders.java:73)
>     at org.apache.flink.runtime.state.KeyGroupPartitioner$PartitioningResultKeyGroupReader.readMappingsInKeyGroup(KeyGroupPartitioner.java:297)
>     at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readKeyGroupStateData(HeapKeyedStateBackend.java:492)
>     at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readStateHandleStateData(HeapKeyedStateBackend.java:453)
>     at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:410)
>     at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:358)
>     at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:104)
>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151)
>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123)
>     ... 7 more
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
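The rescale in the quoted description (parallelism 112 to 224 with maxParallelism 448) relies on Flink splitting keyed state into maxParallelism key groups and giving each subtask a contiguous range of them; the failing restore in the trace happens inside that per-key-group read (KeyGroupPartitioner, readKeyGroupStateData). The sketch below is an illustrative re-implementation of the range arithmetic used by Flink's KeyGroupRangeAssignment, not Flink or Beam code. The class and method names (KeyGroupRescaleSketch, rangeStart, rangeEnd, ownerOf, oldOwners) are hypothetical, and it assumes the "(83/224)" suffix in the trace is Flink's 1-based subtask number. It does not reproduce the Kryo deserialization failure itself.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch of how key groups map to subtasks before and after a rescale.
 * The formulas mirror Flink's KeyGroupRangeAssignment; this is not Flink or Beam code.
 */
public class KeyGroupRescaleSketch {

    /** First key group (inclusive) owned by subtask `index` at the given parallelism. */
    static int rangeStart(int maxParallelism, int parallelism, int index) {
        return (index * maxParallelism + parallelism - 1) / parallelism;
    }

    /** Last key group (inclusive) owned by subtask `index` at the given parallelism. */
    static int rangeEnd(int maxParallelism, int parallelism, int index) {
        return ((index + 1) * maxParallelism - 1) / parallelism;
    }

    /** Subtask index that owns `keyGroup` at the given parallelism. */
    static int ownerOf(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }

    /** Old subtask indices whose snapshots a new subtask must read to restore its key groups. */
    static List<Integer> oldOwners(int maxParallelism, int oldParallelism,
                                   int newParallelism, int newIndex) {
        List<Integer> owners = new ArrayList<>();
        int start = rangeStart(maxParallelism, newParallelism, newIndex);
        int end = rangeEnd(maxParallelism, newParallelism, newIndex);
        for (int keyGroup = start; keyGroup <= end; keyGroup++) {
            int owner = ownerOf(maxParallelism, oldParallelism, keyGroup);
            if (!owners.contains(owner)) {
                owners.add(owner);
            }
        }
        return owners;
    }

    public static void main(String[] args) {
        int maxParallelism = 448;
        int oldParallelism = 112;
        int newParallelism = 224;
        // Assumption: "(83/224)" in the trace is 1-based, so the 0-based index is 82.
        int newIndex = 82;
        System.out.printf("new subtask %d owns key groups %d..%d%n",
                newIndex,
                rangeStart(maxParallelism, newParallelism, newIndex),
                rangeEnd(maxParallelism, newParallelism, newIndex));
        System.out.println("restored from old subtask(s) "
                + oldOwners(maxParallelism, oldParallelism, newParallelism, newIndex));
    }
}
```

Under these assumptions, running main prints that new subtask 82 owns key groups 164..165 and restores them from old subtask [41]. With 448 key groups, each of the 112 old subtasks held 4 key groups and each of the 224 new subtasks holds 2, so this particular rescale only splits existing ranges in half rather than merging state from different old subtasks.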