Hi Pablo,

I'm not using TestStream; the test uses Create. However, changing the input to TestStream.add_elements(...).advance_watermark_to_infinity() does not seem to have any impact on the outcome: the outputs are still produced and then dropped. The pipeline logic is pretty simple: buffer data for a defined amount of processing time, then flush it. The event-time timer is there to enforce flushing of the buffer at the end of the global window (Python seems to be missing @OnWindowExpiration as of now).

Regarding the runner, I'm using the default one (Beam 2.32.0), which is created using

   beam.Pipeline(options=PipelineOptions(["--streaming"]))

With this, the test seems to fail 100% of the time. Changing the above to

   beam.Pipeline(options=PipelineOptions(["--streaming", "--runner=flink"]))

the test seems to pass 100% of the time.

 Jan

On 9/13/21 7:58 PM, Pablo Estrada wrote:
Hi Jan,
Are you using a TestStream? Do you know which DirectRunner implementation is being used (FnApiRunner or BundleBasedDirectRunner)?
-P.

On Mon, Sep 13, 2021 at 10:45 AM Ahmet Altay wrote:

    +Pablo Estrada might know the answer.

    On Fri, Sep 10, 2021 at 8:46 AM Jan Lukavský wrote:

        Hi,

        I haven't dug into the code yet; I just wanted to ask if someone
        could give me some pointers. I'm experiencing weird behavior in the
        Python SDK when emitting data from a stateful DoFn with an
        event-time timer set to GlobalWindow().max_timestamp(). The data is
        correctly emitted, but it is probably late (behind the watermark)
        and is dropped at the first GBK downstream (assert_that in my case),
        so the assertion fails. When I use FlinkRunner, it seems to work as
        expected (the test passes).

        Has anyone seen something like this before?

        Thanks,

          Jan
