Reported a tracking JIRA: https://issues.apache.org/jira/browse/BEAM-8645
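
For reference, here is a minimal pure-Python sketch of the semantics Robert confirms in the quoted thread below. It does not use Beam at all: Combiner and expected_output_micros are hypothetical names made up for illustration, and the microsecond granularity is an assumption taken from the 9.999999 figure quoted for the Python SDK.

```python
from enum import Enum

MICROS_PER_SECOND = 1_000_000


class Combiner(Enum):
    """Hypothetical stand-in for the three strategies discussed in the thread."""
    EARLIEST = 1
    LATEST = 2
    END_OF_WINDOW = 3


def expected_output_micros(combiner, element_seconds, window_start, window_end):
    """Return the expected output timestamp, in microseconds, for one window.

    element_seconds holds the event timestamps (whole seconds) of the inputs;
    the window is the half-open interval [window_start, window_end).
    """
    in_window = [t for t in element_seconds if window_start <= t < window_end]
    if not in_window:
        raise ValueError("no elements fall inside the window")
    if combiner is Combiner.EARLIEST:
        return min(in_window) * MICROS_PER_SECOND
    if combiner is Combiner.LATEST:
        return max(in_window) * MICROS_PER_SECOND
    # END_OF_WINDOW: the last representable instant inside the window,
    # assuming microsecond granularity.
    return window_end * MICROS_PER_SECOND - 1


# The two elements from the test below: timestamps 0 and 9 in window [0, 10).
element_seconds = [0, 9]
for combiner in Combiner:
    micros = expected_output_micros(combiner, element_seconds, 0, 10)
    print(combiner.name, micros / MICROS_PER_SECOND)
```

This only models the expected values (0, 9, and just under 10); it says nothing about why the Python SDK currently reports 10 for LATEST and EARLIEST.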

On Tue, Nov 12, 2019 at 9:48 AM Ruoyun Huang <ruo...@google.com> wrote:

> Thanks for confirming.
>
> Since it is unexpected behavior, I shall look into jira if it is already
> on radar, if not, will create one.
>
> On Mon, Nov 11, 2019 at 6:11 PM Robert Bradshaw <rober...@google.com>
> wrote:
>
>> The END_OF_WINDOW is indeed 9.999999 (or, in Java, 9.999000), but the
>> results for LATEST and EARLIEST should be 9 and 0 respectively.
>>
>> On Mon, Nov 11, 2019 at 5:34 PM Ruoyun Huang <ruo...@google.com> wrote:
>> >
>> > Hi, Folks,
>> >
>> > I am trying to understand the behavior of TimestampCombiner. I have
>> > a test like this:
>> >
>> > class TimestampCombinerTest(unittest.TestCase):
>> >
>> >   def test_combiner_latest(self):
>> >     """Test TimestampCombiner with LATEST."""
>> >     options = PipelineOptions()
>> >     options.view_as(StandardOptions).streaming = True
>> >     p = TestPipeline(options=options)
>> >
>> >     main_stream = (p
>> >                    | 'main TestStream' >> TestStream()
>> >                    .add_elements([window.TimestampedValue(('k', 100), 0)])
>> >                    .add_elements([window.TimestampedValue(('k', 400), 9)])
>> >                    .advance_watermark_to_infinity()
>> >                    | 'main windowInto' >> beam.WindowInto(
>> >                        window.FixedWindows(10),
>> >                        timestamp_combiner=TimestampCombiner.OUTPUT_AT_LATEST)
>> >                    | 'Combine' >> beam.CombinePerKey(sum))
>> >
>> >     class RecordFn(beam.DoFn):
>> >       def process(self,
>> >                   elm=beam.DoFn.ElementParam,
>> >                   ts=beam.DoFn.TimestampParam):
>> >         yield (elm, ts)
>> >
>> >     records = (main_stream | beam.ParDo(RecordFn()))
>> >
>> >     expected_window_to_elements = {
>> >         window.IntervalWindow(0, 10): [
>> >             (('k', 500), Timestamp(9)),
>> >         ],
>> >     }
>> >
>> >     assert_that(
>> >         records,
>> >         equal_to_per_window(expected_window_to_elements),
>> >         use_global_window=False,
>> >         label='assert per window')
>> >
>> >     p.run()
>> >
>> >
>> > I expect the result to be following (based on various TimestampCombiner
>> > strategy):
>> > LATEST: (('k', 500), Timestamp(9)),
>> > EARLIEST: (('k', 500), Timestamp(0)),
>> > END_OF_WINDOW: (('k', 500), Timestamp(10)),
>> >
>> > The above outcome is partially confirmed by Java side test : [1]
>> >
>> >
>> > However, from beam python, the outcome is like this:
>> > LATEST: (('k', 500), Timestamp(10)),
>> > EARLIEST: (('k', 500), Timestamp(10)),
>> > END_OF_WINDOW: (('k', 500), Timestamp(9.99999999)),
>> >
>> > What did I miss? what should be the right expected behavior? or this
>> > looks like a bug?
>> >
>> > [1]:
>> > https://github.com/apache/beam/blob/master/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java#L390
>> >
>> > Cheers,
>> >
>
> --
> ================
> Ruoyun Huang

--
================
Ruoyun Huang