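The END_OF_WINDOW result is indeed 9.999999 (or, in Java, 9.999000): the
output timestamp is the window's maximum timestamp, which is the last
representable instant before the exclusive window end of 10. The results
for LATEST and EARLIEST, however, should be 9 and 0 respectively.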
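
To make the expected numbers concrete, here is a minimal pure-Python sketch
of the three strategies. It is not Beam code: output_timestamp_micros and
its parameters are hypothetical names made up for illustration, and it
assumes timestamps are integer microseconds and that the Python SDK's
resolution is 1 microsecond while Java's is 1 millisecond.

```python
MICROS_PER_SECOND = 1_000_000


def output_timestamp_micros(combiner, element_ts_micros, window_end_micros,
                            resolution_micros=1):
    """Hypothetical helper: expected output timestamp for one window.

    combiner: 'EARLIEST', 'LATEST' or 'END_OF_WINDOW'.
    element_ts_micros: timestamps of the elements grouped in the window.
    window_end_micros: exclusive end of the window.
    resolution_micros: smallest timestamp step (assumed 1 for Python,
        1000 for Java).
    """
    if combiner == 'EARLIEST':
        return min(element_ts_micros)
    if combiner == 'LATEST':
        return max(element_ts_micros)
    if combiner == 'END_OF_WINDOW':
        # The window end is exclusive, so step back by one resolution unit.
        return window_end_micros - resolution_micros
    raise ValueError('unknown combiner: %r' % (combiner,))


# Elements at t=0 and t=9 in the fixed window [0, 10), as in the test below.
elements = [0 * MICROS_PER_SECOND, 9 * MICROS_PER_SECOND]
window_end = 10 * MICROS_PER_SECOND

for name in ('EARLIEST', 'LATEST', 'END_OF_WINDOW'):
    micros = output_timestamp_micros(name, elements, window_end)
    print(name, micros / MICROS_PER_SECOND)
```

Passing resolution_micros=1000 to the END_OF_WINDOW case gives 9.999
seconds, which corresponds to the Java figure above.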

On Mon, Nov 11, 2019 at 5:34 PM Ruoyun Huang <[email protected]> wrote:
>
> Hi, Folks,
>
> I am trying to understand the behavior of TimestampCombiner. I have a
> test like this:
>
> import unittest
>
> import apache_beam as beam
> from apache_beam.options.pipeline_options import PipelineOptions
> from apache_beam.options.pipeline_options import StandardOptions
> from apache_beam.testing.test_pipeline import TestPipeline
> from apache_beam.testing.test_stream import TestStream
> from apache_beam.testing.util import assert_that
> from apache_beam.testing.util import equal_to_per_window
> from apache_beam.transforms import window
> from apache_beam.transforms.window import TimestampCombiner
> from apache_beam.utils.timestamp import Timestamp
>
> class TimestampCombinerTest(unittest.TestCase):
>
>   def test_combiner_latest(self):
>     """Test TimestampCombiner with LATEST."""
>     options = PipelineOptions()
>     options.view_as(StandardOptions).streaming = True
>     p = TestPipeline(options=options)
>
>     main_stream = (p
>                    | 'main TestStream' >> TestStream()
>                    .add_elements([window.TimestampedValue(('k', 100), 0)])
>                    .add_elements([window.TimestampedValue(('k', 400), 9)])
>                    .advance_watermark_to_infinity()
>                    | 'main windowInto' >> beam.WindowInto(
>                       window.FixedWindows(10),
>                       timestamp_combiner=TimestampCombiner.OUTPUT_AT_LATEST)
>                    | 'Combine' >> beam.CombinePerKey(sum))
>
>     class RecordFn(beam.DoFn):
>       def process(self,
>                   elm=beam.DoFn.ElementParam,
>                   ts=beam.DoFn.TimestampParam):
>         yield (elm, ts)
>
>     records = (main_stream | beam.ParDo(RecordFn()))
>
>     expected_window_to_elements = {
>         window.IntervalWindow(0, 10): [
>             (('k', 500), Timestamp(9)),
>         ],
>     }
>
>     assert_that(
>         records,
>         equal_to_per_window(expected_window_to_elements),
>         use_global_window=False,
>         label='assert per window')
>
>     p.run()
>
>
> I expect the results to be the following (depending on the
> TimestampCombiner strategy):
> LATEST:    (('k', 500), Timestamp(9)),
> EARLIEST:    (('k', 500), Timestamp(0)),
> END_OF_WINDOW: (('k', 500), Timestamp(10)),
>
> The above outcome is partially confirmed by a Java-side test [1].
>
>
> However, in Beam Python, the outcome is this:
> LATEST:    (('k', 500), Timestamp(10)),
> EARLIEST:    (('k', 500), Timestamp(10)),
> END_OF_WINDOW: (('k', 500), Timestamp(9.99999999)),
>
> What did I miss? What is the correct expected behavior? Or is this a
> bug?
>
> [1]: 
> https://github.com/apache/beam/blob/master/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java#L390
>
> Cheers,
>
