[ https://issues.apache.org/jira/browse/BEAM-14127?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Kenneth Knowles updated BEAM-14127:
-----------------------------------
    Status: Open  (was: Triage Needed)

> Timers with same family ids in same stage (but different transforms) are buffered together
> ------------------------------------------------------------------------------------------
>
>                 Key: BEAM-14127
>                 URL: https://issues.apache.org/jira/browse/BEAM-14127
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-py-core
>            Reporter: Pablo Estrada
>            Priority: P2
>
> The following test case does not work properly:
>
> {code:java}
> def test_dynamic_timer_clear_then_set_timer(self):
>   class EmitTwoEvents(DoFn):
>     EMIT_CLEAR_SET_TIMER = TimerSpec('emitclear', TimeDomain.WATERMARK)
>
>     def process(self, element, emit=DoFn.TimerParam(EMIT_CLEAR_SET_TIMER)):
>       yield ('1', 'set')
>       emit.set(1)
>
>     @on_timer(EMIT_CLEAR_SET_TIMER)
>     def emit_clear(self):
>       yield ('1', 'clear')
>
>   class DynamicTimerDoFn(DoFn):
>     EMIT_TIMER_FAMILY = TimerSpec('emit', TimeDomain.WATERMARK)
>
>     def process(self, element, emit=DoFn.TimerParam(EMIT_TIMER_FAMILY)):
>       if element[1] == 'set':
>         emit.set(10, dynamic_timer_tag='emit1')
>         emit.set(20, dynamic_timer_tag='emit2')
>       if element[1] == 'clear':
>         emit.set(30, dynamic_timer_tag='emit3')
>         emit.clear(dynamic_timer_tag='emit3')
>         emit.set(40, dynamic_timer_tag='emit3')
>       return []
>
>     @on_timer(EMIT_TIMER_FAMILY)
>     def emit_callback(
>         self, ts=DoFn.TimestampParam, tag=DoFn.DynamicTimerTagParam):
>       yield (tag, ts)
>
>   with TestPipeline() as p:
>     res = (
>         p
>         | beam.Create([('1', 'impulse')])
>         | beam.ParDo(EmitTwoEvents())
>         | beam.ParDo(DynamicTimerDoFn()))
>     assert_that(res, equal_to([('emit1', 10), ('emit2', 20), ('emit3', 40)]))
> {code}

--
This message was sent by Atlassian Jira
(v8.20.1#820001)