[ https://issues.apache.org/jira/browse/BEAM-7144?focusedWorklogId=260691&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-260691 ]
ASF GitHub Bot logged work on BEAM-7144:
----------------------------------------

    Author: ASF GitHub Bot
    Created on: 14/Jun/19 20:22
    Start Date: 14/Jun/19 20:22
    Worklog Time Spent: 10m
    Work Description: mxm commented on pull request #8850: [BEAM-7144] Fix for rescaling problem on Flink >= 1.6
URL: https://github.com/apache/beam/pull/8850#discussion_r293962544

 ##########
 File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
 ##########
 @@ -1233,4 +1234,121 @@ private void restoreWatermarkHoldsView() throws Exception {
        }
      }
    }
 +
 +  /** Eagerly create user state to work around https://jira.apache.org/jira/browse/FLINK-12653. */
 +  public static class EarlyBinder implements StateBinder {

 Review comment:
   Feel free to ask anything :)

   `getPartitionedState` is a wrapper around `getOrCreateKeyedState`. The latter creates a keyed state (as the name suggests). To create a keyed state, you pass in a namespace serializer and a state descriptor. The state descriptor contains a state serializer and a state name. At that point the state exists and is ready to be used, but it cannot be accessed without an active namespace. You can now cast the `State` you received to `InternalKvState`, which allows you to set the namespace.

   The alternative is to use `getPartitionedState`, which lets you specify an actual namespace.

   Perhaps this is even clearer when looking at the source code:

   ```java
   public <N, S extends State> S getPartitionedState(
       final N namespace,
       final TypeSerializer<N> namespaceSerializer,
       final StateDescriptor<S, ?> stateDescriptor) throws Exception {

     checkNotNull(namespace, "Namespace");

     /* ... */

     final S state = getOrCreateKeyedState(namespaceSerializer, stateDescriptor);
     final InternalKvState<K, N, ?> kvState = (InternalKvState<K, N, ?>) state;

     /* ... */

     kvState.setCurrentNamespace(namespace);

     return state;
   }
   ```

   I've omitted some caching code for simplicity.
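
   To make the two-step pattern described above concrete, here is a small self-contained toy model. It is only a sketch: `ToyBackend` and `ToyKvState` are hypothetical stand-ins that imitate the shape of the calls discussed in the comment, not Flink's classes, and serializers and state descriptors are reduced to a plain state name.

   ```java
   import java.util.HashMap;
   import java.util.Map;

   /** Toy model only: these classes imitate the shape of keyed state access; they are not Flink APIs. */
   public class NamespaceSketch {

       /** Stand-in for a keyed state that needs an active namespace before it can be read or written. */
       static final class ToyKvState<N, V> {
           private final Map<N, V> valuesByNamespace = new HashMap<>();
           private N currentNamespace; // stays null until setCurrentNamespace is called

           void setCurrentNamespace(N namespace) {
               this.currentNamespace = namespace;
           }

           void update(V value) {
               valuesByNamespace.put(requireNamespace(), value);
           }

           V value() {
               return valuesByNamespace.get(requireNamespace());
           }

           private N requireNamespace() {
               if (currentNamespace == null) {
                   throw new IllegalStateException("No namespace set");
               }
               return currentNamespace;
           }
       }

       /** Stand-in for the backend: it keeps one state object per state name. */
       static final class ToyBackend {
           private final Map<String, ToyKvState<?, ?>> statesByName = new HashMap<>();

           /** Creates the state on first use; it does not touch the active namespace. */
           @SuppressWarnings("unchecked")
           <N, V> ToyKvState<N, V> getOrCreateKeyedState(String stateName) {
               return (ToyKvState<N, V>)
                   statesByName.computeIfAbsent(stateName, name -> new ToyKvState<N, V>());
           }

           /** Wrapper: create (or look up) the state, then activate the given namespace. */
           <N, V> ToyKvState<N, V> getPartitionedState(N namespace, String stateName) {
               if (namespace == null) {
                   throw new NullPointerException("Namespace");
               }
               ToyKvState<N, V> state = getOrCreateKeyedState(stateName);
               state.setCurrentNamespace(namespace);
               return state;
           }
       }

       public static void main(String[] args) {
           ToyBackend backend = new ToyBackend();

           // Eager creation: the state is registered, but it is not yet usable.
           ToyKvState<String, Integer> eager = backend.getOrCreateKeyedState("count");
           try {
               eager.value();
           } catch (IllegalStateException e) {
               System.out.println("Access without namespace failed: " + e.getMessage());
           }

           // Later access through the wrapper sets the namespace first.
           ToyKvState<String, Integer> state = backend.getPartitionedState("window-1", "count");
           state.update(3);
           System.out.println("Value in window-1: " + state.value());
       }
   }
   ```

   In this toy version, the eager call registers the state up front, and the namespace is only supplied when the state is actually used.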
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Issue Time Tracking
-------------------

    Worklog Id:     (was: 260691)
    Time Spent: 1h 50m  (was: 1h 40m)

> Job re-scale fails on Flink 1.7
> -------------------------------
>
>                 Key: BEAM-7144
>                 URL: https://issues.apache.org/jira/browse/BEAM-7144
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>    Affects Versions: 2.11.0
>            Reporter: Jozef Vilcek
>            Assignee: Maximilian Michels
>            Priority: Major
>          Time Spent: 1h 50m
>  Remaining Estimate: 0h
>
> I am unable to rescale the job after moving it to Flink runner 1.7. What I am
> doing is:
> # Recompile the job code with only the Flink runner version swapped, 1.5 -> 1.7
> # Run the streaming job with parallelism 112 and maxParallelism 448
> # Wait until a checkpoint is taken
> # Stop the job
> # Run the job again with parallelism 224 and the checkpoint path to restore from
> # The job fails
> The same happens if I try to increase parallelism. This procedure works for
> the same job compiled with Flink runner 1.5 and run on 1.5.0. It fails with
> runner 1.7 on Flink 1.7.2.
> The exception is:
> {noformat}
> java.lang.Exception: Exception while creating StreamOperatorStateContext.
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:250)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for WindowDoFnOperator_2b6af61dc418f10e82551367a7e7f78e_(83/224) from any of the 1 provided restore options.
>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:284)
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135)
>     ... 5 more
> Caused by: java.lang.IndexOutOfBoundsException: Index: 101, Size: 0
>     at java.util.ArrayList.rangeCheck(ArrayList.java:653)
>     at java.util.ArrayList.get(ArrayList.java:429)
>     at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>     at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:315)
>     at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders.lambda$createV2PlusReader$0(StateTableByKeyGroupReaders.java:73)
>     at org.apache.flink.runtime.state.KeyGroupPartitioner$PartitioningResultKeyGroupReader.readMappingsInKeyGroup(KeyGroupPartitioner.java:297)
>     at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readKeyGroupStateData(HeapKeyedStateBackend.java:492)
>     at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readStateHandleStateData(HeapKeyedStateBackend.java:453)
>     at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:410)
>     at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:358)
>     at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:104)
>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151)
>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123)
>     ... 7 more{noformat}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
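
For context on the reproduction steps in the quoted report (maxParallelism 448, parallelism 112 before the restart and 224 after), the following sketch works through how key groups might be redistributed across subtasks on restore. The arithmetic reflects my understanding of how Flink assigns contiguous key-group ranges to subtasks and should be treated as an assumption rather than a copy of Flink's code; the class and method names are hypothetical.

```java
/** Sketch of key-group range arithmetic; the names here are hypothetical, not Flink APIs. */
public class KeyGroupSketch {

    /** First key group owned by the subtask with the given zero-based index. */
    static int rangeStart(int maxParallelism, int parallelism, int subtaskIndex) {
        return (subtaskIndex * maxParallelism + parallelism - 1) / parallelism;
    }

    /** Last key group (inclusive) owned by the subtask with the given zero-based index. */
    static int rangeEnd(int maxParallelism, int parallelism, int subtaskIndex) {
        return ((subtaskIndex + 1) * maxParallelism - 1) / parallelism;
    }

    /** Zero-based index of the subtask that owns a key group at the given parallelism. */
    static int ownerIndex(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 448;

        // The failing subtask in the trace is labelled (83/224), i.e. zero-based index 82.
        int newIndex = 82;
        int start = rangeStart(maxParallelism, 224, newIndex);
        int end = rangeEnd(maxParallelism, 224, newIndex);
        System.out.println("New subtask " + newIndex + " owns key groups " + start + ".." + end);

        // Which subtask wrote those key groups when the job ran with parallelism 112?
        int oldIndex = ownerIndex(maxParallelism, 112, start);
        System.out.println("Old subtask " + oldIndex + " owned key groups "
            + rangeStart(maxParallelism, 112, oldIndex) + ".."
            + rangeEnd(maxParallelism, 112, oldIndex));
    }
}
```

Under these assumptions, each subtask owns four key groups at parallelism 112 and two at parallelism 224, so after the rescale every new subtask restores only part of what one old subtask wrote to the checkpoint.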