Sorry, just noticed this thread...

@Stephan I cannot remember the discussion, but I think unregistering states is
an interesting topic; I will find some time to consider it.

@Eleanore Glad to know that the Beam community has fixed it, and thanks for the

Best Regards,

On Sun, 26 Apr 2020 at 03:10, Eleanore Jin <> wrote:

> Hi All,
> I think the Beam Community fixed this issue:
> Thanks!
> Eleanore
> On Thu, Apr 23, 2020 at 4:24 AM Stephan Ewen <> wrote:
>> If something requires Beam to register a new state each time, then this
>> is tricky, because currently you cannot unregister states from Flink.
>> @Yu @Yun I remember chatting about this (allowing states to be explicitly
>> unregistered so they get dropped from subsequent checkpoints) at some
>> point, but I could not find a jira ticket for this. Do you remember what
>> the status of that discussion is?
>> On Thu, Apr 16, 2020 at 6:37 PM Stephen Patel <> wrote:
>>> I posted to the beam mailing list:
>>> I think this is related to a Beam feature called RequiresStableInput
>>> (which my pipeline is using).  It will create a new operator (or keyed)
>>> state per checkpoint.  I'm not sure that there are any parameters that I
>>> have control over to tweak its behavior (apart from increasing the
>>> checkpoint interval to let the pipeline run longer before building up that
>>> many states).
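As a rough sanity check on the "increase the checkpoint interval" option: if exactly one new operator state is registered per checkpoint, and the cap is 32767 states (the figure Yun gives further down in this thread), the time to failure grows linearly with the interval. The small Java program below is only an estimate under those two assumptions; it ignores any states the operator registers for other reasons and any checkpoints that are skipped or fail.

```java
// Rough estimate of how long a job can run before reaching the operator-state cap.
// ASSUMPTIONS: one new operator state per checkpoint, and a cap of
// Short.MAX_VALUE (32767) operator states per operator backend.
public class StateLimitEstimate {

    static final int MAX_OPERATOR_STATES = Short.MAX_VALUE; // 32767

    // Days until the cap is reached for a checkpoint interval given in minutes.
    static double daysUntilLimit(double checkpointIntervalMinutes) {
        double totalMinutes = MAX_OPERATOR_STATES * checkpointIntervalMinutes;
        return totalMinutes / (60.0 * 24.0);
    }

    public static void main(String[] args) {
        for (double interval : new double[] {1, 5, 15}) {
            System.out.printf("interval=%.0f min -> ~%.1f days%n",
                    interval, daysUntilLimit(interval));
        }
    }
}
```

At a one-minute interval this works out to roughly 22.8 days, in line with the "around 20 days" in the original report; a longer interval only postpones the failure, it does not remove it.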
>>> Perhaps this is something that can be fixed (maybe by unregistering
>>> Operator States after they aren't used any more in the RequiresStableInput
>>> code).  It seems to me that this isn't a Flink issue, but rather a Beam
>>> issue.
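The unregistering idea can be illustrated with a toy registry. As Stephan notes above, Flink currently offers no way to unregister operator state, so the `unregister()` method here is purely hypothetical, as are the class itself and the `stable-input-<id>` naming scheme. The sketch keeps only the state for the most recent checkpoint and drops the previous one, assuming that state's buffered input has already been emitted once its checkpoint completed.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch, NOT Flink code: a name-keyed state registry with an
// unregister() operation that Flink does not currently provide.
public class UnregisterSketch {

    private final Map<String, List<String>> states = new LinkedHashMap<>();

    // Get-or-create by name.
    List<String> register(String name) {
        return states.computeIfAbsent(name, n -> new ArrayList<>());
    }

    // Hypothetical: drop a state so it would no longer appear in later snapshots.
    void unregister(String name) {
        states.remove(name);
    }

    int size() {
        return states.size();
    }

    public static void main(String[] args) {
        UnregisterSketch registry = new UnregisterSketch();
        int maxAfterCleanup = 0;
        for (long checkpointId = 1; checkpointId <= 100_000; checkpointId++) {
            // A new, per-checkpoint state (hypothetical naming scheme).
            registry.register("stable-input-" + checkpointId).add("element@" + checkpointId);
            // Assume the previous checkpoint has completed and its buffer was emitted.
            if (checkpointId > 1) {
                registry.unregister("stable-input-" + (checkpointId - 1));
            }
            maxAfterCleanup = Math.max(maxAfterCleanup, registry.size());
        }
        System.out.println("max registered states after cleanup: " + maxAfterCleanup);
    }
}
```

Without the `unregister()` call, the same loop would leave 100000 registered states, well past the 32767 cap discussed below.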
>>> Thanks for pointing me in the right direction.
>>> On Thu, Apr 16, 2020 at 11:29 AM Yun Tang <> wrote:
>>>> Hi Stephen
>>>> I think the state name [1], which changes every time, might be the
>>>> root cause. I am not familiar with the Beam code; would it really
>>>> create so many operator states? Did you configure some parameters wrongly?
>>>> [1]
>>>> Best
>>>> Yun Tang
>>>> ------------------------------
>>>> *From:* Stephen Patel <>
>>>> *Sent:* Thursday, April 16, 2020 22:30
>>>> *To:* Yun Tang <>
>>>> *Cc:* <>
>>>> *Subject:* Re: Streaming Job eventually begins failing during
>>>> checkpointing
>>>> Correction. I've actually found a place where it might be creating a
>>>> new operator state per checkpoint:
>>>> This gives me something I can investigate locally at least.
>>>> On Thu, Apr 16, 2020 at 9:03 AM Stephen Patel <>
>>>> wrote:
>>>> I can't say that I ever call that directly. The Beam library that I'm
>>>> using does call it in a couple of places:
>>>> But it seems to be the same descriptor every time.  Is that limit per
>>>> operator?  That is, can each operator host up to 32767 operator/broadcast
>>>> states?  I assume that's by name?
>>>> On Wed, Apr 15, 2020 at 10:46 PM Yun Tang <> wrote:
>>>> Hi  Stephen
>>>> This is not related to RocksDB but to the default on-heap operator
>>>> state backend. According to your exception stack trace, you have created
>>>> too many operator states (more than 32767).
>>>> How do you call context.getOperatorStateStore().getListState or
>>>> context.getOperatorStateStore().getBroadcastState? Did you pass a
>>>> different operator state descriptor each time?
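To make Yun's question concrete, here is a toy model. It is not Flink code, only a stand-in that assumes the operator state backend keys states by name, returns the existing state when the same name is requested again, and refuses to snapshot more than `Short.MAX_VALUE` (32767) states, mirroring the `IllegalArgumentException` from `OperatorBackendSerializationProxy` in the stack trace further down. The class, its `getListState(String)` method, and the `buffered-elements` state names are all hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model (NOT Flink code) of a name-keyed operator state backend with a
// cap of Short.MAX_VALUE states per snapshot, as described in this thread.
public class OperatorStateRegistryDemo {

    static final class ToyOperatorStateBackend {
        private final Map<String, List<String>> statesByName = new LinkedHashMap<>();

        // Get-or-create: requesting the same name returns the same state.
        List<String> getListState(String name) {
            return statesByName.computeIfAbsent(name, n -> new ArrayList<>());
        }

        int registeredStates() {
            return statesByName.size();
        }

        // Fails like the reported trace once the count no longer fits in a short.
        void snapshot() {
            if (statesByName.size() > Short.MAX_VALUE) {
                throw new IllegalArgumentException(
                        "too many operator states: " + statesByName.size());
            }
        }
    }

    public static void main(String[] args) {
        ToyOperatorStateBackend stable = new ToyOperatorStateBackend();
        ToyOperatorStateBackend perCheckpoint = new ToyOperatorStateBackend();

        for (long checkpointId = 1; checkpointId <= 40_000; checkpointId++) {
            // Same name every time: still one registered state.
            stable.getListState("buffered-elements").add("e" + checkpointId);
            // New name per checkpoint: one more registered state each time.
            perCheckpoint.getListState("buffered-elements-" + checkpointId).add("e" + checkpointId);
            stable.snapshot();
            try {
                perCheckpoint.snapshot();
            } catch (IllegalArgumentException e) {
                System.out.println("per-checkpoint names failed at checkpoint " + checkpointId);
                break;
            }
        }
        System.out.println("stable name: " + stable.registeredStates() + " state(s)");
    }
}
```

With a stable name the toy backend holds a single state however many checkpoints pass; with a name derived from the checkpoint ID, its snapshot fails at checkpoint 32768.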
>>>> Best
>>>> Yun Tang
>>>> ------------------------------
>>>> *From:* Stephen Patel <>
>>>> *Sent:* Thursday, April 16, 2020 2:09
>>>> *To:* <>
>>>> *Subject:* Streaming Job eventually begins failing during checkpointing
>>>> I've got a Flink (1.8.0, emr-5.26) streaming job running on YARN. It's
>>>> configured to use RocksDB and to checkpoint to HDFS once a minute. This job
>>>> operates just fine for around 20 days, and then begins failing with this
>>>> exception (it fails, restarts, and fails again, repeatedly):
>>>> 2020-04-15 13:15:02,920 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 32701 @ 1586956502911 for job 9953424f21e240112dd23ab4f8320b60.
>>>> 2020-04-15 13:15:05,762 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 32701 for job 9953424f21e240112dd23ab4f8320b60 (795385496 bytes in 2667 ms).
>>>> 2020-04-15 13:16:02,919 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 32702 @ 1586956562911 for job 9953424f21e240112dd23ab4f8320b60.
>>>> 2020-04-15 13:16:03,147 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - <operator_name> (1/2) (f4737add01961f8b42b8eb4e791b83ba) switched from
>>>> AsynchronousException{java.lang.Exception: Could not materialize checkpoint 32702 for operator <operator_name> (1/2).}
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$
>>>>     at java.util.concurrent.Executors$
>>>>     at
>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(
>>>>     at java.util.concurrent.ThreadPoolExecutor$
>>>>     at
>>>> Caused by: java.lang.Exception: Could not materialize checkpoint 32702 for operator <operator_name> (1/2).
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(
>>>>     ... 6 more
>>>> Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException
>>>>     at
>>>>     at java.util.concurrent.FutureTask.get(
>>>>     at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(
>>>>     at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$
>>>>     ... 5 more
>>>> Caused by: java.lang.IllegalArgumentException
>>>>     at org.apache.flink.util.Preconditions.checkArgument(
>>>>     at org.apache.flink.runtime.state.OperatorBackendSerializationProxy.<init>(
>>>>     at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(
>>>>     at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(
>>>>     at
>>>>     at
>>>>     at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(
>>>>     ... 7 more
>>>> This application is configured to retain externalized checkpoints. When I
>>>> attempt to restart from the last successful checkpoint, it fails with the
>>>> same error on the first checkpoint taken after the restart.
>>>> I haven't been able to find out why this might be. The source code
>>>> doesn't seem particularly informative to my eyes:
>>>> Has anyone else seen anything like this?
