[ 
https://issues.apache.org/jira/browse/BEAM-4594?focusedWorklogId=118458&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-118458
 ]

ASF GitHub Bot logged work on BEAM-4594:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 02/Jul/18 22:23
            Start Date: 02/Jul/18 22:23
    Worklog Time Spent: 10m 
      Work Description: charlesccychen commented on a change in pull request #5691: [BEAM-4594] Beam Python state and timers user-facing API
URL: https://github.com/apache/beam/pull/5691#discussion_r199637833
 
 

 ##########
 File path: sdks/python/apache_beam/transforms/userstate.py
 ##########
 @@ -0,0 +1,200 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""User-facing interfaces for the Beam State and Timer APIs.
+
+Experimental; no backwards-compatibility guarantees.
+"""
+
+from __future__ import absolute_import
+
+import logging
+
+from apache_beam.coders import Coder
+from apache_beam.transforms.timeutil import TimeDomain
+
+
+class StateSpec(object):
+  """Specification for a user DoFn state cell."""
+
+  def __init__(self):
+    raise NotImplementedError
+
+  def __repr__(self):
+    return '%s(%s)' % (self.__class__.__name__, self.name)
+
+
+class BagStateSpec(StateSpec):
+  """Specification for a user DoFn bag state cell."""
+
+  def __init__(self, name, coder):
+    assert isinstance(name, str)
+    assert isinstance(coder, Coder)
+    self.name = name
+    self.coder = coder
+
+
+class CombiningValueStateSpec(StateSpec):
+  """Specification for a user DoFn combining value state cell."""
+
+  def __init__(self, name, coder, combiner):
+    # Avoid circular import.
+    from apache_beam.transforms.core import CombineFn
+
+    assert isinstance(name, str)
+    assert isinstance(coder, Coder)
+    assert isinstance(combiner, CombineFn)
+    self.name = name
+    self.coder = coder
+    self.combiner = combiner
+
+
+class TimerSpec(object):
+  """Specification for a user stateful DoFn timer."""
+
+  def __init__(self, name, time_domain):
+    self.name = name
+    if time_domain not in (TimeDomain.WATERMARK, TimeDomain.REAL_TIME):
+      raise ValueError('Unsupported TimeDomain: %r.' % (time_domain,))
+    self.time_domain = time_domain
+
+  def __repr__(self):
+    return '%s(%s)' % (self.__class__.__name__, self.name)
+
+
+# Attribute for attaching a TimerSpec to an on_timer method.
+_TIMER_SPEC_ATTRIBUTE_NAME = '_on_timer_spec'
+
+
+def on_timer(timer_spec):
+  """Decorator for timer firing DoFn method.
+
+  This decorator allows a user to specify an on_timer processing method
+  in a stateful DoFn.  Sample usage:
+
+  > class MyDoFn(DoFn):
+  >   TIMER_SPEC = TimerSpec('timer', TimeDomain.WATERMARK)
+  >
+  >   @on_timer(TIMER_SPEC)
+  >   def my_timer_expiry_callback(self):
+  >     logging.info('Timer expired!')
+  """
+
+  if not isinstance(timer_spec, TimerSpec):
+    raise ValueError('@on_timer decorator expected TimerSpec.')
+
+  def _inner(method):
+    if not callable(method):
+      raise ValueError('@on_timer decorator expected callable.')
+    setattr(method, _TIMER_SPEC_ATTRIBUTE_NAME, timer_spec)
+    return method
+
+  return _inner
+
+
+class UserStateUtils(object):
+
+  @staticmethod
+  def get_on_timer_methods(dofn):
+    # Avoid circular import.
+    from apache_beam.transforms.core import DoFn
+    if not isinstance(dofn, DoFn):
+      raise ValueError('Expected DoFn.')
+
+    timerid_to_methods = {}
+    result = {}
+
+    for name in dir(dofn):
+      value = getattr(dofn, name, None)
+      if (callable(value) and
 
 Review comment:
   Done.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 118458)
    Time Spent: 4h  (was: 3h 50m)

> Implement Beam Python User State and Timer API
> ----------------------------------------------
>
>                 Key: BEAM-4594
>                 URL: https://issues.apache.org/jira/browse/BEAM-4594
>             Project: Beam
>          Issue Type: New Feature
>          Components: sdk-py-core
>            Reporter: Charles Chen
>            Assignee: Charles Chen
>            Priority: Major
>              Labels: portability
>          Time Spent: 4h
>  Remaining Estimate: 0h
>
> This issue tracks the implementation of the Beam Python User State and Timer 
> API, described here: [https://s.apache.org/beam-python-user-state-and-timers].



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to