[ 
https://issues.apache.org/jira/browse/FLINK-11774?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nico Kruber updated FLINK-11774:
--------------------------------
    Comment: was deleted

(was: I actually occasionally also see this with the {{TopSpeedWindowing}} 
example in Flink. I managed to get hold of a savepoint that was taken with 
parallelism 2 (p=2) and shows a similar failure in two different ways (all 
running in Flink 1.10.0; but I also see it in Flink 1.9):
* first of all, if I try to restore with p=2, everything is fine
* if I restore with p=4 I get an exception like the one mentioned above:
{code}
2020-03-11 15:53:35,149 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph        - 
Window(GlobalWindows(), DeltaTrigger, TimeEvictor, ComparableAggregator, 
PassThroughWindowFunction) -> Sink: Print to Std. Out (3/4) 
(2ecdb03905cc8a376d43b086925452a6) switched from RUNNING to FAILED.
java.lang.Exception: Exception while creating StreamOperatorStateContext.
        at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)
        at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state 
backend for EvictingWindowOperator_90bea66de1c231edf33913ecd54406c1_(3/4) from 
any of the 1 provided restore options.
        at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
        at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
        at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
        ... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when 
trying to restore heap backend
        at 
org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:116)
        at 
org.apache.flink.runtime.state.filesystem.FsStateBackend.createKeyedStateBackend(FsStateBackend.java:529)
        at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
        at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
        at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
        ... 11 more
Caused by: java.lang.IllegalArgumentException: KeyGroupRange{startKeyGroup=64, 
endKeyGroup=95} does not contain key group 97
        at 
org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:161)
        at 
org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.globalKeyGroupToLocalIndex(HeapPriorityQueueSet.java:158)
        at 
org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.getDedupMapForKeyGroup(HeapPriorityQueueSet.java:147)
        at 
org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.getDedupMapForElement(HeapPriorityQueueSet.java:154)
        at 
org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.add(HeapPriorityQueueSet.java:121)
        at 
org.apache.flink.runtime.state.heap.HeapPriorityQueueSnapshotRestoreWrapper.lambda$keyGroupReader$0(HeapPriorityQueueSnapshotRestoreWrapper.java:85)
        at 
org.apache.flink.runtime.state.KeyGroupPartitioner$PartitioningResultKeyGroupReader.readMappingsInKeyGroup(KeyGroupPartitioner.java:298)
        at 
org.apache.flink.runtime.state.heap.HeapRestoreOperation.readKeyGroupStateData(HeapRestoreOperation.java:293)
        at 
org.apache.flink.runtime.state.heap.HeapRestoreOperation.readStateHandleStateData(HeapRestoreOperation.java:254)
        at 
org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:153)
        at 
org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:114)
        ... 15 more
{code}
* if I restore with p=3, I get the following exception instead:
{code}
2020-03-11 21:40:28,390 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph        - 
Window(GlobalWindows(), DeltaTrigger, TimeEvictor, ComparableAggregator, 
PassThroughWindowFunction) -> Sink: Print to Std. Out (3/3) 
(2fb8acde321f6cbfec56c300153d6dea) switched from RUNNING to FAILED.
java.lang.Exception: Exception while creating StreamOperatorStateContext.
        at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)
        at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state 
backend for EvictingWindowOperator_90bea66de1c231edf33913ecd54406c1_(3/3) from 
any of the 1 provided restore options.
        at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
        at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
        at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
        ... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when 
trying to restore heap backend
        at 
org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:116)
        at 
org.apache.flink.runtime.state.filesystem.FsStateBackend.createKeyedStateBackend(FsStateBackend.java:529)
        at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
        at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
        at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
        ... 11 more
Caused by: java.lang.NullPointerException
        at 
org.apache.flink.runtime.state.heap.HeapRestoreOperation.readKeyGroupStateData(HeapRestoreOperation.java:280)
        at 
org.apache.flink.runtime.state.heap.HeapRestoreOperation.readStateHandleStateData(HeapRestoreOperation.java:254)
        at 
org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:153)
        at 
org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:114)
        ... 15 more
{code}

While the latter error somewhat indicates that the savepoint may be corrupt (or 
that the reader/deserializer messed up), it remains a mystery to me why I can 
successfully restore with p=2.)

> IllegalArgumentException in HeapPriorityQueueSet
> ------------------------------------------------
>
>                 Key: FLINK-11774
>                 URL: https://issues.apache.org/jira/browse/FLINK-11774
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / State Backends
>    Affects Versions: 1.7.2, 1.9.2, 1.10.0
>         Environment: Can reproduce on the following configurations:
>  
> OS: macOS 10.14.3
> Java: 1.8.0_202
>  
> OS: CentOS 7.2.1511
> Java: 1.8.0_102
>            Reporter: Kirill Vainer
>            Priority: Blocker
>             Fix For: 1.9.3, 1.10.1, 1.11.0
>
>         Attachments: flink-bug-dist.zip, flink-bug-src.zip
>
>
> Hi,
> I encountered the following exception:
> {code}
> Exception in thread "main" 
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>         at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>         at 
> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:647)
>         at 
> org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
>         at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1510)
>         at flink.bug.App.main(App.java:21)
> Caused by: java.lang.IllegalArgumentException
>         at 
> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:123)
>         at 
> org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.globalKeyGroupToLocalIndex(HeapPriorityQueueSet.java:158)
>         at 
> org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.getDedupMapForKeyGroup(HeapPriorityQueueSet.java:147)
>         at 
> org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.getDedupMapForElement(HeapPriorityQueueSet.java:154)
>         at 
> org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.add(HeapPriorityQueueSet.java:121)
>         at 
> org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.add(HeapPriorityQueueSet.java:49)
>         at 
> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.registerProcessingTimeTimer(InternalTimerServiceImpl.java:197)
>         at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.registerProcessingTimeTimer(WindowOperator.java:876)
>         at 
> org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger.onElement(ProcessingTimeTrigger.java:36)
>         at 
> org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger.onElement(ProcessingTimeTrigger.java:28)
>         at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.onElement(WindowOperator.java:895)
>         at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:396)
>         at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>         at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>         at java.lang.Thread.run(Thread.java:748)
> {code}
> Code that reproduces the problem:
> {code:java}
> package flink.bug;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.sink.SinkFunction;
> import org.apache.flink.streaming.api.windowing.time.Time;
> public class App {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>         env.setParallelism(2);
>         env.fromElements(1, 2)
>             .map(Aggregate::new)
>             .keyBy(Aggregate::getKey)
>             .timeWindow(Time.seconds(2))
>             .reduce(Aggregate::reduce)
>             .addSink(new CollectSink());
>         env.execute();
>     }
>     private static class Aggregate {
>         private Key key = new Key();
>         public Aggregate(long number) {
>         }
>         public static Aggregate reduce(Aggregate a, Aggregate b) {
>             return new Aggregate(0);
>         }
>         public Key getKey() {
>             return key;
>         }
>     }
>     public static class Key {
>     }
>     private static class CollectSink implements SinkFunction<Aggregate> {
>         private static final long serialVersionUID = 1;
>         @SuppressWarnings("rawtypes")
>         @Override
>         public void invoke(Aggregate value, Context ctx) throws Exception {
>         }
>     }
> }
> {code}
> Attached is the project that can be executed with {{./gradlew run}} showing 
> the problem, or you can run the attached {{flink-bug-dist.zip}} which is 
> prepackaged with the dependencies.
> Thanks in advance



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to