Hi Stephen

I think the state name [1] which would be changed every time might the root 
cause. I am not familiar with Beam code, would it be possible to create so many 
operator states? Did you configure some parameters wrongly?


[1] 
https://github.com/apache/beam/blob/4fc924a8193bb9495c6b7ba755ced576bb8a35d5/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java#L95

Best
Yun Tang
________________________________
From: Stephen Patel <merli...@gmail.com>
Sent: Thursday, April 16, 2020 22:30
To: Yun Tang <myas...@live.com>
Cc: user@flink.apache.org <user@flink.apache.org>
Subject: Re: Streaming Job eventually begins failing during checkpointing

Correction.  I've actually found a place where it potentially might be creating 
a new operator state per checkpoint:
https://github.com/apache/beam/blob/4fc924a8193bb9495c6b7ba755ced576bb8a35d5/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java#L91-L105https://github.com/apache/beam/blob/4fc924a8193bb9495c6b7ba755ced576bb8a35d5/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java#L141-L149

This gives me something I can investigate locally at least.

On Thu, Apr 16, 2020 at 9:03 AM Stephen Patel 
<merli...@gmail.com<mailto:merli...@gmail.com>> wrote:
I can't say that I ever call that directly.  The beam library that I'm using 
does call it in a couple places: 
https://github.com/apache/beam/blob/v2.14.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L422-L429

But it seems to be the same descriptor every time.  Is that limit per operator? 
 That is, can each operator host up to 32767 operator/broadcast states?  I 
assume that's by name?

On Wed, Apr 15, 2020 at 10:46 PM Yun Tang 
<myas...@live.com<mailto:myas...@live.com>> wrote:
Hi  Stephen

This is not related with RocksDB but with default on-heap operator state 
backend. From your exception stack trace, you have created too many operator 
states (more than 32767).
How do you call context.getOperatorStateStore().getListState or 
context.getOperatorStateStore().getBroadcastState ? Did you pass a different 
operator state descriptor each time?

Best
Yun Tang
________________________________
From: Stephen Patel <merli...@gmail.com<mailto:merli...@gmail.com>>
Sent: Thursday, April 16, 2020 2:09
To: user@flink.apache.org<mailto:user@flink.apache.org> 
<user@flink.apache.org<mailto:user@flink.apache.org>>
Subject: Streaming Job eventually begins failing during checkpointing

I've got a flink (1.8.0, emr-5.26) streaming job running on yarn.  It's 
configured to use rocksdb, and checkpoint once a minute to hdfs.  This job 
operates just fine for around 20 days, and then begins failing with this 
exception (it fails, restarts, and fails again, repeatedly):

2020-04-15 13:15:02,920 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering 
checkpoint 32701 @ 1586956502911 for job 9953424f21e240112dd23ab4f8320b60.
2020-04-15 13:15:05,762 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed 
checkpoint 32701 for job 9953424f21e240112dd23ab4f8320b60 (795385496 bytes in 
2667 ms).
2020-04-15 13:16:02,919 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering 
checkpoint 32702 @ 1586956562911 for job 9953424f21e240112dd23ab4f8320b60.
2020-04-15 13:16:03,147 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph        - <operator_name> 
(1/2) (f4737add01961f8b42b8eb4e791b83ba) switched from RUNNING to FAILED.
AsynchronousException{java.lang.Exception: Could not materialize checkpoint 
32702 for operator <operator_name> (1/2).}
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not materialize checkpoint 32702 for 
operator <operator_name> (1/2).
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
... 6 more
Caused by: java.util.concurrent.ExecutionException: 
java.lang.IllegalArgumentException
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at 
org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:394)
at 
org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
... 5 more
Caused by: java.lang.IllegalArgumentException
at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:123)
at 
org.apache.flink.runtime.state.OperatorBackendSerializationProxy.<init>(OperatorBackendSerializationProxy.java:68)
at 
org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:138)
at 
org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:108)
at 
org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:391)
... 7 more

This application configured to retain external checkpoints.  When I attempt to 
restart from the last successful checkpoint, it will fail with the same error 
on the first checkpoint that happens after the restart.

I haven't been able to find out why this might be. The source code doesn't seem 
particularly informative to my eyes: 
https://github.com/apache/flink/blob/release-1.8.0/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java#L68

Has anyone else seen anything like this?

Reply via email to