Thanks for confirming. Since this is unexpected behavior, I will check JIRA to see whether it is already tracked; if not, I will create an issue.
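
For reference, here is a small sketch of the semantics Robert describes below. It is plain Python with no Beam dependency. The `Combiner` enum and the `expected_output_timestamp` helper are hypothetical stand-ins; only the member names mirror `TimestampCombiner`. It assumes the Python SDK's microsecond precision, so the end-of-window timestamp for the window [0, 10) is 10 s minus 1 microsecond.

```python
from enum import Enum

MICROS_PER_SECOND = 1_000_000


class Combiner(Enum):
    """Hypothetical stand-in for Beam's TimestampCombiner values."""
    OUTPUT_AT_EARLIEST = "earliest"
    OUTPUT_AT_LATEST = "latest"
    OUTPUT_AT_EOW = "end_of_window"


def expected_output_timestamp(element_ts_micros, window_end_micros, combiner):
    """Return the timestamp (in microseconds) the combined value should carry.

    Assumes microsecond precision, so the last timestamp inside a
    window [start, end) is end - 1 microsecond.
    """
    if combiner is Combiner.OUTPUT_AT_EARLIEST:
        return min(element_ts_micros)
    if combiner is Combiner.OUTPUT_AT_LATEST:
        return max(element_ts_micros)
    if combiner is Combiner.OUTPUT_AT_EOW:
        return window_end_micros - 1
    raise ValueError(f"unknown combiner: {combiner}")


# Elements from the test: ('k', 100) at t=0s and ('k', 400) at t=9s,
# both assigned to FixedWindows(10), i.e. the window [0s, 10s).
timestamps = [0 * MICROS_PER_SECOND, 9 * MICROS_PER_SECOND]
window_end = 10 * MICROS_PER_SECOND

for c in Combiner:
    ts = expected_output_timestamp(timestamps, window_end, c)
    print(c.name, ts / MICROS_PER_SECOND)
```

Run as-is, it prints 0.0, 9.0 and 9.999999 seconds for EARLIEST, LATEST and EOW respectively, which matches Robert's answer. The Java SDK uses millisecond precision, which is why its end-of-window value is 9.999 instead.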

On Mon, Nov 11, 2019 at 6:11 PM Robert Bradshaw <[email protected]> wrote:

> The END_OF_WINDOW is indeed 9.999999 (or, in Java, 9.999000), but the
> results for LATEST and EARLIEST should be 9 and 0 respectively.
>
> On Mon, Nov 11, 2019 at 5:34 PM Ruoyun Huang <[email protected]> wrote:
> >
> > Hi folks,
> >
> > I am trying to understand the behavior of TimestampCombiner. I have
> > a test like this:
> >
> > class TimestampCombinerTest(unittest.TestCase):
> >
> >     def test_combiner_latest(self):
> >         """Test TimestampCombiner with LATEST."""
> >         options = PipelineOptions()
> >         options.view_as(StandardOptions).streaming = True
> >         p = TestPipeline(options=options)
> >
> >         main_stream = (p
> >                        | 'main TestStream' >> TestStream()
> >                        .add_elements([window.TimestampedValue(('k', 100), 0)])
> >                        .add_elements([window.TimestampedValue(('k', 400), 9)])
> >                        .advance_watermark_to_infinity()
> >                        | 'main windowInto' >> beam.WindowInto(
> >                            window.FixedWindows(10),
> >                            timestamp_combiner=TimestampCombiner.OUTPUT_AT_LATEST)
> >                        | 'Combine' >> beam.CombinePerKey(sum))
> >
> >         class RecordFn(beam.DoFn):
> >             def process(self,
> >                         elm=beam.DoFn.ElementParam,
> >                         ts=beam.DoFn.TimestampParam):
> >                 yield (elm, ts)
> >
> >         records = (main_stream | beam.ParDo(RecordFn()))
> >
> >         expected_window_to_elements = {
> >             window.IntervalWindow(0, 10): [
> >                 (('k', 500), Timestamp(9)),
> >             ],
> >         }
> >
> >         assert_that(
> >             records,
> >             equal_to_per_window(expected_window_to_elements),
> >             use_global_window=False,
> >             label='assert per window')
> >
> >         p.run()
> >
> >
> > I expect the results to be the following for each TimestampCombiner
> > strategy:
> > LATEST: (('k', 500), Timestamp(9)),
> > EARLIEST: (('k', 500), Timestamp(0)),
> > END_OF_WINDOW: (('k', 500), Timestamp(10)),
> >
> > The above outcome is partially confirmed by a Java-side test: [1]
> >
> >
> > However, with Beam Python, the outcome is:
> > LATEST: (('k', 500), Timestamp(10)),
> > EARLIEST: (('k', 500), Timestamp(10)),
> > END_OF_WINDOW: (('k', 500), Timestamp(9.99999999)),
> >
> > What did I miss? What should the expected behavior be? Or does this
> > look like a bug?
> >
> > [1]: https://github.com/apache/beam/blob/master/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java#L390
> >
> > Cheers,

--
================
Ruoyun Huang
