I believe that this thread is closely related to another thread[1]
where there is discussion that the correct fix for this issue may be to
enforce that watermark updates only happen at bundle boundaries. There’s
another related thread[2] citing the same error with ElasticsearchIO.

This appears to be the same root cause: multiple elements from
different windows end up in the same bundle, and the watermark is updated
between elements of that bundle, causing some timestamps to become “invalid”.

[1]
https://lists.apache.org/thread/10db7l9bhnhmo484myps723sfxtjwwmv
[2]
https://lists.apache.org/thread/mtwtno2o88lx3zl12jlz7o5w1lcgm2db

- Evan

On Fri, Apr 1, 2022 at 18:35 Steve Niemitz <sniem...@apache.org> wrote:

> > but this only impacts non-portable runners.
>
> heh like Dataflow? :(
>
> I'm not sure what the solution is here then?  I managed to hit this bug
> within 2 hours of running my first pipeline on 2.37.  I can't just live
> with pipelines breaking randomly, and it seems like everything worked fine
> before that check was introduced (even if the output timestamp was
> technically wrong?).  Is the answer to add a 1ms allowed skew to all my
> DoFns that use processing time timers?  Remove the check again? (which is
> probably what I'll do in the short term in our fork)
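>
> (For concreteness, the 1ms-skew workaround I'm thinking of is roughly the
> sketch below; the class name and element types are just illustrative, not
> our actual DoFn:)
>
> import org.apache.beam.sdk.transforms.DoFn;
> import org.apache.beam.sdk.values.KV;
> import org.joda.time.Duration;
>
> class EmitOnProcessingTimer extends DoFn<KV<String, Long>, Long> {
>   // Allow outputs up to 1ms earlier than the timer's output timestamp,
>   // so the emit at window.maxTimestamp() no longer trips the check.
>   @Override
>   public Duration getAllowedTimestampSkew() {
>     return Duration.millis(1);
>   }
>   // ... @ProcessElement / @OnTimer unchanged ...
> }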
>
> When you say "There is another bug about the wrong output timestamp being
> used for processing time timers", which timestamp is incorrect?  The element
> input timestamp?  What should it be instead?
>
> On Fri, Apr 1, 2022 at 6:09 PM Luke Cwik <lc...@google.com> wrote:
>
>> The longstanding issue was that this was always happening but was not
>> visible; validation was recently added to make it visible that this was
>> wrong[1].
>>
>> There is another bug about the wrong output timestamp being used for
>> processing time timers that exacerbates this but this only impacts
>> non-portable runners.
>>
>> 1: https://issues.apache.org/jira/browse/BEAM-12931
>>
>>
>> On Fri, Apr 1, 2022 at 2:17 PM Steve Niemitz <sniem...@apache.org> wrote:
>>
>>> I'm unclear how the timer would ever have been set to output at
>>> that timestamp though.  The output timestamp falls into the next window,
>>> and if unset (like in this case) the output timestamp is derived from the
>>> element timestamp [1].  This means we somehow had an element in the wrong
>>> window?  This seems strangely similar to BEAM-6757 actually.
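>>>
>>> (For reference, my understanding is that explicitly pinning the timer's
>>> output timestamp would look roughly like the sketch below when setting the
>>> timer in @ProcessElement; the timer id, delay, and parameter names are just
>>> illustrative:)
>>>
>>> @ProcessElement
>>> public void process(ProcessContext ctx, BoundedWindow window,
>>>     @TimerId("flush") Timer flush) {
>>>   // Pin the timer's output timestamp to the end of the window instead of
>>>   // letting it default to the current element's timestamp.
>>>   flush.withOutputTimestamp(window.maxTimestamp())
>>>       .offset(Duration.standardMinutes(1))
>>>       .setRelative();
>>> }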
>>>
>>> What long-standing bug were you talking about here?  We've been running
>>> these pipelines for years and never ran into this before, although maybe we
>>> had been hitting it, and there was just no validation in the past to catch
>>> it?
>>>
>>> [1]
>>> https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java#L1284
>>>
>>>
>>> On Fri, Apr 1, 2022 at 5:11 PM Reuven Lax <re...@google.com> wrote:
>>>
>>>> There is a long-standing bug with processing timestamps.
>>>>
>>>> On Fri, Apr 1, 2022 at 2:01 PM Steve Niemitz <sniem...@twitter.com>
>>>> wrote:
>>>>
>>>>> We have a job that uses processing time timers, and just upgraded from
>>>>> 2.33 to 2.37.  Sporadically we've started seeing jobs fail with this error:
>>>>>
>>>>> java.lang.IllegalArgumentException: Cannot output with timestamp
>>>>> 2022-04-01T19:19:59.999Z. Output timestamps must be no earlier than the
>>>>> output timestamp of the timer (2022-04-01T19:20:00.000Z) minus the allowed
>>>>> skew (0 milliseconds) and no later than 294247-01-10T04:00:54.775Z. See the
>>>>> DoFn#getAllowedTimestampSkew() Javadoc for details on changing the allowed
>>>>> skew.
>>>>> at
>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$OnTimerArgumentProvider.checkTimestamp(SimpleDoFnRunner.java:883)
>>>>> at
>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$OnTimerArgumentProvider.outputWithTimestamp(SimpleDoFnRunner.java:863)
>>>>> at
>>>>> org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.outputWithTimestamp(DoFnOutputReceivers.java:85)
>>>>> <user code>
>>>>>
>>>>> This windowing is configured with 10 minute fixed windows and 10
>>>>> minute allowed lateness.  We're not specifically setting the output time
>>>>> on the timer, so it seems like it's getting inferred from the element
>>>>> timestamp?  The code that emits elements from the timer uses
>>>>> window.maxTimestamp() to set the output timestamp.  I'm not sure I
>>>>> understand how an element with a timestamp in what should be the next
>>>>> window ended up in the previous one?  This is the first stateful
>>>>> operation in the pipeline and we read from pubsub using pubsub timestamps,
>>>>> so there should be no late data.
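>>>>>
>>>>> (For context, the relevant part of the DoFn looks roughly like the sketch
>>>>> below; names, types, and the timer delay are simplified/illustrative, not
>>>>> the real pipeline code:)
>>>>>
>>>>> import org.apache.beam.sdk.state.StateSpec;
>>>>> import org.apache.beam.sdk.state.StateSpecs;
>>>>> import org.apache.beam.sdk.state.TimeDomain;
>>>>> import org.apache.beam.sdk.state.Timer;
>>>>> import org.apache.beam.sdk.state.TimerSpec;
>>>>> import org.apache.beam.sdk.state.TimerSpecs;
>>>>> import org.apache.beam.sdk.state.ValueState;
>>>>> import org.apache.beam.sdk.transforms.DoFn;
>>>>> import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
>>>>> import org.apache.beam.sdk.values.KV;
>>>>> import org.joda.time.Duration;
>>>>>
>>>>> class CountAndFlushFn extends DoFn<KV<String, Long>, Long> {
>>>>>
>>>>>   @StateId("count")
>>>>>   private final StateSpec<ValueState<Long>> countSpec = StateSpecs.value();
>>>>>
>>>>>   @TimerId("flush")
>>>>>   private final TimerSpec flushSpec =
>>>>>       TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
>>>>>
>>>>>   @ProcessElement
>>>>>   public void process(@Element KV<String, Long> element,
>>>>>       @StateId("count") ValueState<Long> count,
>>>>>       @TimerId("flush") Timer flush) {
>>>>>     Long current = count.read();
>>>>>     count.write((current == null ? 0 : current) + element.getValue());
>>>>>     // Processing-time timer with no explicit output timestamp, so the
>>>>>     // output timestamp is derived from the element's timestamp.
>>>>>     flush.offset(Duration.standardMinutes(1)).setRelative();
>>>>>   }
>>>>>
>>>>>   @OnTimer("flush")
>>>>>   public void onFlush(BoundedWindow window,
>>>>>       @StateId("count") ValueState<Long> count,
>>>>>       OutputReceiver<Long> out) {
>>>>>     Long n = count.read();
>>>>>     if (n != null) {
>>>>>       // Emit at the end of the current window; this is the call that
>>>>>       // trips the allowed-skew check when the timer's output timestamp
>>>>>       // has already moved past window.maxTimestamp().
>>>>>       out.outputWithTimestamp(n, window.maxTimestamp());
>>>>>       count.clear();
>>>>>     }
>>>>>   }
>>>>> }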
>>>>>
>>>>> I know there was a change recently to better validate the output
>>>>> timestamp from timers [1]; I'm having trouble understanding whether there's
>>>>> a bug in that check, or whether it's actually exposing a real bug in our
>>>>> pipeline.
>>>>>
>>>>> [1]
>>>>> https://github.com/apache/beam/commit/15048929495ad66963b528d5bd71eb7b4a844c96
>>>>>
>>>>
