[ https://issues.apache.org/jira/browse/BEAM-6676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16768916#comment-16768916 ]
Thomas Weise commented on BEAM-6676:
------------------------------------

{code:java}
from __future__ import absolute_import

import logging
import sys

import apache_beam as beam
import apache_beam.transforms.window as window
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms import userstate


class StatefulFn(beam.DoFn):

  count_state_spec = userstate.CombiningValueStateSpec(
      'count', beam.coders.IterableCoder(beam.coders.VarIntCoder()), sum)
  timer_spec = userstate.TimerSpec('timer', userstate.TimeDomain.WATERMARK)

  def process(self, kv, count=beam.DoFn.StateParam(count_state_spec),
              timer=beam.DoFn.TimerParam(timer_spec),
              window=beam.DoFn.WindowParam):
    count.add(1)
    timer.set(1000)
    logging.info("###count %s, %d, window %s" % (kv, count.read(), window))

  @userstate.on_timer(timer_spec)
  def process_timer(self, count=beam.DoFn.StateParam(count_state_spec),
                    window=beam.DoFn.WindowParam):
    logging.info("###count %d, window %s" % (count.read(), window))


def run(argv=None):
  """Build and run the pipeline."""
  args = ["--runner=PortableRunner", "--job_endpoint=localhost:8099",
          "--streaming", "--shutdown_sources_on_final_watermark"]
  if argv:
    args.extend(argv)

  pipeline_options = PipelineOptions(args)

  p = beam.Pipeline(options=pipeline_options)
  _ = (p
       | beam.Create([('k1', 1), ('k1', 2)])
       | 'window' >> beam.WindowInto(window.FixedWindows(5))
       | 'statefulCount' >> beam.ParDo(StatefulFn())
      )

  result = p.run()
  result.wait_until_finish()


if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run(sys.argv[1:])
{code}

> Python timers only working with GlobalWindow
> --------------------------------------------
>
>                 Key: BEAM-6676
>                 URL: https://issues.apache.org/jira/browse/BEAM-6676
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-harness
>    Affects Versions: 2.10.0
>            Reporter: Thomas Weise
>            Priority: Major
>
> Setting a timer with the Py SDK fails with fixed window (portable runner).
> Test case attached.
-- This message was sent by Atlassian JIRA (v7.6.3#76005)