[ 
https://issues.apache.org/jira/browse/BEAM-6676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16768916#comment-16768916
 ] 

Thomas Weise commented on BEAM-6676:
------------------------------------

{code:python}
from __future__ import absolute_import

import logging
import sys

import apache_beam as beam
import apache_beam.transforms.window as window
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms import userstate


class StatefulFn(beam.DoFn):
  """DoFn that counts its inputs in user state and sets a watermark timer.

  Each ``process`` call adds 1 to the combining ``count`` state, sets the
  ``timer`` timer to 1000 and logs the current count together with the
  window. The timer callback logs the count and window again.
  """

  # Combining state named 'count'; values added to it are combined with sum.
  count_state_spec = userstate.CombiningValueStateSpec(
      'count', beam.coders.IterableCoder(beam.coders.VarIntCoder()), sum)
  # Timer named 'timer' in the WATERMARK time domain.
  timer_spec = userstate.TimerSpec('timer', userstate.TimeDomain.WATERMARK)

  def process(
      self,
      kv,
      count=beam.DoFn.StateParam(count_state_spec),
      timer=beam.DoFn.TimerParam(timer_spec),
      window=beam.DoFn.WindowParam):
    """Bump the count, set the timer and log the element, count and window.

    Args:
      kv: the input element (the pipeline in this script feeds 2-tuples).
      count: state handle declared by ``count_state_spec``.
      timer: timer handle declared by ``timer_spec``.
      window: the window supplied through ``beam.DoFn.WindowParam``.
    """
    count.add(1)
    timer.set(1000)
    # Format eagerly so the emitted text matches the original call exactly.
    message = "###count %s, %d, window %s" % (kv, count.read(), window)
    logging.info(message)

  @userstate.on_timer(timer_spec)
  def process_timer(
      self,
      count=beam.DoFn.StateParam(count_state_spec),
      window=beam.DoFn.WindowParam):
    """Timer callback for ``timer_spec``: log the stored count and window."""
    message = "###count %d, window %s" % (count.read(), window)
    logging.info(message)

def run(argv=None):
  """Build the stateful-timer pipeline, run it and wait for it to finish.

  Args:
    argv: optional extra command-line flags. When non-empty they are appended
      after the built-in flags (portable runner, job endpoint at
      localhost:8099, streaming, shutdown sources on final watermark).
  """
  default_flags = [
      "--runner=PortableRunner",
      "--job_endpoint=localhost:8099",
      "--streaming",
      "--shutdown_sources_on_final_watermark",
  ]
  # A missing or empty argv contributes nothing, as in the guarded extend.
  options = PipelineOptions(default_flags + list(argv or []))

  pipeline = beam.Pipeline(options=options)

  # Two elements sharing the key 'k1', placed into fixed windows of size 5,
  # then passed through the stateful DoFn.
  keyed_input = pipeline | beam.Create([('k1', 1), ('k1', 2)])
  windowed = keyed_input | 'window' >> beam.WindowInto(window.FixedWindows(5))
  _ = windowed | 'statefulCount' >> beam.ParDo(StatefulFn())

  pipeline.run().wait_until_finish()


if __name__ == '__main__':
  # Raise the root logger to INFO so the "###count" messages are visible.
  root_logger = logging.getLogger()
  root_logger.setLevel(logging.INFO)
  cli_args = sys.argv[1:]
  run(cli_args)
{code}

> Python timers only working with GlobalWindow
> --------------------------------------------
>
>                 Key: BEAM-6676
>                 URL: https://issues.apache.org/jira/browse/BEAM-6676
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-harness
>    Affects Versions: 2.10.0
>            Reporter: Thomas Weise
>            Priority: Major
>
> Setting a timer with the Python SDK fails when fixed windows are used (portable runner).
> Test case attached.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to