Hi Aljoscha,

I printed the stack trace in the createAccumulator() method of my combiner:

taskmanager_1  | java.lang.Thread.getStackTrace(Thread.java:1559)
taskmanager_1  | com.test.sensor.beam.activity.SensorMessumentCombinerFn.createAccumulator(SensorMessumentCombinerFn.java:29)
taskmanager_1  | com.test.sensor.beam.activity.SensorMessumentCombinerFn.createAccumulator(SensorMessumentCombinerFn.java:20)
taskmanager_1  | org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals$FlinkCombiningState.read(FlinkStateInternals.java:510)
taskmanager_1  | org.apache.beam.runners.core.SystemReduceFn.onTrigger(SystemReduceFn.java:127)
taskmanager_1  | org.apache.beam.runners.core.ReduceFnRunner.onTrigger(ReduceFnRunner.java:1059)
taskmanager_1  | org.apache.beam.runners.core.ReduceFnRunner.onTimers(ReduceFnRunner.java:768)
taskmanager_1  | org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn.processElement(GroupAlsoByWindowViaWindowSetNewDoFn.java:137)
taskmanager_1  | org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
taskmanager_1  | org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)
taskmanager_1  | org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)
taskmanager_1  | org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
taskmanager_1  | org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
taskmanager_1  | org.apache.beam.runners.flink.translation.wrappers.streaming.WindowDoFnOperator.fireTimer(WindowDoFnOperator.java:128)
taskmanager_1  | org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.fireTimerInternal(DoFnOperator.java:924)
taskmanager_1  | org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.onEventTime(DoFnOperator.java:913)
taskmanager_1  | org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:276)
taskmanager_1  | org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128)
taskmanager_1  | org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processWatermark1(DoFnOperator.java:702)
taskmanager_1  | org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processWatermark(DoFnOperator.java:681)
taskmanager_1  | org.apache.flink.streaming.runtime.io.StreamOneInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamOneInputProcessor.java:213)
taskmanager_1  | org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
taskmanager_1  | org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
taskmanager_1  | org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:169)
taskmanager_1  | org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:143)
taskmanager_1  | org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
taskmanager_1  | org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:321)
taskmanager_1  | org.apache.flink.streaming.runtime.tasks.StreamTask.runAndHandleCancel(StreamTask.java:286)
taskmanager_1  | org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:426)
taskmanager_1  | org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
taskmanager_1  | org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
taskmanager_1  | java.lang.Thread.run(Thread.java:748)
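
For anyone who wants to capture a similar trace, here is a minimal sketch of the kind of logging involved. It is not my actual combiner: TracingCombinerSketch and currentStackTrace() are hypothetical names, and the class is a plain stand-in that leaves out Beam's Combine.CombineFn so that it compiles without any dependencies.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a combiner. A real one would extend Beam's
// Combine.CombineFn; that is omitted here to keep the sketch dependency-free.
final class TracingCombinerSketch {

  /** Formats the current thread's stack, one frame per line. */
  static String currentStackTrace() {
    StringBuilder sb = new StringBuilder();
    for (StackTraceElement frame : Thread.currentThread().getStackTrace()) {
      sb.append(frame).append(System.lineSeparator());
    }
    return sb.toString();
  }

  /** Creates an empty accumulator and prints who asked for it. */
  List<Integer> createAccumulator() {
    System.out.print(currentStackTrace());
    return new ArrayList<>();
  }
}
```

The printed frames show which caller requested a fresh accumulator, which is how the FlinkCombiningState.read() frame shows up in the trace above.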

Let me know if there is anything else I can help with.

Regards

On Thu, Oct 15, 2020 at 8:34 PM Aljoscha Krettek <aljos...@apache.org>
wrote:

> There are multiple things that come together here, I'm afraid:
>
> 1. There is additional output when stopping with a savepoint. It would
> be good to know where that comes from.
>
> 2. The state internals implementation does in fact seem wrong. We don't
> differentiate the cases of "never created an accumulator" and "my
> accumulator is null".
>
> @Andrés, could you put breakpoints in your Combiner implementation and
> see when that second output happens and why it happens? (A stack trace
> would probably help.)
>
> Regarding the state internals: we would basically need to introduce one
> more layer. Instead of keeping an AccumT, we would need to keep an
> Option<AccumT> or something of that sort (not saying Java Optional
> here, on purpose). However, changing the state type would have the
> consequence that savepoints are no longer compatible, i.e. you cannot
> restore a job from before this change using a Beam version after this
> change. So I'm very reluctant.
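
To make the "one more layer" idea concrete, here is a rough sketch under stated assumptions. CombiningStateSketch and Slot are hypothetical names, not Beam classes, and the first read method only mirrors the behaviour described in this thread (a null stored accumulator is treated as "never created"); it is not a copy of FlinkStateInternals.

```java
import java.util.function.Function;
import java.util.function.Supplier;

/** Simplified, hypothetical stand-in for a combining-state cell; not Beam's class. */
final class CombiningStateSketch<AccumT, OutputT> {

  // Hypothetical wrapper: "slot == null" means nothing was ever written,
  // while "slot.value == null" means the accumulator itself is null.
  static final class Slot<T> {
    final T value;

    Slot(T value) {
      this.value = value;
    }
  }

  private final Supplier<AccumT> createAccumulator;
  private final Function<AccumT, OutputT> extractOutput;
  private Slot<AccumT> slot; // stays null until the first write

  CombiningStateSketch(
      Supplier<AccumT> createAccumulator, Function<AccumT, OutputT> extractOutput) {
    this.createAccumulator = createAccumulator;
    this.extractOutput = extractOutput;
  }

  void write(AccumT accum) {
    slot = new Slot<>(accum);
  }

  /** Behaviour as described in the thread: a null value is read as "never created". */
  OutputT readTreatingNullAsAbsent() {
    AccumT accum = (slot == null) ? null : slot.value;
    return extractOutput.apply(accum != null ? accum : createAccumulator.get());
  }

  /** With the extra layer: a fresh accumulator is created only if nothing was written. */
  OutputT readWithWrapper() {
    return extractOutput.apply(slot == null ? createAccumulator.get() : slot.value);
  }
}
```

In this sketch the two reads differ only after write(null): the first falls back to a fresh accumulator, the second passes the null accumulator through to the output function. The savepoint compatibility concern quoted above is untouched by this sketch, since it says nothing about how the wrapper would be serialized.
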
>
>
> On 15.10.20 11:51, Andrés Garagiola wrote:
> > Thanks Luke, Aljoscha
> >
> > Let me know if I can help you to reproduce the problem.
> > In my case the state is never set to null but I think that it becomes null
> > while the job is stopping. Once I run the job again from the savepoint, the
> > state is recovered normally.
> >
> > Let's show this with an example:
> >
> > t0: Add input 1 => accu state [1] => output [1]
> > t1: Add input 2 => accu state [1,2] => output [1,2]
> > t2: stop job with savepoint => output [1,2,3] and *output []*
> > t3: run job from savepoint => accu state [1,2] => no output
> > t4: Add input 3 => accu state [1,2,3] => output [1,2,3]
> >
> > Regards
> >
> > On Thu, Oct 15, 2020 at 11:33 AM Aljoscha Krettek <aljos...@apache.org>
> > wrote:
> >
> >> I'll take a look.
> >>
> >> On 14.10.20 18:49, Luke Cwik wrote:
> >>> Assuming that null means that the accumulator was never created is not
> >>> right, especially if null is a valid terminal state while the
> >>> initial accumulator value is non-null. This is uncommon but possible.
> >>> Filed https://issues.apache.org/jira/browse/BEAM-11063.
> >>>
> >>> +Aljoscha Krettek <aljos...@apache.org> Is this something you can
> >>> take a look at?
> >>>
> >>> On Wed, Oct 14, 2020 at 9:25 AM Andrés Garagiola <andresgaragi...@gmail.com>
> >>> wrote:
> >>>
> >>>> Hi all,
> >>>>
> >>>> I have this problem in a streaming pipeline using the Apache Flink
> >>>> 1.19 runner. I want to upgrade my job. I first stop the job through
> >>>> the Flink API, creating a savepoint, and then I start the new version
> >>>> through the Flink API, passing the savepoint path.
> >>>>
> >>>> When the job stops, two new records are emitted. The first one is OK,
> >>>> but the second one is an empty record.
> >>>>
> >>>>
> >>>> My pipeline uses this window strategy:
> >>>>
> >>>>
> >>>> Window<KV<String, TaggedEvent>> window =
> >>>>     Window.<KV<String, TaggedEvent>>into(CalendarWindows.days(this.options.getWindowDays()))
> >>>>         .triggering(AfterWatermark.pastEndOfWindow()
> >>>>             .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(delay))
> >>>>             .withLateFirings(AfterProcessingTime.pastFirstElementInPane()))
> >>>>         .withAllowedLateness(Duration.standardSeconds(this.options.getAllowedLateness()))
> >>>>         .accumulatingFiredPanes();
> >>>>
> >>>>
> >>>> I implemented a custom combiner, and I realized that the state of the
> >>>> combiner is null in the second output. The condition on this line
> >>>> (https://github.com/apache/beam/blob/v2.24.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java#L507)
> >>>> evaluates to false, and then an empty accumulator is created.
> >>>>
> >>>>
> >>>> Is this the expected behavior?
> >>>>
> >>>>
> >>>> Thanks
> >>>>
> >>>
> >>
> >>
> >
>
>
