[ https://issues.apache.org/jira/browse/BEAM-7995?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ismaël Mejía updated BEAM-7995:
-------------------------------
Status: Open (was: Triage Needed)
> IllegalStateException: TimestampCombiner moved element from to earlier time
> in Python
> -------------------------------------------------------------------------------------
>
> Key: BEAM-7995
> URL: https://issues.apache.org/jira/browse/BEAM-7995
> Project: Beam
> Issue Type: Bug
> Components: sdk-py-core
> Reporter: Hai Lu
> Assignee: Hai Lu
> Priority: Major
> Time Spent: 50m
> Remaining Estimate: 0h
>
> I'm looking into a bug I found internally while using the Beam portable API
> (Python) with our own Samza runner.
>
> The pipeline looks something like this:
>
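> import apache_beam as beam
> from apache_beam.transforms.window import FixedWindows
> # ReadFromKafka and process_event are defined elsewhere (not shown);
> # process_event must return (key, value) pairs for CombinePerKey.
> (p
>  | 'read' >> ReadFromKafka(cluster="tracking", topic="PageViewEvent")
>  | 'transform' >> beam.Map(process_event)
>  | 'window' >> beam.WindowInto(FixedWindows(15))
>  | 'group' >> beam.CombinePerKey(beam.combiners.CountCombineFn())  # fails here
>  # ... (rest of the pipeline elided)
> )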
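For reference, the window arithmetic behind `FixedWindows(15)` can be modeled as below. This is a minimal sketch, not Beam's implementation: it assumes a zero window offset, uses integer milliseconds measured from 03:34:00 instead of Beam's timestamp types, and `assign_fixed_window` is a hypothetical helper. Because windows are half-open, an element at 45.000 belongs to the next window, not to [30.000, 45.000).

```python
from typing import Tuple

# FixedWindows(15): 15-second windows, modeled here in integer milliseconds.
WINDOW_SIZE_MS = 15_000


def assign_fixed_window(ts_ms: int, size_ms: int = WINDOW_SIZE_MS) -> Tuple[int, int]:
    """Return the half-open window [start, end) containing ts_ms (zero offset)."""
    start = ts_ms - (ts_ms % size_ms)  # floor to a multiple of the window size
    return start, start + size_ms      # end is exclusive


# Timestamps as milliseconds after 03:34:00.
print(assign_fixed_window(44_999))  # (30000, 45000)
print(assign_fixed_window(45_000))  # (45000, 60000)
```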
>
> The problem comes from the combiner, which causes the following exception on
> the Java side:
>
> Caused by: java.lang.IllegalStateException: TimestampCombiner moved element
> from 2019-08-15T03:34:45.000Z to earlier time 2019-08-15T03:34:44.999Z
> for window [2019-08-15T03:34:30.000Z..2019-08-15T03:34:45.000Z)
>   at org.apache.beam.runners.core.WatermarkHold.shift(WatermarkHold.java:117)
>   at org.apache.beam.runners.core.WatermarkHold.addElementHold(WatermarkHold.java:154)
>   at org.apache.beam.runners.core.WatermarkHold.addHolds(WatermarkHold.java:98)
>   at org.apache.beam.runners.core.ReduceFnRunner.processElement(ReduceFnRunner.java:605)
>   at org.apache.beam.runners.core.ReduceFnRunner.processElements(ReduceFnRunner.java:349)
>   at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn.processElement(GroupAlsoByWindowViaWindowSetNewDoFn.java:136)
>
> The exception is thrown here
> [https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java#L116]
> when we check that the shifted timestamp is not before the original timestamp:
>
> if (shifted.isBefore(timestamp)) {
>   throw new IllegalStateException(
>       String.format(
>           "TimestampCombiner moved element from %s to earlier time %s for window %s",
>           BoundedWindow.formatTimestamp(timestamp),
>           BoundedWindow.formatTimestamp(shifted),
>           window));
> }
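The same guard can be restated in Python to make the failing comparison explicit. This is a simplified model, not Beam code: timestamps are integer milliseconds after 03:34:00, the hypothetical `check_hold` raises `ValueError` where the Java code throws `IllegalStateException`, and the hold is passed in directly rather than computed by a `TimestampCombiner`.

```python
def check_hold(timestamp_ms: int, shifted_ms: int) -> int:
    """Mirror of the Java guard: a hold must never move an element earlier."""
    if shifted_ms < timestamp_ms:  # Java: shifted.isBefore(timestamp)
        raise ValueError(
            f"TimestampCombiner moved element from {timestamp_ms} "
            f"to earlier time {shifted_ms}"
        )
    return shifted_ms


# Values from the stack trace: element at 45.000, END_OF_WINDOW hold at 44.999.
try:
    check_hold(45_000, 44_999)
except ValueError as err:
    print(err)  # TimestampCombiner moved element from 45000 to earlier time 44999

# An element timestamped inside the window passes the check.
print(check_hold(44_999, 44_999))  # 44999
```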
>
> As the exception shows, "shifted" is "XXX 44.999" while "timestamp" is
> "XXX 45.000". The "44.999" comes from
> [TimestampCombiner.END_OF_WINDOW|https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/TimestampCombiner.java#L116]:
>
> @Override
> public Instant merge(BoundedWindow intoWindow, Iterable<? extends Instant> mergingTimestamps) {
>   return intoWindow.maxTimestamp();
> }
>
> where, for an IntervalWindow, intoWindow.maxTimestamp() is:
>
> /** Returns the largest timestamp that can be included in this window. */
> @Override
> public Instant maxTimestamp() {
>   // end not inclusive
>   return end.minus(1);
> }
>
> Hence the "44.999": one millisecond before the window's exclusive end.
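Put together, `END_OF_WINDOW` ignores the merged element timestamps and returns the window's max timestamp, one millisecond before its exclusive end. A minimal Python model, assuming integer-millisecond windows represented as `(start, end)` tuples; `max_timestamp` and `end_of_window_merge` here are illustrative stand-ins, not Beam API:

```python
from typing import Iterable, Tuple

Window = Tuple[int, int]  # half-open [start, end), in milliseconds


def max_timestamp(window: Window) -> int:
    """Largest timestamp inside the window: end is exclusive, so end - 1 ms."""
    _, end = window
    return end - 1


def end_of_window_merge(into_window: Window, merging_timestamps: Iterable[int]) -> int:
    """Like TimestampCombiner.END_OF_WINDOW: the input timestamps are ignored."""
    return max_timestamp(into_window)


window = (30_000, 45_000)  # [03:34:30.000, 03:34:45.000) as ms after 03:34:00
print(end_of_window_merge(window, [31_000, 44_000]))  # 44999
```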
>
> The "45.000", on the other hand, comes from the Python side, where the
> combiner emits its partial results in the pre-GBK operation:
> [operations.py#PGBKCVOperation#output_key|https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/worker/operations.py#L889]
>
> if windows is 0:
>     self.output(_globally_windowed_value.with_value((key, value)))
> else:
>     # windows[0].end is the window's exclusive end, not its max timestamp
>     self.output(WindowedValue((key, value), windows[0].end, windows))
>
> Here, when we build the WindowedValue, the timestamp is set to the window's
> exclusive end (45.000), which lies outside the half-open window, rather than
> to the last instant inside it (44.999).
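The mismatch shows up when checking which candidate timestamp actually lies inside the window. In this sketch (hypothetical helper names, integer milliseconds, windows as `(start, end)` tuples), `end` falls outside the half-open window while `end - 1` does not. Emitting the window's last included instant instead of `end` would be one way to keep the pre-GBK output inside its own window; Beam's Python windows do expose a `max_timestamp()` method for that instant, but this model does not use it and is an assumption, not the project's fix.

```python
from typing import Tuple

Window = Tuple[int, int]  # half-open [start, end), in milliseconds


def contains(window: Window, ts_ms: int) -> bool:
    start, end = window
    return start <= ts_ms < end  # end itself is excluded


def output_timestamp(window: Window, use_end: bool) -> int:
    """Candidate timestamp for a pre-GBK output: exclusive end or last instant inside."""
    _, end = window
    return end if use_end else end - 1


window = (30_000, 45_000)
print(contains(window, output_timestamp(window, use_end=True)))   # False (45000)
print(contains(window, output_timestamp(window, use_end=False)))  # True (44999)
```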
>
> The "end of window" definition appears inconsistent between Python and Java.
> I have yet to try this on other runners, so I'm not sure whether the issue is
> specific to our Samza runner. I tend to think this is a bug, but would like
> to confirm with you. If this has not been an issue for other runners, what
> might I be doing wrong?
--
This message was sent by Atlassian Jira
(v8.3.2#803003)