Hi Stephen,

Thanks for reporting the issue! David, good catch!

I think we have to resort to using only a single state cell for
buffering on checkpoints, instead of creating a new one for every
checkpoint. I was under the assumption that a cleared state cell would
not be included in the checkpoint, but that does not seem to be the case.
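
Roughly, the idea would be something like the following sketch (the
names, e.g. BufferedElement, bufferedElementSerializer and stateStore,
are illustrative, not the actual BufferingDoFnRunner code): register a
single operator state once and tag each buffered element with the
checkpoint it belongs to, rather than registering a new state name per
checkpoint.

    // One ListState registered up front and reused for every checkpoint.
    ListStateDescriptor<BufferedElement> descriptor =
        new ListStateDescriptor<>("stable-input-buffer", bufferedElementSerializer);
    ListState<BufferedElement> buffer = stateStore.getListState(descriptor);

    // On snapshot for checkpoint N: append the buffered elements, tagged with N.
    // On notifyCheckpointComplete(N): replay and then drop elements tagged <= N.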

Thanks,
Max

On 21.04.20 09:29, David Morávek wrote:
> Hi Stephen,
> 
> nice catch and awesome report! ;) This definitely needs a proper fix.
> I've created a new JIRA to track the issue and will try to resolve it
> soon, as this seems critical to me.
> 
> https://issues.apache.org/jira/browse/BEAM-9794
> 
> Thanks,
> D.
> 
> On Mon, Apr 20, 2020 at 10:41 PM Stephen Patel <[email protected]> wrote:
> 
>     I was able to reproduce this in a unit test:
> 
>         @Test
>         public void test() throws InterruptedException, ExecutionException {
>           FlinkPipelineOptions options =
>               PipelineOptionsFactory.as(FlinkPipelineOptions.class);
>           options.setCheckpointingInterval(10L);
>           options.setParallelism(1);
>           options.setStreaming(true);
>           options.setRunner(FlinkRunner.class);
>           options.setFlinkMaster("[local]");
>           options.setStateBackend(new MemoryStateBackend(Integer.MAX_VALUE));
> 
>           Pipeline pipeline = Pipeline.create(options);
>           pipeline
>               .apply(Create.of((Void) null))
>               .apply(
>                   ParDo.of(
>                       new DoFn<Void, Void>() {
> 
>                         private static final long serialVersionUID = 1L;
> 
>                         @RequiresStableInput
>                         @ProcessElement
>                         public void processElement() {}
>                       }));
> 
>           pipeline.run();
>         }
> 
> 
>     It took a while to get to checkpoint 32,767, but eventually it did,
>     and it failed with the same error I listed above.
> 
>     On Thu, Apr 16, 2020 at 11:26 AM Stephen Patel <[email protected]> wrote:
> 
>         I have a Beam Pipeline (2.14) running on Flink (1.8.0,
>         emr-5.26.0) that uses the RequiresStableInput feature.
> 
>         Currently it's configured to checkpoint once a minute, and after
>         around 32000-33000 checkpoints, it fails with: 
> 
>             2020-04-15 13:15:02,920 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    - Triggering checkpoint 32701 @ 1586956502911 for job 9953424f21e240112dd23ab4f8320b60.
>             2020-04-15 13:15:05,762 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    - Completed checkpoint 32701 for job 9953424f21e240112dd23ab4f8320b60 (795385496 bytes in 2667 ms).
>             2020-04-15 13:16:02,919 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    - Triggering checkpoint 32702 @ 1586956562911 for job 9953424f21e240112dd23ab4f8320b60.
>             2020-04-15 13:16:03,147 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       - <operator_name> (1/2) (f4737add01961f8b42b8eb4e791b83ba) switched from RUNNING to FAILED.
>             AsynchronousException{java.lang.Exception: Could not materialize checkpoint 32702 for operator <operator_name> (1/2).}
>             at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
>             at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
>             at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
>             at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>             at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>             at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>             at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>             at java.lang.Thread.run(Thread.java:748)
>             Caused by: java.lang.Exception: Could not materialize checkpoint 32702 for operator <operator_name> (1/2).
>             at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
>             ... 6 more
>             Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException
>             at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>             at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>             at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:394)
>             at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
>             at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
>             ... 5 more
>             Caused by: java.lang.IllegalArgumentException
>             at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:123)
>             at org.apache.flink.runtime.state.OperatorBackendSerializationProxy.<init>(OperatorBackendSerializationProxy.java:68)
>             at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:138)
>             at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:108)
>             at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
>             at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>             at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:391)
>             ... 7 more
> 
> 
>         The exception comes from
>         here: 
> https://github.com/apache/flink/blob/release-1.8.0/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java#L68
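> 
>         Looking at that line, the failing check appears to be (paraphrasing
>         the Flink 1.8.0 source linked above) that the number of registered
>         operator and broadcast states fits in a short:
> 
>             // OperatorBackendSerializationProxy constructor, roughly:
>             Preconditions.checkArgument(
>                 operatorStateMetaSnapshots.size() + broadcastStateMetaSnapshots.size()
>                     <= Short.MAX_VALUE);
> 
>         That would line up with the failure appearing around checkpoint
>         32,767 (Short.MAX_VALUE).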
> 
>         In the Flink Runner code, I can see that each checkpoint will
>         result in a new OperatorState (or KeyedState if the stream is
>         keyed):
>         
> https://github.com/apache/beam/blob/v2.14.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java#L91-L103
>         
> https://github.com/apache/beam/blob/v2.14.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java#L136-L143
> 
>         This seems to be the reason the pipeline will eventually die.  
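> 
>         As an illustration of the growth pattern (the state name below is
>         hypothetical, not the exact one BufferingDoFnRunner uses), registering
>         a differently named operator state on every checkpoint grows the
>         snapshot's state-meta list by one entry each time:
> 
>             for (long checkpointId = 1; checkpointId <= Short.MAX_VALUE + 1; checkpointId++) {
>               ListStateDescriptor<byte[]> perCheckpointBuffer =
>                   new ListStateDescriptor<>("buffered-elements-" + checkpointId, byte[].class);
>               stateStore.getListState(perCheckpointBuffer); // one more registered state per checkpoint
>             }
>             // The next snapshot now carries more than Short.MAX_VALUE state entries
>             // and trips the checkArgument in OperatorBackendSerializationProxy.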
> 
>         While a workaround might be to increase the time between
>         checkpoints, it seems that any pipeline running on Flink that
>         uses RequiresStableInput is limited in how long it can run
>         before it has to be restarted from scratch.
> 
