I posted to the beam mailing list:
https://lists.apache.org/thread.html/rb2ebfad16d85bcf668978b3defd442feda0903c20db29c323497a672%40%3Cuser.beam.apache.org%3E

I think this is related to a Beam feature called RequiresStableInput (which
my pipeline is using).  It will create a new operator (or keyed) state per
checkpoint.  I'm not sure that there are any parameters that I have control
over to tweak it's behavior (apart from increasing the checkpoint interval
to let the pipeline run longer before building up that many states).

Perhaps this is something that can be fixed (maybe by unregistering
Operator States after they aren't used any more in the RequiresStableInput
code).  It seems to me that this isn't a Flink issue, but rather a Beam
issue.

Thanks for pointing me in the right direction.

On Thu, Apr 16, 2020 at 11:29 AM Yun Tang <myas...@live.com> wrote:

> Hi Stephen
>
> I think the state name [1] which would be changed every time might the
> root cause. I am not familiar with Beam code, would it be possible to
> create so many operator states? Did you configure some parameters wrongly?
>
>
> [1]
> https://github.com/apache/beam/blob/4fc924a8193bb9495c6b7ba755ced576bb8a35d5/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java#L95
>
> Best
> Yun Tang
> ------------------------------
> *From:* Stephen Patel <merli...@gmail.com>
> *Sent:* Thursday, April 16, 2020 22:30
> *To:* Yun Tang <myas...@live.com>
> *Cc:* user@flink.apache.org <user@flink.apache.org>
> *Subject:* Re: Streaming Job eventually begins failing during
> checkpointing
>
> Correction.  I've actually found a place where it potentially might be
> creating a new operator state per checkpoint:
>
> https://github.com/apache/beam/blob/4fc924a8193bb9495c6b7ba755ced576bb8a35d5/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java#L91-L105
> https://github.com/apache/beam/blob/4fc924a8193bb9495c6b7ba755ced576bb8a35d5/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java#L141-L149
>
> This gives me something I can investigate locally at least.
>
> On Thu, Apr 16, 2020 at 9:03 AM Stephen Patel <merli...@gmail.com> wrote:
>
> I can't say that I ever call that directly.  The beam library that I'm
> using does call it in a couple places:
> https://github.com/apache/beam/blob/v2.14.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L422-L429
>
> But it seems to be the same descriptor every time.  Is that limit per
> operator?  That is, can each operator host up to 32767 operator/broadcast
> states?  I assume that's by name?
>
> On Wed, Apr 15, 2020 at 10:46 PM Yun Tang <myas...@live.com> wrote:
>
> Hi  Stephen
>
> This is not related with RocksDB but with default on-heap operator state
> backend. From your exception stack trace, you have created too many
> operator states (more than 32767).
> How do you call context.getOperatorStateStore().getListState or
> context.getOperatorStateStore().getBroadcastState ? Did you pass a
> different operator state descriptor each time?
>
> Best
> Yun Tang
> ------------------------------
> *From:* Stephen Patel <merli...@gmail.com>
> *Sent:* Thursday, April 16, 2020 2:09
> *To:* user@flink.apache.org <user@flink.apache.org>
> *Subject:* Streaming Job eventually begins failing during checkpointing
>
> I've got a flink (1.8.0, emr-5.26) streaming job running on yarn.  It's
> configured to use rocksdb, and checkpoint once a minute to hdfs.  This job
> operates just fine for around 20 days, and then begins failing with this
> exception (it fails, restarts, and fails again, repeatedly):
>
> 2020-04-15 13:15:02,920 INFO
>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering
> checkpoint 32701 @ 1586956502911 for job 9953424f21e240112dd23ab4f8320b60.
> 2020-04-15 13:15:05,762 INFO
>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed
> checkpoint 32701 for job 9953424f21e240112dd23ab4f8320b60 (795385496 bytes
> in 2667 ms).
> 2020-04-15 13:16:02,919 INFO
>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering
> checkpoint 32702 @ 1586956562911 for job 9953424f21e240112dd23ab4f8320b60.
> 2020-04-15 13:16:03,147 INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph        -
> <operator_name> (1/2) (f4737add01961f8b42b8eb4e791b83ba) switched from
> RUNNING to FAILED.
> AsynchronousException{java.lang.Exception: Could not materialize
> checkpoint 32702 for operator <operator_name> (1/2).}
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.Exception: Could not materialize checkpoint 32702 for
> operator <operator_name> (1/2).
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
> ... 6 more
> Caused by: java.util.concurrent.ExecutionException:
> java.lang.IllegalArgumentException
> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> at java.util.concurrent.FutureTask.get(FutureTask.java:192)
> at
> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:394)
> at
> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
> ... 5 more
> Caused by: java.lang.IllegalArgumentException
> at
> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:123)
> at
> org.apache.flink.runtime.state.OperatorBackendSerializationProxy.<init>(OperatorBackendSerializationProxy.java:68)
> at
> org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:138)
> at
> org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:108)
> at
> org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:391)
> ... 7 more
>
> This application configured to retain external checkpoints.  When I
> attempt to restart from the last successful checkpoint, it will fail with
> the same error on the first checkpoint that happens after the restart.
>
> I haven't been able to find out why this might be. The source code doesn't
> seem particularly informative to my eyes:
> https://github.com/apache/flink/blob/release-1.8.0/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java#L68
>
> Has anyone else seen anything like this?
>
>

Reply via email to