Exactly: since the new tests use unbounded PCollections, we have to add the
UsesUnboundedPCollections category.
Also, the Flink runner was not excluding this category from the batch
(bounded) tests.
I opened this PR to fix it. PTAL, Reuven:
https://github.com/apache/beam/pull/10871
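
A minimal sketch of the category logic, assuming JUnit-style semantics where a suite runs a test if it carries at least one included category and none of the excluded ones. `TestCase`, `CategoryFilter.selectTests`, and the test names are hypothetical. This models the selection only; it is not Beam's or Gradle's build configuration.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical model of category-based test selection (not Beam or Gradle code).
record TestCase(String name, Set<String> categories) {}

final class CategoryFilter {
  private CategoryFilter() {}

  /** Keeps tests tagged with at least one included category and no excluded category. */
  static List<String> selectTests(
      List<TestCase> tests, Set<String> include, Set<String> exclude) {
    return tests.stream()
        .filter(t -> t.categories().stream().anyMatch(include::contains))
        .filter(t -> t.categories().stream().noneMatch(exclude::contains))
        .map(TestCase::name)
        .collect(Collectors.toList());
  }
}
```

Under these assumptions, a batch suite that includes ValidatesRunner but excludes UsesUnboundedPCollections skips a test tagged with both, while a streaming suite without that exclusion still runs it.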

On Fri, Feb 14, 2020 at 8:52 PM Reuven Lax <[email protected]> wrote:

> This is running as part of the validatesRunnerBatch test, but it is
> executing a streaming test. Maybe that's why it's failing?
>
> On Fri, Feb 14, 2020 at 9:42 AM Reuven Lax <[email protected]> wrote:
>
>> Ismael,
>>
>> As part of that fix, I added some new tests to make sure these tests run
>> on both batch and streaming runners, as I realized that the test was
>> previously running only on batch runners. I did this by explicitly setting
>> the isBounded attribute on the output of the Create transform. Somehow
>> these new tests are making the Flink runner unhappy.
>>
>> I'm not sure why explicitly setting the PCollection to be unbounded is
>> breaking the Flink runner. We can try to exclude the Flink runner from
>> these tests for now, but maybe Max has an idea.
>>
>> Also Ismael, what makes this a batch mode test? When I look at the
>> failing stack trace, the failure is in FlinkStreamingPipelineTranslator.
>>
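
One possible explanation, offered as an assumption rather than a confirmed reading of the Flink runner: if the runner switches to its streaming translator whenever the pipeline contains an unbounded PCollection, a test launched from the batch suite would still end up in FlinkStreamingPipelineTranslator. The sketch models only that decision; `Boundedness`, `PCollectionInfo`, and `TranslationModeChooser` are hypothetical names.

```java
import java.util.List;

// Hypothetical model of translation-mode selection; not the Flink runner's actual code.
enum Boundedness { BOUNDED, UNBOUNDED }

record PCollectionInfo(String name, Boundedness boundedness) {}

final class TranslationModeChooser {
  private TranslationModeChooser() {}

  /** Assumes any unbounded PCollection forces streaming, regardless of the requested mode. */
  static String chooseMode(boolean streamingRequested, List<PCollectionInfo> pcollections) {
    boolean hasUnbounded =
        pcollections.stream().anyMatch(pc -> pc.boundedness() == Boundedness.UNBOUNDED);
    return (streamingRequested || hasUnbounded) ? "streaming" : "batch";
  }
}
```

Under this assumption, marking the Create output as unbounded is enough to change which translator runs, which would be consistent with the stack trace.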
>> Reuven
>>
>> On Fri, Feb 14, 2020 at 8:52 AM Ismaël Mejía <[email protected]> wrote:
>>
>>> Apparently the fix of dynamic timers for Dataflow broke the Flink
>>> ValidatesRunner tests in batch mode, which were passing before.
>>> Reuven or Rehman, can you please take a look?
>>> The tests have been failing since the exact commit of the fix:
>>> 7719708a04d5d0eff3048dbd58ac1337889f8ba5
>>> For details on the exception:
>>>
>>> https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/6608/
>>>
>>>
>>> On Wed, Feb 12, 2020 at 10:37 AM Ismaël Mejía <[email protected]> wrote:
>>>
>>>> Great to know you got it working on Dataflow easily, Reuven. As a new
>>>> feature, it looks great!
>>>>
>>>> I agree with Kenn; it may be worth opening a new thread to discuss the
>>>> changes still needed to support this in portable runners.
>>>>
>>>> On Mon, Feb 10, 2020 at 8:25 PM Kenneth Knowles <[email protected]> wrote:
>>>>
>>>>> I think the (lack of) portability bit may have been buried in this
>>>>> thread. Maybe a new thread about the design for that?
>>>>>
>>>>> Kenn
>>>>>
>>>>> On Sun, Feb 9, 2020 at 11:36 AM Reuven Lax <[email protected]> wrote:
>>>>>
>>>>>> FYI, this is now fixed for Dataflow. I also added better rejection so
>>>>>> that runners that don't support this feature will reject the pipeline.
>>>>>>
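
A minimal sketch of what up-front rejection can look like, assuming the runner knows whether it supports timer families and can inspect each DoFn's declared features before execution. `DoFnFeatures` and `PipelineValidator` are hypothetical names, not Beam APIs; the point is that an unsupported pipeline fails at submission with a clear message instead of misbehaving at run time.

```java
import java.util.List;

// Hypothetical summary of what a DoFn declares; not a Beam API.
record DoFnFeatures(String name, boolean usesTimerFamilies) {}

final class PipelineValidator {
  private PipelineValidator() {}

  /** Throws before execution if a DoFn needs timer families the runner cannot provide. */
  static void validate(
      String runnerName, boolean supportsTimerFamilies, List<DoFnFeatures> doFns) {
    for (DoFnFeatures fn : doFns) {
      if (fn.usesTimerFamilies() && !supportsTimerFamilies) {
        throw new UnsupportedOperationException(
            runnerName + " does not support timer families, which " + fn.name() + " uses");
      }
    }
  }
}
```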
>>>>>> On Sat, Feb 8, 2020 at 12:10 AM Reuven Lax <[email protected]> wrote:
>>>>>>
>>>>>>> I took a look, and I think this was a simple bug. Testing a fix now.
>>>>>>>
>>>>>>> A larger question is how to support this in the portability layer.
>>>>>>> Right now portability assumes that each timer id corresponds to a
>>>>>>> logical input PCollection, but that assumption no longer works now
>>>>>>> that we support a dynamic set of timers, each with its own id. We
>>>>>>> could instead model each timer family as a PCollection, but the
>>>>>>> FnApiRunner would need to get the timer id dynamically in order to
>>>>>>> invoke it, and today it statically reads the timer id from the
>>>>>>> PCollection name.
>>>>>>>
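
The alternative described in the paragraph above can be sketched as follows, assuming each fired timer carries its family id, its dynamic timer id, and a timestamp. One callback is registered per family, and the id is read from each timer element at dispatch time rather than derived statically from a PCollection name. `FiredTimer`, `TimerFamilyCallback`, and `TimerFamilyDispatcher` are hypothetical names, not Fn API types.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical timer element: the concrete id travels with the data.
record FiredTimer(String familyId, String timerId, long timestampMillis) {}

interface TimerFamilyCallback {
  void onTimer(String timerId, long timestampMillis);
}

final class TimerFamilyDispatcher {
  private final Map<String, TimerFamilyCallback> callbacksByFamily = new HashMap<>();

  void register(String familyId, TimerFamilyCallback callback) {
    callbacksByFamily.put(familyId, callback);
  }

  void dispatch(FiredTimer timer) {
    TimerFamilyCallback callback = callbacksByFamily.get(timer.familyId());
    if (callback == null) {
      throw new IllegalStateException("No callback for timer family " + timer.familyId());
    }
    // The id is read from the element at dispatch time, not from a static name.
    callback.onTimer(timer.timerId(), timer.timestampMillis());
  }
}
```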
>>>>>>> Reuven
>>>>>>>
>>>>>>> On Fri, Feb 7, 2020 at 2:22 PM Reuven Lax <[email protected]> wrote:
>>>>>>>
>>>>>>>> Thanks for finding this. Hopefully the bug is easy to fix. The tests
>>>>>>>> indeed never ran on any runner except for the DirectRunner, which is
>>>>>>>> something I should've noticed in the code review.
>>>>>>>>
>>>>>>>> Reuven
>>>>>>>>
>>>>>>>> On Mon, Feb 3, 2020 at 12:50 AM Ismaël Mejía <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> I had a discussion with Rehman last week and we discovered that the
>>>>>>>>> TimerMap-related tests were not running for all runners because they
>>>>>>>>> were not tagged as part of the ValidatesRunner category. I opened a
>>>>>>>>> PR [1] to enable this, so could someone please help me with the
>>>>>>>>> review/merge?
>>>>>>>>>
>>>>>>>>> Out of curiosity, I took a look and discovered that they are only
>>>>>>>>> passing for the Direct runner and for the classic Flink runner in
>>>>>>>>> batch mode. They are not passing for Dataflow [2][3] or for the
>>>>>>>>> Portable Flink runner, so it is probably worth reopening the issue to
>>>>>>>>> investigate/fix.
>>>>>>>>>
>>>>>>>>> [1] https://github.com/apache/beam/pull/10747
>>>>>>>>> [2] https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_PR/210/
>>>>>>>>> [3] https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_PortabilityApi_Dataflow_PR/76/
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Sat, Jan 25, 2020 at 1:26 AM Reuven Lax <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Yes. For now we exclude the Flink runner, but fixing this should be
>>>>>>>>>> fairly trivial.
>>>>>>>>>>
>>>>>>>>>> On Fri, Jan 24, 2020 at 3:35 PM Maximilian Michels <[email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> The Flink Runner used to allow setting a timer multiple times, before
>>>>>>>>>>> we made it comply with the Beam semantics of overwriting past
>>>>>>>>>>> invocations. I wouldn't be surprised if the Spark Runner never
>>>>>>>>>>> addressed this. Flink and Spark themselves allow a timer to be set
>>>>>>>>>>> multiple times. In order to fix this for Beam, the Flink Runner has to
>>>>>>>>>>> maintain a checkpointed map which sits outside of its built-in
>>>>>>>>>>> TimerService.
>>>>>>>>>>>
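
A minimal sketch of that approach, assuming an engine timer service that keeps every registration even when the same id is set twice, as described in the paragraph above. `NativeTimerService` is a hypothetical stand-in for such a service (it is not Flink's TimerService API), and `OverwritingTimers` is a hypothetical wrapper that layers Beam's overwrite semantics on top with a side map from timer id to its current target.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

// Hypothetical stand-in for an engine timer service that keeps every registration,
// including several for the same timer id. Not Flink's actual TimerService API.
final class NativeTimerService {
  record Registration(String timerId, long timestamp) {}

  private final PriorityQueue<Registration> queue =
      new PriorityQueue<>(Comparator.comparingLong(Registration::timestamp));

  void register(String timerId, long timestamp) {
    queue.add(new Registration(timerId, timestamp));
  }

  void delete(String timerId, long timestamp) {
    queue.remove(new Registration(timerId, timestamp)); // removes one matching entry
  }

  /** Removes and returns every registration at or before the watermark, earliest first. */
  List<Registration> advanceTo(long watermark) {
    List<Registration> fired = new ArrayList<>();
    while (!queue.isEmpty() && queue.peek().timestamp() <= watermark) {
      fired.add(queue.poll());
    }
    return fired;
  }
}

// Beam-style semantics on top: the side map remembers the current target of each id,
// so a second set() for the same id replaces the first registration. In a real runner
// this map would have to be checkpointed together with the timers.
final class OverwritingTimers {
  private final NativeTimerService service = new NativeTimerService();
  private final Map<String, Long> currentTarget = new HashMap<>();

  void set(String timerId, long timestamp) {
    Long previous = currentTarget.put(timerId, timestamp);
    if (previous != null) {
      service.delete(timerId, previous);
    }
    service.register(timerId, timestamp);
  }

  /** Fires due timers and returns their ids. */
  List<String> advanceTo(long watermark) {
    List<String> firedIds = new ArrayList<>();
    for (NativeTimerService.Registration r : service.advanceTo(watermark)) {
      currentTarget.remove(r.timerId());
      firedIds.add(r.timerId());
    }
    return firedIds;
  }
}
```

Without the side map, setting `t` to 10 and then to 20 leaves two registrations that both fire; with it, only the registration at 20 remains.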
>>>>>>>>>>> As far as I can see, multiple timer families are currently not
>>>>>>>>>>> supported in the Flink Runner because the map does not take the
>>>>>>>>>>> family name into account. This can be easily fixed, though.
>>>>>>>>>>>
>>>>>>>>>>> -Max
>>>>>>>>>>>
>>>>>>>>>>> On 24.01.20 21:31, Reuven Lax wrote:
>>>>>>>>>>> > The new timer family is in the portability protos. I think
>>>>>>>>>>> > TimerReceiver needs to be updated to set it, though (I think it is a
>>>>>>>>>>> > 1-line change).
>>>>>>>>>>> >
>>>>>>>>>>> > The TimerInternals class that runners implement today already
>>>>>>>>>>> > handles dynamic timers, so most of the work was in the Beam SDK, to
>>>>>>>>>>> > provide an API that allows users to access this feature.
>>>>>>>>>>> >
>>>>>>>>>>> > The main work needed in the runners was to take the timer family
>>>>>>>>>>> > into account. Beam semantics say that if a timer is set twice with
>>>>>>>>>>> > the same id, then the second timer overwrites the first. Several
>>>>>>>>>>> > runners therefore had maps from timer id -> timer. However, since
>>>>>>>>>>> > the timer family scopes the timers, we now allow two timers with
>>>>>>>>>>> > the same id as long as their timer families are different. Runners
>>>>>>>>>>> > had to be updated to include the timer family id in the map keys.
>>>>>>>>>>> >
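
A minimal sketch of that key change, assuming a runner that tracks pending timers in an in-memory map. `TimerKey` and `PendingTimers` are hypothetical names. Because the key is the (family, id) pair, the same id can exist in two families, while setting it again within one family still replaces the earlier target.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical map key: the timer family scopes the timer id.
record TimerKey(String familyId, String timerId) {}

final class PendingTimers {
  private final Map<TimerKey, Long> targets = new HashMap<>();

  /** Map.put replaces any earlier target with the same (family, id) key. */
  void set(String familyId, String timerId, long timestamp) {
    targets.put(new TimerKey(familyId, timerId), timestamp);
  }

  /** Returns the pending target, or null if none is set. */
  Long get(String familyId, String timerId) {
    return targets.get(new TimerKey(familyId, timerId));
  }

  int size() {
    return targets.size();
  }
}
```

Keying by timer id alone would let the `familyB` timer overwrite the `familyA` one, which is why the runners had to add the family id to their map keys.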
>>>>>>>>>>> > Surprisingly, the new TimerMap tests seem to pass on Spark
>>>>>>>>>>> > ValidatesRunner, even though the Spark runner wasn't updated! I
>>>>>>>>>>> > wonder if this means that the Spark runner was incorrectly
>>>>>>>>>>> > implementing the Beam semantics before, and setTimer was not
>>>>>>>>>>> > overwriting timers with the same id?
>>>>>>>>>>> >
>>>>>>>>>>> > Reuven
>>>>>>>>>>> >
>>>>>>>>>>> > On Fri, Jan 24, 2020 at 7:31 AM Ismaël Mejía <[email protected]> wrote:
>>>>>>>>>>> >
>>>>>>>>>>> >     This looks great, thanks for the contribution Rehman!
>>>>>>>>>>> >
>>>>>>>>>>> >     I have some questions (note I have not looked at the code at all).
>>>>>>>>>>> >
>>>>>>>>>>> >     - Is this working for both portable and non-portable runners?
>>>>>>>>>>> >     - What do other runners need to implement to support this
>>>>>>>>>>> >       (e.g. Spark)?
>>>>>>>>>>> >
>>>>>>>>>>> >     It may be worth adding this to the Compatibility Matrix on the
>>>>>>>>>>> >     website.
>>>>>>>>>>> >
>>>>>>>>>>> >     Regards,
>>>>>>>>>>> >     Ismaël
>>>>>>>>>>> >
>>>>>>>>>>> >
>>>>>>>>>>> >     On Fri, Jan 24, 2020 at 8:42 AM Rehman Murad Ali <[email protected]> wrote:
>>>>>>>>>>> >
>>>>>>>>>>> >         Thank you, Reuven, for the guidance throughout the
>>>>>>>>>>> >         development process. I am delighted to contribute my two
>>>>>>>>>>> >         cents to the Beam project.
>>>>>>>>>>> >
>>>>>>>>>>> >         Looking forward to more active contributions.
>>>>>>>>>>> >
>>>>>>>>>>> >         Thanks & Regards,
>>>>>>>>>>> >
>>>>>>>>>>> >         Rehman Murad Ali
>>>>>>>>>>> >         Software Engineer
>>>>>>>>>>> >         Mobile: +92 3452076766
>>>>>>>>>>> >         Skype: rehman.muradali
>>>>>>>>>>> >
>>>>>>>>>>> >
>>>>>>>>>>> >
>>>>>>>>>>> >         On Thu, Jan 23, 2020 at 11:09 PM Reuven Lax <[email protected]> wrote:
>>>>>>>>>>> >
>>>>>>>>>>> >             Thanks to a lot of hard work by Rehman, Beam now
>>>>>>>>>>> >             supports dynamic timers. As a reminder, this was
>>>>>>>>>>> >             discussed on the dev list some time back.
>>>>>>>>>>> >
>>>>>>>>>>> >             As background, previously you had to statically declare
>>>>>>>>>>> >             all timers in your code. So if you wanted two timers,
>>>>>>>>>>> >             you needed to create two timer variables and two
>>>>>>>>>>> >             callbacks, one for each timer. A number of users kept
>>>>>>>>>>> >             hitting stumbling blocks where they needed a dynamic set
>>>>>>>>>>> >             of timers (often based on the element), which was not
>>>>>>>>>>> >             supported in Beam. The workarounds were quite ugly and
>>>>>>>>>>> >             complicated.
>>>>>>>>>>> >
>>>>>>>>>>> >             The new support allows declaring a TimerMap, which is a
>>>>>>>>>>> >             map of timers. Each TimerMap is scoped by a family name,
>>>>>>>>>>> >             so you can create multiple TimerMaps, each with its own
>>>>>>>>>>> >             callback. Usage looks as follows:
>>>>>>>>>>> >
>>>>>>>>>>> >             class MyDoFn extends DoFn<...> {
>>>>>>>>>>> >                 @TimerFamily("timers")
>>>>>>>>>>> >                 private final TimerSpec timerMap =
>>>>>>>>>>> >                     TimerSpecs.timerMap(TimeDomain.EVENT_TIME);
>>>>>>>>>>> >
>>>>>>>>>>> >                 @ProcessElement
>>>>>>>>>>> >                 public void process(
>>>>>>>>>>> >                         @TimerFamily("timers") TimerMap timers,
>>>>>>>>>>> >                         @Element Type e,
>>>>>>>>>>> >                         @Timestamp Instant timestamp) {
>>>>>>>>>>> >                     // Timer ids are strings, so they can be computed per element.
>>>>>>>>>>> >                     timers.set("mainTimer", timestamp);
>>>>>>>>>>> >                     timers.set("actionType" + e.getActionType(), timestamp);
>>>>>>>>>>> >                 }
>>>>>>>>>>> >
>>>>>>>>>>> >                 // Called for each timer that fires in the "timers" family.
>>>>>>>>>>> >                 @OnTimerFamily("timers")
>>>>>>>>>>> >                 public void onTimer(@TimerId String timerId) {
>>>>>>>>>>> >                     System.out.println("Timer fired. id: " + timerId);
>>>>>>>>>>> >                 }
>>>>>>>>>>> >             }
>>>>>>>>>>> >
>>>>>>>>>>> >             This currently works for the Flink and Dataflow runners.
>>>>>>>>>>> >
>>>>>>>>>>> >             Thank you, Rehman, for getting this done! Beam users
>>>>>>>>>>> >             will find it very valuable.
>>>>>>>>>>> >
>>>>>>>>>>> >             Reuven
>>>>>>>>>>> >
>>>>>>>>>>>
>>>>>>>>>>
