Hi Aljoscha, I printed the stack trace in the createAccumulator() method of my combiner:
taskmanager_1 | java.lang.Thread.getStackTrace(Thread.java:1559)
taskmanager_1 | com.test.sensor.beam.activity.SensorMessumentCombinerFn.createAccumulator(SensorMessumentCombinerFn.java:29)
taskmanager_1 | com.test.sensor.beam.activity.SensorMessumentCombinerFn.createAccumulator(SensorMessumentCombinerFn.java:20)
taskmanager_1 | org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals$FlinkCombiningState.read(FlinkStateInternals.java:510)
taskmanager_1 | org.apache.beam.runners.core.SystemReduceFn.onTrigger(SystemReduceFn.java:127)
taskmanager_1 | org.apache.beam.runners.core.ReduceFnRunner.onTrigger(ReduceFnRunner.java:1059)
taskmanager_1 | org.apache.beam.runners.core.ReduceFnRunner.onTimers(ReduceFnRunner.java:768)
taskmanager_1 | org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn.processElement(GroupAlsoByWindowViaWindowSetNewDoFn.java:137)
taskmanager_1 | org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
taskmanager_1 | org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)
taskmanager_1 | org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)
taskmanager_1 | org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
taskmanager_1 | org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
taskmanager_1 | org.apache.beam.runners.flink.translation.wrappers.streaming.WindowDoFnOperator.fireTimer(WindowDoFnOperator.java:128)
taskmanager_1 | org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.fireTimerInternal(DoFnOperator.java:924)
taskmanager_1 | org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.onEventTime(DoFnOperator.java:913)
taskmanager_1 | org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:276)
taskmanager_1 | org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128)
taskmanager_1 | org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processWatermark1(DoFnOperator.java:702)
taskmanager_1 | org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processWatermark(DoFnOperator.java:681)
taskmanager_1 | org.apache.flink.streaming.runtime.io.StreamOneInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamOneInputProcessor.java:213)
taskmanager_1 | org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
taskmanager_1 | org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
taskmanager_1 | org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:169)
taskmanager_1 | org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:143)
taskmanager_1 | org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
taskmanager_1 | org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:321)
taskmanager_1 | org.apache.flink.streaming.runtime.tasks.StreamTask.runAndHandleCancel(StreamTask.java:286)
taskmanager_1 | org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:426)
taskmanager_1 | org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
taskmanager_1 | org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
taskmanager_1 | java.lang.Thread.run(Thread.java:748)

Let me know if there is anything else I can help with.
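For anyone trying to reproduce this: the top frame, java.lang.Thread.getStackTrace, suggests the trace was captured with Thread.currentThread().getStackTrace() inside createAccumulator(). Below is a minimal JDK-only sketch of that technique. TracingCombiner and lastTrace are hypothetical names standing in for the real CombineFn (SensorMessumentCombinerFn in this job), which would need the Beam SDK on the classpath.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a combiner; only createAccumulator() is modeled.
// In a real Beam job these lines would go inside your CombineFn subclass.
class TracingCombiner {
    // Frames captured on the most recent createAccumulator() call.
    static List<StackTraceElement> lastTrace = new ArrayList<>();

    List<Integer> createAccumulator() {
        // Frame 0 is typically java.lang.Thread.getStackTrace itself,
        // frame 1 is this method, and later frames are its callers.
        StackTraceElement[] frames = Thread.currentThread().getStackTrace();
        lastTrace = Arrays.asList(frames);
        for (StackTraceElement frame : frames) {
            System.err.println(frame);
        }
        return new ArrayList<>(); // fresh, empty accumulator
    }

    public static void main(String[] args) {
        new TracingCombiner().createAccumulator();
    }
}
```

Read from the bottom up, the trace above shows the call arriving through a watermark advance (StatusWatermarkValve, DoFnOperator.processWatermark), an event-time timer (DoFnOperator.onEventTime, WindowDoFnOperator.fireTimer), ReduceFnRunner.onTrigger, and finally FlinkCombiningState.read, which is where createAccumulator() is invoked.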
Regards

On Thu, Oct 15, 2020 at 8:34 PM Aljoscha Krettek <aljos...@apache.org> wrote:
> There are multiple things that come together here, I'm afraid:
>
> 1. There is additional output when stopping with a savepoint. It would
> be good to know where that comes from.
>
> 2. The state internals implementation does in fact seem wrong. We don't
> differentiate the cases of "never created an accumulator" and "my
> accumulator is null".
>
> @Andrés, could you put breakpoints in your Combiner implementation and
> see when that second output happens and why it happens? (A stack trace
> would probably help.)
>
> Regarding the state internals: we would basically need to introduce one
> more layer; instead of keeping an AccumT, we would need to keep an
> Option<AccumT> or something of that sort. I'm not saying Java Optional
> here, on purpose. However, changing the state type would have the
> consequence that savepoints are no longer compatible, i.e. you cannot
> restore a job from before this change using a Beam version after this
> change. So I'm very reluctant.
>
>
> On 15.10.20 11:51, Andrés Garagiola wrote:
> > Thanks Luke, Aljoscha
> >
> > Let me know if I can help you reproduce the problem.
> > In my case the state is never set to null, but I think that it becomes null
> > while the job is stopping. Once I run the job again from the savepoint, the
> > state is recovered normally.
> >
> > Let's show this with an example:
> >
> > t0: Add input 1 => accu state [1] => output [1]
> > t1: Add input 2 => accu state [1,2] => output [1,2]
> > t2: stop job with savepoint => output [1,2] and *output []*
> > t3: run job from savepoint => accu state [1,2] => no output
> > t4: Add input 3 => accu state [1,2,3] => output [1,2,3]
> >
> > Regards
> >
> > On Thu, Oct 15, 2020 at 11:33 AM Aljoscha Krettek <aljos...@apache.org>
> > wrote:
> >
> >> I'll take a look.
> >>
> >> On 14.10.20 18:49, Luke Cwik wrote:
> >>> Assuming that null means that the accumulator was never created is not
> >>> right, especially if null is a valid terminal state while the
> >>> initial accumulator value is non-null. This is uncommon but possible. Filed
> >>> https://issues.apache.org/jira/browse/BEAM-11063.
> >>>
> >>> +Aljoscha Krettek <aljos...@apache.org> Is this something you can take a
> >>> look at?
> >>>
> >>> On Wed, Oct 14, 2020 at 9:25 AM Andrés Garagiola <andresgaragi...@gmail.com>
> >>> wrote:
> >>>
> >>>> Hi all,
> >>>>
> >>>> I have this problem in a streaming pipeline using the Apache Flink
> >>>> 1.19 runner. I want to upgrade my job. I first end the job by using the
> >>>> Flink API to create a savepoint, and then I start the new version by using
> >>>> the Flink API, passing the savepoint path.
> >>>>
> >>>> When the job ends, two new records are created. The first one is OK, but the
> >>>> second one is an empty record.
> >>>>
> >>>>
> >>>> My pipeline uses this window strategy:
> >>>>
> >>>> Window<KV<String, TaggedEvent>> window =
> >>>>     Window.<KV<String, TaggedEvent>>into(CalendarWindows.days(this.options.getWindowDays()))
> >>>>         .triggering(AfterWatermark.pastEndOfWindow()
> >>>>             .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(delay))
> >>>>             .withLateFirings(AfterProcessingTime.pastFirstElementInPane()))
> >>>>         .withAllowedLateness(Duration.standardSeconds(this.options.getAllowedLateness()))
> >>>>         .accumulatingFiredPanes();
> >>>>
> >>>>
> >>>> I implemented a custom combiner, and I realized that the state of the
> >>>> combiner is null in the second output.
> >>>> This line
> >>>> (https://github.com/apache/beam/blob/v2.24.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java#L507)
> >>>> evaluates to false, and then it creates an empty accumulator.
> >>>>
> >>>>
> >>>> Is this the expected behavior?
> >>>>
> >>>>
> >>>> Thanks
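To make the distinction between "never created an accumulator" and "my accumulator is null" concrete, here is a minimal JDK-only sketch; it is not the runner's code. CombiningStateSketch, Holder, readRaw and readWrapped are hypothetical names. readRaw is loosely modeled on the behavior reported for FlinkStateInternals (a null read from state leads to a fresh createAccumulator() call), while readWrapped adds the extra layer suggested in the thread, so that only a missing wrapper means "never created".

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical keyed combining state (not a Beam or Flink class) that stores
// every accumulator twice so the two read strategies can be compared side by side.
class CombiningStateSketch<A> {
    private final Map<String, A> raw = new HashMap<>();             // AccumT stored directly
    private final Map<String, Holder<A>> wrapped = new HashMap<>(); // Holder<AccumT> stored
    private final Supplier<A> createAccumulator;

    CombiningStateSketch(Supplier<A> createAccumulator) {
        this.createAccumulator = createAccumulator;
    }

    void write(String key, A accum) {
        raw.put(key, accum);                   // HashMap accepts null values
        wrapped.put(key, new Holder<>(accum)); // the Holder itself is never null
    }

    // Null-check read: a stored null looks exactly like "never written",
    // so a legitimate null accumulator is silently replaced by a fresh one.
    A readRaw(String key) {
        A accum = raw.get(key);
        return accum != null ? accum : createAccumulator.get();
    }

    // Wrapped read: only a missing Holder means "never created";
    // a Holder containing null is returned as null.
    A readWrapped(String key) {
        Holder<A> holder = wrapped.get(key);
        return holder != null ? holder.value : createAccumulator.get();
    }

    public static void main(String[] args) {
        // Initial accumulator is non-null; null is treated as a valid terminal state.
        CombiningStateSketch<String> state = new CombiningStateSketch<>(() -> "init");
        state.write("k", null);
        System.out.println(state.readRaw("k"));     // prints "init": stored null was replaced
        System.out.println(state.readWrapped("k")); // prints "null": stored null preserved
    }
}

// Hypothetical presence marker. Unlike java.util.Optional, it can hold null:
// Optional.ofNullable(null) is just Optional.empty(), which loses the distinction.
final class Holder<A> {
    final A value;

    Holder(A value) {
        this.value = value;
    }
}
```

In the real runner, storing a wrapper instead of AccumT changes the serialized state, which is exactly the savepoint-compatibility concern raised in the thread; this sketch does not address that.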