The END_OF_WINDOW is indeed 9.999999 (or, in Java, 9.999000), but the results for LATEST and EARLIEST should be 9 and 0 respectively.
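
To make the expected values concrete, here is a minimal sketch of the arithmetic for the window [0, 10) with elements at 0 and 9. It is not Beam's implementation: the function name, the strategy strings, and the `granularity_micros` parameter are all hypothetical, and it assumes that the end-of-window output timestamp is the window's exclusive end minus one tick of the SDK's timestamp precision (microseconds in Python, milliseconds in Java), which matches the 9.999999 and 9.999000 values above.

```python
# Illustrative sketch only, NOT Beam's implementation. All names are
# hypothetical; timestamps are plain integers in microseconds.

MICROS_PER_SECOND = 1_000_000


def expected_output_timestamp(strategy, element_micros, window_end_micros,
                              granularity_micros=1):
    """Return the output timestamp one would expect for a combined pane.

    strategy: "EARLIEST", "LATEST", or "END_OF_WINDOW" (hypothetical labels).
    element_micros: timestamps of the input elements, in microseconds.
    window_end_micros: exclusive end of the window, in microseconds.
    granularity_micros: assumed smallest timestamp step of the SDK
        (1 for microsecond precision, 1000 for millisecond precision).
    """
    if strategy == "EARLIEST":
        return min(element_micros)
    if strategy == "LATEST":
        return max(element_micros)
    if strategy == "END_OF_WINDOW":
        # The window end is exclusive, so the largest timestamp still
        # inside the window is one tick before it.
        return window_end_micros - granularity_micros
    raise ValueError("unknown strategy: %r" % (strategy,))


# Elements at 0s and 9s in the fixed window [0s, 10s), as in the test.
elements = [0 * MICROS_PER_SECOND, 9 * MICROS_PER_SECOND]
window_end = 10 * MICROS_PER_SECOND

earliest = expected_output_timestamp("EARLIEST", elements, window_end)
latest = expected_output_timestamp("LATEST", elements, window_end)
end_python = expected_output_timestamp("END_OF_WINDOW", elements, window_end)
end_java = expected_output_timestamp(
    "END_OF_WINDOW", elements, window_end, granularity_micros=1000)
```

Under these assumptions, `earliest` is 0 and `latest` is 9 seconds, while `end_python` and `end_java` come out one microsecond and one millisecond before 10 seconds respectively. A Timestamp(10) result would fall outside the window, which is why it should not appear for any of the three strategies.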

On Mon, Nov 11, 2019 at 5:34 PM Ruoyun Huang <[email protected]> wrote:
>
> Hi, Folks,
>
> I am trying to understand the behavior of TimestampCombiner. I have a
> test like this:
>
> class TimestampCombinerTest(unittest.TestCase):
>
>   def test_combiner_latest(self):
>     """Test TimestampCombiner with LATEST."""
>     options = PipelineOptions()
>     options.view_as(StandardOptions).streaming = True
>     p = TestPipeline(options=options)
>
>     main_stream = (p
>                    | 'main TestStream' >> TestStream()
>                    .add_elements([window.TimestampedValue(('k', 100), 0)])
>                    .add_elements([window.TimestampedValue(('k', 400), 9)])
>                    .advance_watermark_to_infinity()
>                    | 'main windowInto' >> beam.WindowInto(
>                        window.FixedWindows(10),
>                        timestamp_combiner=TimestampCombiner.OUTPUT_AT_LATEST)
>                    | 'Combine' >> beam.CombinePerKey(sum))
>
>     class RecordFn(beam.DoFn):
>       def process(self,
>                   elm=beam.DoFn.ElementParam,
>                   ts=beam.DoFn.TimestampParam):
>         yield (elm, ts)
>
>     records = (main_stream | beam.ParDo(RecordFn()))
>
>     expected_window_to_elements = {
>         window.IntervalWindow(0, 10): [
>             (('k', 500), Timestamp(9)),
>         ],
>     }
>
>     assert_that(
>         records,
>         equal_to_per_window(expected_window_to_elements),
>         use_global_window=False,
>         label='assert per window')
>
>     p.run()
>
>
> I expect the result to be following (based on various TimestampCombiner
> strategy):
> LATEST: (('k', 500), Timestamp(9)),
> EARLIEST: (('k', 500), Timestamp(0)),
> END_OF_WINDOW: (('k', 500), Timestamp(10)),
>
> The above outcome is partially confirmed by Java side test : [1]
>
>
> However, from beam python, the outcome is like this:
> LATEST: (('k', 500), Timestamp(10)),
> EARLIEST: (('k', 500), Timestamp(10)),
> END_OF_WINDOW: (('k', 500), Timestamp(9.99999999)),
>
> What did I miss? what should be the right expected behavior? or this looks
> like a bug?
>
> [1]:
> https://github.com/apache/beam/blob/master/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java#L390
>
> Cheers,
>
