I bet, as with the previous one, this is due to over-eager combiner lifting.
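
For reference, here is a minimal plain-Python sketch of the output timestamps the thread below agrees on for the window [0, 10) with elements at 0 and 9. It does not use Beam at all: the `Combiner` enum and `expected_output_micros` are hypothetical names made up for illustration, and the one-microsecond granularity is an assumption taken from the Python value 9.999999 quoted below (Java reports 9.999000).

```python
from enum import Enum

MICROS_PER_SECOND = 1_000_000


class Combiner(Enum):
    """Hypothetical stand-in for the three strategies discussed in the thread."""
    EARLIEST = "earliest"
    LATEST = "latest"
    END_OF_WINDOW = "end_of_window"


def expected_output_micros(combiner, element_micros, window_end_micros):
    """Return the expected output timestamp (in microseconds) for one window.

    Integer microseconds are used to avoid float rounding. The end of window
    is modelled as the last timestamp inside the window, i.e. one microsecond
    before the exclusive window end (assumption based on 9.999999 below).
    """
    if combiner is Combiner.EARLIEST:
        return min(element_micros)
    if combiner is Combiner.LATEST:
        return max(element_micros)
    return window_end_micros - 1


# The two elements from the test: ('k', 100) at t=0 and ('k', 400) at t=9.
elements = [0 * MICROS_PER_SECOND, 9 * MICROS_PER_SECOND]
window_end = 10 * MICROS_PER_SECOND  # FixedWindows(10) -> window [0, 10)

expected = {
    c.name: expected_output_micros(c, elements, window_end) for c in Combiner
}
print(expected)
```

In this sketch the timestamp depends only on the input element timestamps and the window, never on where the combine step runs, so none of the strategies should yield Timestamp(10).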

On Tue, Nov 12, 2019 at 4:17 PM Ruoyun Huang <[email protected]> wrote:
>
> Reported a tracking JIRA: https://issues.apache.org/jira/browse/BEAM-8645
>
> On Tue, Nov 12, 2019 at 9:48 AM Ruoyun Huang <[email protected]> wrote:
>>
>> Thanks for confirming.
>>
>> Since it is unexpected behavior, I shall look into jira if it is already on
>> radar, if not, will create one.
>>
>> On Mon, Nov 11, 2019 at 6:11 PM Robert Bradshaw <[email protected]> wrote:
>>>
>>> The END_OF_WINDOW is indeed 9.999999 (or, in Java, 9.999000), but the
>>> results for LATEST and EARLIEST should be 9 and 0 respectively.
>>>
>>> On Mon, Nov 11, 2019 at 5:34 PM Ruoyun Huang <[email protected]> wrote:
>>> >
>>> > Hi, Folks,
>>> >
>>> > I am trying to understand the behavior of TimestampCombiner. I have a
>>> > test like this:
>>> >
>>> > class TimestampCombinerTest(unittest.TestCase):
>>> >
>>> >   def test_combiner_latest(self):
>>> >     """Test TimestampCombiner with LATEST."""
>>> >     options = PipelineOptions()
>>> >     options.view_as(StandardOptions).streaming = True
>>> >     p = TestPipeline(options=options)
>>> >
>>> >     main_stream = (p
>>> >                    | 'main TestStream' >> TestStream()
>>> >                    .add_elements([window.TimestampedValue(('k', 100), 0)])
>>> >                    .add_elements([window.TimestampedValue(('k', 400), 9)])
>>> >                    .advance_watermark_to_infinity()
>>> >                    | 'main windowInto' >> beam.WindowInto(
>>> >                        window.FixedWindows(10),
>>> >                        timestamp_combiner=TimestampCombiner.OUTPUT_AT_LATEST)
>>> >                    | 'Combine' >> beam.CombinePerKey(sum))
>>> >
>>> >     class RecordFn(beam.DoFn):
>>> >       def process(self,
>>> >                   elm=beam.DoFn.ElementParam,
>>> >                   ts=beam.DoFn.TimestampParam):
>>> >         yield (elm, ts)
>>> >
>>> >     records = (main_stream | beam.ParDo(RecordFn()))
>>> >
>>> >     expected_window_to_elements = {
>>> >         window.IntervalWindow(0, 10): [
>>> >             (('k', 500), Timestamp(9)),
>>> >         ],
>>> >     }
>>> >
>>> >     assert_that(
>>> >         records,
>>> >         equal_to_per_window(expected_window_to_elements),
>>> >         use_global_window=False,
>>> >         label='assert per window')
>>> >
>>> >     p.run()
>>> >
>>> >
>>> > I expect the result to be following (based on various TimestampCombiner
>>> > strategy):
>>> > LATEST: (('k', 500), Timestamp(9)),
>>> > EARLIEST: (('k', 500), Timestamp(0)),
>>> > END_OF_WINDOW: (('k', 500), Timestamp(10)),
>>> >
>>> > The above outcome is partially confirmed by Java side test : [1]
>>> >
>>> >
>>> > However, from beam python, the outcome is like this:
>>> > LATEST: (('k', 500), Timestamp(10)),
>>> > EARLIEST: (('k', 500), Timestamp(10)),
>>> > END_OF_WINDOW: (('k', 500), Timestamp(9.99999999)),
>>> >
>>> > What did I miss? what should be the right expected behavior? or this
>>> > looks like a bug?
>>> >
>>> > [1]:
>>> > https://github.com/apache/beam/blob/master/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java#L390
>>> >
>>> > Cheers,
>>> >
>>
>>
>>
>> --
>> ================
>> Ruoyun Huang
>>
>
>
> --
> ================
> Ruoyun Huang
>
