Hi Max,

Thanks for the information. I saw that this PR has already been merged; I'm wondering whether it has been backported to the affected versions (i.e. 2.14.0, 2.15.0, 2.16.0, 2.17.0, 2.18.0, 2.19.0, 2.20.0), or do I have to wait for the 2.20.1 release?
Thanks a lot!
Eleanore

On Wed, Apr 22, 2020 at 2:31 AM Maximilian Michels <m...@apache.org> wrote:
> Hi Eleanore,
>
> Exactly-once is not affected, but the pipeline can fail to checkpoint
> after the maximum number of state cells has been reached. We are
> working on a fix [1].
>
> Cheers,
> Max
>
> [1] https://github.com/apache/beam/pull/11478
>
> On 22.04.20 07:19, Eleanore Jin wrote:
> > Hi Max,
> >
> > I assume this will impact the exactly-once semantics that Beam provides,
> > since in KafkaExactlyOnceSink the processElement method is also
> > annotated with @RequiresStableInput?
> >
> > Thanks a lot!
> > Eleanore
> >
> > On Tue, Apr 21, 2020 at 12:58 AM Maximilian Michels <m...@apache.org> wrote:
> > >
> > > Hi Stephen,
> > >
> > > Thanks for reporting the issue! David, good catch!
> > >
> > > I think we have to resort to using only a single state cell for
> > > buffering on checkpoints, instead of a new one for every
> > > checkpoint. I was under the assumption that, if a state cell was
> > > cleared, it would not be checkpointed, but that does not seem to be
> > > the case.
> > >
> > > Thanks,
> > > Max
> > >
> > > On 21.04.20 09:29, David Morávek wrote:
> > > > Hi Stephen,
> > > >
> > > > Nice catch and awesome report! ;) This definitely needs a proper fix.
> > > > I've created a new JIRA to track the issue and will try to resolve it
> > > > soon, as this seems critical to me.
> > > >
> > > > https://issues.apache.org/jira/browse/BEAM-9794
> > > >
> > > > Thanks,
> > > > D.
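[Editor's note: the fix Max describes above — reusing a single buffering state cell rather than registering a fresh one per checkpoint — can be illustrated with a small standalone sketch. The class and state names below are purely illustrative and are not Beam's actual API:]

```java
// Hypothetical sketch (names are illustrative, not Beam's API) contrasting
// the two buffering strategies discussed in the thread.
import java.util.LinkedHashSet;
import java.util.Set;

public class BufferingStrategies {
    public static void main(String[] args) {
        int checkpoints = 40000;

        // Before the fix: one state cell per checkpoint -> unbounded growth
        // of registered state names, eventually exceeding Flink's limit.
        Set<String> perCheckpoint = new LinkedHashSet<>();
        for (int id = 1; id <= checkpoints; id++) {
            perCheckpoint.add("buffered-elements-" + id);
        }

        // Proposed fix: a single reused state cell -> the set of registered
        // names stays constant no matter how many checkpoints complete.
        Set<String> single = new LinkedHashSet<>();
        for (int id = 1; id <= checkpoints; id++) {
            single.add("buffered-elements");
        }

        System.out.println(perCheckpoint.size() + " vs " + single.size());
    }
}
```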
> > > > On Mon, Apr 20, 2020 at 10:41 PM Stephen Patel <stephenpate...@gmail.com> wrote:
> > > > >
> > > > > I was able to reproduce this in a unit test:
> > > > >
> > > > >     @Test
> > > > >     public void test() throws InterruptedException, ExecutionException {
> > > > >       FlinkPipelineOptions options =
> > > > >           PipelineOptionsFactory.as(FlinkPipelineOptions.class);
> > > > >       options.setCheckpointingInterval(10L);
> > > > >       options.setParallelism(1);
> > > > >       options.setStreaming(true);
> > > > >       options.setRunner(FlinkRunner.class);
> > > > >       options.setFlinkMaster("[local]");
> > > > >       options.setStateBackend(new MemoryStateBackend(Integer.MAX_VALUE));
> > > > >
> > > > >       Pipeline pipeline = Pipeline.create(options);
> > > > >       pipeline
> > > > >           .apply(Create.of((Void) null))
> > > > >           .apply(
> > > > >               ParDo.of(
> > > > >                   new DoFn<Void, Void>() {
> > > > >                     private static final long serialVersionUID = 1L;
> > > > >
> > > > >                     @RequiresStableInput
> > > > >                     @ProcessElement
> > > > >                     public void processElement() {}
> > > > >                   }));
> > > > >       pipeline.run();
> > > > >     }
> > > > >
> > > > > It took a while to get to checkpoint 32,767, but eventually it did,
> > > > > and it failed with the same error I listed above.
> > > > >
> > > > > On Thu, Apr 16, 2020 at 11:26 AM Stephen Patel <stephenpate...@gmail.com> wrote:
> > > > > >
> > > > > > I have a Beam Pipeline (2.14) running on Flink (1.8.0,
> > > > > > emr-5.26.0) that uses the RequiresStableInput feature.
> > > > > > Currently it's configured to checkpoint once a minute, and after
> > > > > > around 32000-33000 checkpoints, it fails with:
> > > > > >
> > > > > >     2020-04-15 13:15:02,920 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 32701 @ 1586956502911 for job 9953424f21e240112dd23ab4f8320b60.
> > > > > >     2020-04-15 13:15:05,762 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 32701 for job 9953424f21e240112dd23ab4f8320b60 (795385496 bytes in 2667 ms).
> > > > > >     2020-04-15 13:16:02,919 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 32702 @ 1586956562911 for job 9953424f21e240112dd23ab4f8320b60.
> > > > > >     2020-04-15 13:16:03,147 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - <operator_name> (1/2) (f4737add01961f8b42b8eb4e791b83ba) switched from RUNNING to FAILED.
> > > > > >     AsynchronousException{java.lang.Exception: Could not materialize checkpoint 32702 for operator <operator_name> (1/2).}
> > > > > >         at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
> > > > > >         at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
> > > > > >         at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
> > > > > >         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> > > > > >         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> > > > > >         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> > > > > >         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> > > > > >         at java.lang.Thread.run(Thread.java:748)
> > > > > >     Caused by: java.lang.Exception: Could not materialize checkpoint 32702 for operator <operator_name> (1/2).
> > > > > >         at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
> > > > > >         ... 6 more
> > > > > >     Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException
> > > > > >         at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> > > > > >         at java.util.concurrent.FutureTask.get(FutureTask.java:192)
> > > > > >         at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:394)
> > > > > >         at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
> > > > > >         at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
> > > > > >         ... 5 more
> > > > > >     Caused by: java.lang.IllegalArgumentException
> > > > > >         at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:123)
> > > > > >         at org.apache.flink.runtime.state.OperatorBackendSerializationProxy.<init>(OperatorBackendSerializationProxy.java:68)
> > > > > >         at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:138)
> > > > > >         at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:108)
> > > > > >         at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
> > > > > >         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> > > > > >         at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:391)
> > > > > >         ... 7 more
> > > > > >
> > > > > > The exception comes from here:
> > > > > > https://github.com/apache/flink/blob/release-1.8.0/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java#L68
> > > > > >
> > > > > > In the Flink Runner code, I can see that each checkpoint will
> > > > > > result in a new OperatorState (or KeyedState if the stream is
> > > > > > keyed):
> > > > > >
> > > > > > https://github.com/apache/beam/blob/v2.14.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java#L91-L103
> > > > > > https://github.com/apache/beam/blob/v2.14.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java#L136-L143
> > > > > >
> > > > > > This seems to be why the pipeline eventually dies.
> > > > > >
> > > > > > While a workaround might be to increase the time between
> > > > > > checkpoints, it seems that any pipeline running on Flink that
> > > > > > uses RequiresStableInput is limited in how long it can run
> > > > > > before it must be restarted from scratch.
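[Editor's note: to make the failure mode concrete — the stack trace bottoms out in a Preconditions.checkArgument call in OperatorBackendSerializationProxy's constructor, which rejects more than Short.MAX_VALUE (32767) registered operator states, apparently because the state count is serialized as a Java short. The standalone sketch below does not use Flink's actual classes; it simulates registering one new buffering state cell per checkpoint, as described for BufferingDoFnRunner, until that limit is hit:]

```java
// Minimal sketch (not Flink's actual classes) of why the pipeline dies:
// the snapshot path rejects more than Short.MAX_VALUE (32767) registered
// state cells, and one new cell is registered per checkpoint.
import java.util.ArrayList;
import java.util.List;

public class StateCellLimitSketch {
    // Mirrors the Preconditions.checkArgument(...) call in the stack trace.
    static void checkArgument(boolean condition) {
        if (!condition) throw new IllegalArgumentException();
    }

    // Stand-in for the serialization proxy's check on the state-cell count.
    static void snapshot(List<String> registeredStates) {
        checkArgument(registeredStates.size() <= Short.MAX_VALUE);
    }

    public static void main(String[] args) {
        List<String> states = new ArrayList<>();
        int checkpoint = 0;
        try {
            while (true) {
                // One new buffering state cell per checkpoint, as observed
                // in BufferingDoFnRunner prior to the fix. The state name
                // here is hypothetical.
                states.add("pending-elements-" + (++checkpoint));
                snapshot(states);
            }
        } catch (IllegalArgumentException e) {
            System.out.println("Snapshot failed at checkpoint " + checkpoint);
        }
    }
}
```

This reproduces the observed arithmetic: with one-minute checkpoints, 32767 checkpoints is roughly 22-23 days of uptime, matching the "around 32000-33000 checkpoints" failure window reported above.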