Sorry, just noticed this thread... @Stephan I cannot remember the discussion, but I think it's an interesting topic (unregistering states); I will find some time to consider it.
@Eleanore Glad to know that the Beam community has fixed it, and thanks for the reference.

Best Regards,
Yu

On Sun, 26 Apr 2020 at 03:10, Eleanore Jin <eleanore....@gmail.com> wrote:

> Hi All,
>
> I think the Beam community fixed this issue:
> https://github.com/apache/beam/pull/11478
>
> Thanks!
> Eleanore
>
> On Thu, Apr 23, 2020 at 4:24 AM Stephen Patel <merli...@gmail.com> wrote:
>
>> If something requires Beam to register a new state each time, then this
>> is tricky, because currently you cannot unregister states from Flink.
>>
>> @Yu @Yun I remember chatting at some point about allowing states to be
>> explicitly unregistered so that they get dropped from successive
>> checkpoints, but I could not find a JIRA ticket for it. Do you remember
>> the status of that discussion?
>>
>> On Thu, Apr 16, 2020 at 6:37 PM Stephen Patel <merli...@gmail.com> wrote:
>>
>>> I posted to the Beam mailing list:
>>> https://lists.apache.org/thread.html/rb2ebfad16d85bcf668978b3defd442feda0903c20db29c323497a672%40%3Cuser.beam.apache.org%3E
>>>
>>> I think this is related to a Beam feature called RequiresStableInput
>>> (which my pipeline is using). It creates a new operator (or keyed) state
>>> per checkpoint. I'm not sure there are any parameters I can use to tweak
>>> its behavior (apart from increasing the checkpoint interval to let the
>>> pipeline run longer before building up that many states).
>>>
>>> Perhaps this is something that can be fixed (maybe by unregistering
>>> operator states in the RequiresStableInput code once they are no longer
>>> used). It seems to me that this isn't a Flink issue, but rather a Beam
>>> issue.
>>>
>>> Thanks for pointing me in the right direction.
>>>
>>> On Thu, Apr 16, 2020 at 11:29 AM Yun Tang <myas...@live.com> wrote:
>>>
>>>> Hi Stephen
>>>>
>>>> I think the state name [1], which changes every time, might be the
>>>> root cause. I am not familiar with the Beam code; is it really possible
>>>> for it to create so many operator states? Did you configure some
>>>> parameters incorrectly?
>>>>
>>>> [1]
>>>> https://github.com/apache/beam/blob/4fc924a8193bb9495c6b7ba755ced576bb8a35d5/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java#L95
>>>>
>>>> Best
>>>> Yun Tang
>>>> ------------------------------
>>>> *From:* Stephen Patel <merli...@gmail.com>
>>>> *Sent:* Thursday, April 16, 2020 22:30
>>>> *To:* Yun Tang <myas...@live.com>
>>>> *Cc:* user@flink.apache.org <user@flink.apache.org>
>>>> *Subject:* Re: Streaming Job eventually begins failing during
>>>> checkpointing
>>>>
>>>> Correction: I've actually found a place where it might be creating a
>>>> new operator state per checkpoint:
>>>>
>>>> https://github.com/apache/beam/blob/4fc924a8193bb9495c6b7ba755ced576bb8a35d5/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java#L91-L105
>>>> https://github.com/apache/beam/blob/4fc924a8193bb9495c6b7ba755ced576bb8a35d5/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java#L141-L149
>>>>
>>>> This gives me something I can investigate locally at least.
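>>>>
>>>> To make the pattern concrete, here is a minimal sketch of what a
>>>> state-per-checkpoint registration boils down to (hypothetical class and
>>>> state names, not the actual Beam code; it only assumes Flink's public
>>>> CheckpointedFunction and operator state APIs):
>>>>
>>>> import org.apache.flink.api.common.state.ListState;
>>>> import org.apache.flink.api.common.state.ListStateDescriptor;
>>>> import org.apache.flink.api.common.state.OperatorStateStore;
>>>> import org.apache.flink.runtime.state.FunctionInitializationContext;
>>>> import org.apache.flink.runtime.state.FunctionSnapshotContext;
>>>> import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
>>>>
>>>> // Hypothetical reproduction: every snapshot registers a list state
>>>> // under a brand-new name, and operator states are never unregistered,
>>>> // so the backend accumulates one more registered state per checkpoint.
>>>> public class PerCheckpointBuffer implements CheckpointedFunction {
>>>>
>>>>     private transient OperatorStateStore stateStore;
>>>>
>>>>     @Override
>>>>     public void initializeState(FunctionInitializationContext context) {
>>>>         this.stateStore = context.getOperatorStateStore();
>>>>     }
>>>>
>>>>     @Override
>>>>     public void snapshotState(FunctionSnapshotContext context) throws Exception {
>>>>         // A fresh descriptor name per checkpoint means a new registered
>>>>         // state per checkpoint; at one checkpoint per minute, 32767
>>>>         // states accumulate in roughly three weeks.
>>>>         ListStateDescriptor<byte[]> descriptor = new ListStateDescriptor<>(
>>>>             "buffer-" + context.getCheckpointId(), byte[].class);
>>>>         ListState<byte[]> buffer = stateStore.getListState(descriptor);
>>>>         buffer.add(new byte[0]); // stand-in for the buffered input
>>>>     }
>>>> }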
>>>>
>>>> On Thu, Apr 16, 2020 at 9:03 AM Stephen Patel <merli...@gmail.com> wrote:
>>>>
>>>> I can't say that I ever call that directly. The Beam library that I'm
>>>> using does call it in a couple of places:
>>>> https://github.com/apache/beam/blob/v2.14.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L422-L429
>>>>
>>>> But it seems to be the same descriptor every time. Is that limit per
>>>> operator? That is, can each operator host up to 32767 operator/broadcast
>>>> states? I assume they are counted by name?
>>>>
>>>> On Wed, Apr 15, 2020 at 10:46 PM Yun Tang <myas...@live.com> wrote:
>>>>
>>>> Hi Stephen
>>>>
>>>> This is not related to RocksDB but to the default on-heap operator
>>>> state backend. From your exception stack trace, you have created too
>>>> many operator states (more than 32767).
>>>> How do you call context.getOperatorStateStore().getListState or
>>>> context.getOperatorStateStore().getBroadcastState? Did you pass a
>>>> different operator state descriptor each time?
>>>>
>>>> Best
>>>> Yun Tang
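>>>>
>>>> To illustrate the distinction (a sketch with assumed method and state
>>>> names; only OperatorStateStore and ListStateDescriptor are Flink's own
>>>> APIs):
>>>>
>>>> import org.apache.flink.api.common.state.ListState;
>>>> import org.apache.flink.api.common.state.ListStateDescriptor;
>>>> import org.apache.flink.api.common.state.OperatorStateStore;
>>>>
>>>> public final class DescriptorUsage {
>>>>
>>>>     // Intended usage: a stable state name, so repeated calls resolve to
>>>>     // the same registered state and the count stays constant.
>>>>     static ListState<String> stable(OperatorStateStore store) throws Exception {
>>>>         return store.getListState(
>>>>             new ListStateDescriptor<>("buffer", String.class));
>>>>     }
>>>>
>>>>     // Problematic usage: a name derived from a changing value (such as
>>>>     // a checkpoint id) registers a brand-new state on every call.
>>>>     static ListState<String> perCall(OperatorStateStore store, long suffix) throws Exception {
>>>>         return store.getListState(
>>>>             new ListStateDescriptor<>("buffer-" + suffix, String.class));
>>>>     }
>>>> }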
>>>> ------------------------------
>>>> *From:* Stephen Patel <merli...@gmail.com>
>>>> *Sent:* Thursday, April 16, 2020 2:09
>>>> *To:* user@flink.apache.org <user@flink.apache.org>
>>>> *Subject:* Streaming Job eventually begins failing during checkpointing
>>>>
>>>> I've got a Flink (1.8.0, emr-5.26) streaming job running on YARN. It's
>>>> configured to use RocksDB and to checkpoint once a minute to HDFS. The
>>>> job runs just fine for around 20 days, and then begins failing with this
>>>> exception (it fails, restarts, and fails again, repeatedly):
>>>>
>>>> 2020-04-15 13:15:02,920 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 32701 @ 1586956502911 for job 9953424f21e240112dd23ab4f8320b60.
>>>> 2020-04-15 13:15:05,762 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 32701 for job 9953424f21e240112dd23ab4f8320b60 (795385496 bytes in 2667 ms).
>>>> 2020-04-15 13:16:02,919 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 32702 @ 1586956562911 for job 9953424f21e240112dd23ab4f8320b60.
>>>> 2020-04-15 13:16:03,147 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - <operator_name> (1/2) (f4737add01961f8b42b8eb4e791b83ba) switched from RUNNING to FAILED.
>>>> AsynchronousException{java.lang.Exception: Could not materialize checkpoint 32702 for operator <operator_name> (1/2).}
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
>>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>     at java.lang.Thread.run(Thread.java:748)
>>>> Caused by: java.lang.Exception: Could not materialize checkpoint 32702 for operator <operator_name> (1/2).
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
>>>>     ... 6 more
>>>> Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException
>>>>     at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>>>     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>>>     at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:394)
>>>>     at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
>>>>     ... 5 more
>>>> Caused by: java.lang.IllegalArgumentException
>>>>     at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:123)
>>>>     at org.apache.flink.runtime.state.OperatorBackendSerializationProxy.<init>(OperatorBackendSerializationProxy.java:68)
>>>>     at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:138)
>>>>     at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:108)
>>>>     at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>     at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:391)
>>>>     ... 7 more
>>>>
>>>> This application is configured to retain external checkpoints. When I
>>>> attempt to restart from the last successful checkpoint, it fails with
>>>> the same error on the first checkpoint taken after the restart.
>>>>
>>>> I haven't been able to find out why this might be. The source code
>>>> doesn't seem particularly informative to my eyes:
>>>> https://github.com/apache/flink/blob/release-1.8.0/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java#L68
>>>>
>>>> Has anyone else seen anything like this?
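>>>>
>>>> For reference, the failing line appears to be a bare precondition on
>>>> the number of registered states. The snippet below is a paraphrase of
>>>> the gist with assumed names, not a verbatim copy of the Flink 1.8.0
>>>> source:
>>>>
>>>> import java.util.List;
>>>> import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
>>>> import org.apache.flink.util.Preconditions;
>>>>
>>>> // Paraphrased gist of the constructor check: the snapshot metadata
>>>> // format writes state counts as shorts, so an operator can register at
>>>> // most Short.MAX_VALUE (32767) operator/broadcast states, and the
>>>> // message-less checkArgument yields exactly the bare
>>>> // IllegalArgumentException seen in the trace above.
>>>> final class StateCountCheck {
>>>>     static void check(List<StateMetaInfoSnapshot> stateMetaInfoSnapshots) {
>>>>         Preconditions.checkArgument(stateMetaInfoSnapshots.size() <= Short.MAX_VALUE);
>>>>     }
>>>> }
>>>>
>>>> It is perhaps worth noting that the failing checkpoint ID (32702) is
>>>> close to Short.MAX_VALUE (32767), which would be consistent with roughly
>>>> one new operator state being registered per one-minute checkpoint.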