Reported a tracking JIRA:  https://issues.apache.org/jira/browse/BEAM-8645
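
For anyone following the ticket, here is a minimal sketch of the output
timestamps the thread below agrees are expected. It is plain Python, not Beam
code: the function name, the strategy strings, and the resolution parameter are
made up for illustration, and it assumes END_OF_WINDOW means the window end
minus one unit of timestamp resolution (1 microsecond in Python, 1 millisecond
in Java), which matches the 9.999999 and 9.999000 values quoted below.

```python
MICROS_PER_SECOND = 1_000_000


def expected_output_micros(strategy, element_ts_micros, window_end_micros,
                           resolution_micros=1):
    """Return the expected combined output timestamp in microseconds.

    Hypothetical helper for illustration only; it does not call Beam.
    """
    if strategy == 'LATEST':
        # Latest timestamp among the elements in the window.
        return max(element_ts_micros)
    if strategy == 'EARLIEST':
        # Earliest timestamp among the elements in the window.
        return min(element_ts_micros)
    if strategy == 'END_OF_WINDOW':
        # Assumed: the window's maximum timestamp, i.e. end minus one tick.
        return window_end_micros - resolution_micros
    raise ValueError('unknown strategy: %s' % strategy)


# Elements at t=0s and t=9s in the fixed window [0s, 10s).
element_ts = [0 * MICROS_PER_SECOND, 9 * MICROS_PER_SECOND]
window_end = 10 * MICROS_PER_SECOND

latest = expected_output_micros('LATEST', element_ts, window_end)
earliest = expected_output_micros('EARLIEST', element_ts, window_end)
end_python = expected_output_micros('END_OF_WINDOW', element_ts, window_end)
# Millisecond resolution, as in the Java SDK.
end_java = expected_output_micros('END_OF_WINDOW', element_ts, window_end,
                                  resolution_micros=1000)
```

With these inputs, LATEST and EARLIEST come out at 9 s and 0 s, and
END_OF_WINDOW at 9.999999 s (or 9.999000 s at millisecond resolution).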

On Tue, Nov 12, 2019 at 9:48 AM Ruoyun Huang <ruo...@google.com> wrote:

> Thanks for confirming.
>
> Since this is unexpected behavior, I will check JIRA to see whether it is
> already on the radar; if not, I will create an issue.
>
> On Mon, Nov 11, 2019 at 6:11 PM Robert Bradshaw <rober...@google.com>
> wrote:
>
>> The END_OF_WINDOW is indeed 9.999999 (or, in Java, 9.999000), but the
>> results for LATEST and EARLIEST should be 9 and 0 respectively.
>>
>> On Mon, Nov 11, 2019 at 5:34 PM Ruoyun Huang <ruo...@google.com> wrote:
>> >
>> > Hi, Folks,
>> >
>> >     I am trying to understand the behavior of TimestampCombiner. I have
>> a test like this:
>> >
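>> > import unittest
>> >
>> > import apache_beam as beam
>> > from apache_beam.options.pipeline_options import PipelineOptions
>> > from apache_beam.options.pipeline_options import StandardOptions
>> > from apache_beam.testing.test_pipeline import TestPipeline
>> > from apache_beam.testing.test_stream import TestStream
>> > from apache_beam.testing.util import assert_that
>> > from apache_beam.testing.util import equal_to_per_window
>> > from apache_beam.transforms import window
>> > from apache_beam.transforms.window import TimestampCombiner
>> > from apache_beam.utils.timestamp import Timestamp
>> >
>> >
>> > class TimestampCombinerTest(unittest.TestCase):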
>> >
>> >   def test_combiner_latest(self):
>> >     """Test TimestampCombiner with LATEST."""
>> >     options = PipelineOptions()
>> >     options.view_as(StandardOptions).streaming = True
>> >     p = TestPipeline(options=options)
>> >
>> >     main_stream = (p
>> >                    | 'main TestStream' >> TestStream()
>> >                    .add_elements([window.TimestampedValue(('k', 100), 0)])
>> >                    .add_elements([window.TimestampedValue(('k', 400), 9)])
>> >                    .advance_watermark_to_infinity()
>> >                    | 'main windowInto' >> beam.WindowInto(
>> >                       window.FixedWindows(10),
>> >                       timestamp_combiner=TimestampCombiner.OUTPUT_AT_LATEST)
>> >                    | 'Combine' >> beam.CombinePerKey(sum))
>> >
>> >     class RecordFn(beam.DoFn):
>> >       def process(self,
>> >                   elm=beam.DoFn.ElementParam,
>> >                   ts=beam.DoFn.TimestampParam):
>> >         yield (elm, ts)
>> >
>> >     records = (main_stream | beam.ParDo(RecordFn()))
>> >
>> >     expected_window_to_elements = {
>> >         window.IntervalWindow(0, 10): [
>> >             (('k', 500),  Timestamp(9)),
>> >         ],
>> >     }
>> >
>> >     assert_that(
>> >         records,
>> >         equal_to_per_window(expected_window_to_elements),
>> >         use_global_window=False,
>> >         label='assert per window')
>> >
>> >     p.run()
>> >
>> >
>> > I expect the following results, depending on the TimestampCombiner
>> strategy:
>> > LATEST:    (('k', 500), Timestamp(9)),
>> > EARLIEST:    (('k', 500), Timestamp(0)),
>> > END_OF_WINDOW: (('k', 500), Timestamp(10)),
>> >
>> > The above outcome is partially confirmed by the Java-side test [1].
>> >
>> >
>> > However, with the Beam Python SDK, the actual results are:
>> > LATEST:    (('k', 500), Timestamp(10)),
>> > EARLIEST:    (('k', 500), Timestamp(10)),
>> > END_OF_WINDOW: (('k', 500), Timestamp(9.99999999)),
>> >
>> > What did I miss? What is the correct expected behavior? Or does this
>> look like a bug?
>> >
>> > [1]: https://github.com/apache/beam/blob/master/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java#L390
>> >
>> > Cheers,
>> >
>>
>
>
> --
> ================
> Ruoyun  Huang
>
>

-- 
================
Ruoyun  Huang
