Hi All,

I think the Beam Community fixed this issue:
https://github.com/apache/beam/pull/11478

Thanks!
Eleanore

On Thu, Apr 23, 2020 at 4:24 AM Stephan Ewen <se...@apache.org> wrote:

> If something requires Beam to register a new state each time, then this is
> tricky, because currently you cannot unregister states from Flink.
>
> @Yu @Yun I remember chatting at some point about allowing states to be
> explicitly unregistered (so they get dropped from successive checkpoints),
> but I could not find a Jira ticket for this. Do you remember what the status
> of that discussion is?
>
> On Thu, Apr 16, 2020 at 6:37 PM Stephen Patel <merli...@gmail.com> wrote:
>
>> I posted to the beam mailing list:
>> https://lists.apache.org/thread.html/rb2ebfad16d85bcf668978b3defd442feda0903c20db29c323497a672%40%3Cuser.beam.apache.org%3E
>>
>> I think this is related to a Beam feature called RequiresStableInput
>> (which my pipeline is using).  It will create a new operator (or keyed)
>> state per checkpoint.  I'm not sure that there are any parameters that I
>> have control over to tweak its behavior (apart from increasing the
>> checkpoint interval to let the pipeline run longer before building up that
>> many states).
>>
>> Perhaps this is something that can be fixed (maybe by unregistering
>> Operator States after they aren't used any more in the RequiresStableInput
>> code).  It seems to me that this isn't a Flink issue, but rather a Beam
>> issue.
>>
>> Thanks for pointing me in the right direction.
>>
>> On Thu, Apr 16, 2020 at 11:29 AM Yun Tang <myas...@live.com> wrote:
>>
>>> Hi Stephen
>>>
>>> I think the state name [1], which changes every time, might be the
>>> root cause. I am not familiar with the Beam code; would it be possible for
>>> it to create so many operator states? Did you misconfigure some parameters?
>>>
>>>
>>> [1]
>>> https://github.com/apache/beam/blob/4fc924a8193bb9495c6b7ba755ced576bb8a35d5/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java#L95
>>>
>>> Best
>>> Yun Tang
>>> ------------------------------
>>> *From:* Stephen Patel <merli...@gmail.com>
>>> *Sent:* Thursday, April 16, 2020 22:30
>>> *To:* Yun Tang <myas...@live.com>
>>> *Cc:* user@flink.apache.org <user@flink.apache.org>
>>> *Subject:* Re: Streaming Job eventually begins failing during
>>> checkpointing
>>>
>>> Correction.  I've actually found a place where it potentially might be
>>> creating a new operator state per checkpoint:
>>>
>>> https://github.com/apache/beam/blob/4fc924a8193bb9495c6b7ba755ced576bb8a35d5/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java#L91-L105
>>> https://github.com/apache/beam/blob/4fc924a8193bb9495c6b7ba755ced576bb8a35d5/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java#L141-L149
>>>
>>> This gives me something I can investigate locally at least.
>>>
>>> On Thu, Apr 16, 2020 at 9:03 AM Stephen Patel <merli...@gmail.com>
>>> wrote:
>>>
>>> I can't say that I ever call that directly.  The beam library that I'm
>>> using does call it in a couple places:
>>> https://github.com/apache/beam/blob/v2.14.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L422-L429
>>>
>>> But it seems to be the same descriptor every time.  Is that limit per
>>> operator?  That is, can each operator host up to 32767 operator/broadcast
>>> states?  I assume that's by name?
>>>
>>> On Wed, Apr 15, 2020 at 10:46 PM Yun Tang <myas...@live.com> wrote:
>>>
>>> Hi  Stephen
>>>
>>> This is not related to RocksDB but to the default on-heap operator state
>>> backend. Judging from your exception stack trace, you have created too many
>>> operator states (more than 32767).
>>> How do you call context.getOperatorStateStore().getListState or
>>> context.getOperatorStateStore().getBroadcastState? Did you pass a
>>> different operator state descriptor each time?
>>>
>>> Best
>>> Yun Tang
>>> ------------------------------
>>> *From:* Stephen Patel <merli...@gmail.com>
>>> *Sent:* Thursday, April 16, 2020 2:09
>>> *To:* user@flink.apache.org <user@flink.apache.org>
>>> *Subject:* Streaming Job eventually begins failing during checkpointing
>>>
>>> I've got a flink (1.8.0, emr-5.26) streaming job running on yarn.  It's
>>> configured to use rocksdb, and checkpoint once a minute to hdfs.  This job
>>> operates just fine for around 20 days, and then begins failing with this
>>> exception (it fails, restarts, and fails again, repeatedly):
>>>
>>> 2020-04-15 13:15:02,920 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 32701 @ 1586956502911 for job 9953424f21e240112dd23ab4f8320b60.
>>> 2020-04-15 13:15:05,762 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 32701 for job 9953424f21e240112dd23ab4f8320b60 (795385496 bytes in 2667 ms).
>>> 2020-04-15 13:16:02,919 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 32702 @ 1586956562911 for job 9953424f21e240112dd23ab4f8320b60.
>>> 2020-04-15 13:16:03,147 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - <operator_name> (1/2) (f4737add01961f8b42b8eb4e791b83ba) switched from RUNNING to FAILED.
>>> AsynchronousException{java.lang.Exception: Could not materialize checkpoint 32702 for operator <operator_name> (1/2).}
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>     at java.lang.Thread.run(Thread.java:748)
>>> Caused by: java.lang.Exception: Could not materialize checkpoint 32702 for operator <operator_name> (1/2).
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
>>>     ... 6 more
>>> Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException
>>>     at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>>     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>>     at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:394)
>>>     at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
>>>     ... 5 more
>>> Caused by: java.lang.IllegalArgumentException
>>>     at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:123)
>>>     at org.apache.flink.runtime.state.OperatorBackendSerializationProxy.<init>(OperatorBackendSerializationProxy.java:68)
>>>     at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:138)
>>>     at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:108)
>>>     at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>     at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:391)
>>>     ... 7 more
>>>
>>> This application is configured to retain external checkpoints.  When I
>>> attempt to restart from the last successful checkpoint, it will fail with
>>> the same error on the first checkpoint that happens after the restart.
>>>
>>> I haven't been able to find out why this might be. The source code
>>> doesn't seem particularly informative to my eyes:
>>> https://github.com/apache/flink/blob/release-1.8.0/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java#L68
>>>
>>> Has anyone else seen anything like this?
>>>
>>>
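
For readers who want to see the failure mode in isolation, here is a minimal sketch in plain Java (no Flink or Beam dependency). It assumes, as Yun Tang's reply suggests, that the snapshot precondition rejects more than 32767 (Short.MAX_VALUE) registered operator states, and that a new state name is registered on every checkpoint. The class FakeOperatorStateStore, the method firstFailingCheckpoint, and the "buffer-<id>" naming are hypothetical stand-ins for illustration, not the actual Flink or Beam code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class OperatorStateLimitSketch {

    /** Hypothetical stand-in for an operator state store: state name -> buffered elements. */
    static final class FakeOperatorStateStore {
        private final Map<String, List<String>> registered = new HashMap<>();

        /** Returns the state for this name, registering it on first use. Nothing is ever unregistered. */
        List<String> getListState(String name) {
            return registered.computeIfAbsent(name, k -> new ArrayList<>());
        }

        /** Assumed precondition: at most Short.MAX_VALUE (32767) registered states per snapshot. */
        void snapshot() {
            if (registered.size() > Short.MAX_VALUE) {
                throw new IllegalArgumentException();
            }
        }
    }

    /**
     * Runs up to maxCheckpoints simulated checkpoints and returns the id of the
     * first one whose snapshot fails, or -1 if none fails.
     */
    static int firstFailingCheckpoint(boolean newNamePerCheckpoint, int maxCheckpoints) {
        FakeOperatorStateStore store = new FakeOperatorStateStore();
        for (int checkpointId = 1; checkpointId <= maxCheckpoints; checkpointId++) {
            // A fresh name per checkpoint grows the registry; a fixed name reuses one entry.
            String name = newNamePerCheckpoint ? "buffer-" + checkpointId : "buffer";
            store.getListState(name).add("element");
            try {
                store.snapshot();
            } catch (IllegalArgumentException e) {
                return checkpointId;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        int failing = firstFailingCheckpoint(true, 40_000);
        // With one checkpoint per minute, convert the checkpoint count to days.
        double days = failing / (24.0 * 60.0);
        System.out.println("New name per checkpoint: first failure at checkpoint " + failing
                + " (about " + days + " days at one checkpoint per minute)");
        System.out.println("Same name every time: first failure at checkpoint "
                + firstFailingCheckpoint(false, 40_000));
    }
}
```

At one checkpoint per minute, 32767 states accumulate in roughly 22.75 days, which is in line with the "around 20 days" reported in the original message. It would also explain why restoring from the last successful checkpoint fails again immediately: the restored state still carries all the registered names.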
