[ 
https://issues.apache.org/jira/browse/BEAM-4594?focusedWorklogId=121019&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-121019
 ]

ASF GitHub Bot logged work on BEAM-4594:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 09/Jul/18 21:12
            Start Date: 09/Jul/18 21:12
    Worklog Time Spent: 10m 
      Work Description: charlesccychen closed pull request #5691: [BEAM-4594] 
Beam Python state and timers user-facing API
URL: https://github.com/apache/beam/pull/5691
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py
index 88745c778e3..aa435594ed7 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -41,6 +41,7 @@
 from apache_beam.transforms import DoFn
 from apache_beam.transforms import core
 from apache_beam.transforms.core import RestrictionProvider
+from apache_beam.transforms.userstate import UserStateUtils
 from apache_beam.transforms.window import GlobalWindow
 from apache_beam.transforms.window import TimestampedValue
 from apache_beam.transforms.window import WindowFn
@@ -208,18 +209,29 @@ def _validate(self):
     self._validate_process()
     self._validate_bundle_method(self.start_bundle_method)
     self._validate_bundle_method(self.finish_bundle_method)
+    self._validate_stateful_dofn()
 
   def _validate_process(self):
     """Validate that none of the DoFnParameters are repeated in the function
     """
-    for param in core.DoFn.DoFnParams:
-      assert self.process_method.defaults.count(param) <= 1
+    param_ids = [d.param_id for d in self.process_method.defaults
+                 if isinstance(d, core._DoFnParam)]
+    if len(param_ids) != len(set(param_ids)):
+      raise ValueError(
+          'DoFn %r has duplicate process method parameters: %s.' % (
+              self.do_fn, param_ids))
 
   def _validate_bundle_method(self, method_wrapper):
     """Validate that none of the DoFnParameters are used in the function
     """
-    for param in core.DoFn.DoFnParams:
-      assert param not in method_wrapper.defaults
+    for param in core.DoFn.DoFnProcessParams:
+      if param in method_wrapper.defaults:
+        raise ValueError(
+            'DoFn.process() method-only parameter %s cannot be used in %s.' %
+            (param, method_wrapper))
+
+  def _validate_stateful_dofn(self):
+    UserStateUtils.validate_stateful_dofn(self.do_fn)
 
   def is_splittable_dofn(self):
     return any([isinstance(default, RestrictionProvider) for default in
diff --git a/sdks/python/apache_beam/runners/common_test.py b/sdks/python/apache_beam/runners/common_test.py
index d4848e48abe..18e2c455f1f 100644
--- a/sdks/python/apache_beam/runners/common_test.py
+++ b/sdks/python/apache_beam/runners/common_test.py
@@ -30,7 +30,7 @@ class MyDoFn(DoFn):
       def process(self, element, w1=DoFn.WindowParam, w2=DoFn.WindowParam):
         pass
 
-    with self.assertRaises(AssertionError):
+    with self.assertRaises(ValueError):
       DoFnSignature(MyDoFn())
 
   def test_dofn_validate_start_bundle_error(self):
@@ -41,7 +41,7 @@ def process(self, element):
       def start_bundle(self, w1=DoFn.WindowParam):
         pass
 
-    with self.assertRaises(AssertionError):
+    with self.assertRaises(ValueError):
       DoFnSignature(MyDoFn())
 
   def test_dofn_validate_finish_bundle_error(self):
@@ -52,7 +52,7 @@ def process(self, element):
       def finish_bundle(self, w1=DoFn.WindowParam):
         pass
 
-    with self.assertRaises(AssertionError):
+    with self.assertRaises(ValueError):
       DoFnSignature(MyDoFn())
 
 
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 506eadb92ec..bbd78342a7f 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -43,6 +43,8 @@
 from apache_beam.transforms.display import HasDisplayData
 from apache_beam.transforms.ptransform import PTransform
 from apache_beam.transforms.ptransform import PTransformWithSideInputs
+from apache_beam.transforms.userstate import StateSpec
+from apache_beam.transforms.userstate import TimerSpec
 from apache_beam.transforms.window import GlobalWindows
 from apache_beam.transforms.window import TimestampCombiner
 from apache_beam.transforms.window import TimestampedValue
@@ -278,6 +280,41 @@ def get_function_arguments(obj, func):
   return inspect.getargspec(f)
 
 
+class _DoFnParam(object):
+  """DoFn parameter."""
+
+  def __init__(self, param_id):
+    self.param_id = param_id
+
+  def __eq__(self, other):
+    if type(self) == type(other):
+      return self.param_id == other.param_id
+    return False
+
+  def __repr__(self):
+    return self.param_id
+
+
+class _StateDoFnParam(_DoFnParam):
+  """State DoFn parameter."""
+
+  def __init__(self, state_spec):
+    if not isinstance(state_spec, StateSpec):
+      raise ValueError("DoFn.StateParam expected StateSpec object.")
+    self.state_spec = state_spec
+    self.param_id = 'StateParam(%s)' % state_spec.name
+
+
+class _TimerDoFnParam(_DoFnParam):
+  """Timer DoFn parameter."""
+
+  def __init__(self, timer_spec):
+    if not isinstance(timer_spec, TimerSpec):
+      raise ValueError("DoFn.TimerParam expected TimerSpec object.")
+    self.timer_spec = timer_spec
+    self.param_id = 'TimerParam(%s)' % timer_spec.name
+
+
 class DoFn(WithTypeHints, HasDisplayData, urns.RunnerApiFn):
   """A function object used by a transform with custom processing.
 
@@ -290,13 +327,21 @@ class DoFn(WithTypeHints, HasDisplayData, urns.RunnerApiFn):
   callable object using the CallableWrapperDoFn class.
   """
 
-  ElementParam = 'ElementParam'
-  SideInputParam = 'SideInputParam'
-  TimestampParam = 'TimestampParam'
-  WindowParam = 'WindowParam'
-  WatermarkReporterParam = 'WatermarkReporterParam'
-
-  DoFnParams = [ElementParam, SideInputParam, TimestampParam, WindowParam]
+  # Parameters that can be used in the .process() method.
+  ElementParam = _DoFnParam('ElementParam')
+  SideInputParam = _DoFnParam('SideInputParam')
+  TimestampParam = _DoFnParam('TimestampParam')
+  WindowParam = _DoFnParam('WindowParam')
+  WatermarkReporterParam = _DoFnParam('WatermarkReporterParam')
+
+  DoFnProcessParams = [ElementParam, SideInputParam, TimestampParam,
+                       WindowParam, WatermarkReporterParam]
+
+  # Parameters to access state and timers.  Not restricted to use only in the
+  # .process() method. Usage: DoFn.StateParam(state_spec),
+  # DoFn.TimerParam(timer_spec).
+  StateParam = _StateDoFnParam
+  TimerParam = _TimerDoFnParam
 
   @staticmethod
   def from_callable(fn):
diff --git a/sdks/python/apache_beam/transforms/userstate.py b/sdks/python/apache_beam/transforms/userstate.py
new file mode 100644
index 00000000000..6a5fd581bb7
--- /dev/null
+++ b/sdks/python/apache_beam/transforms/userstate.py
@@ -0,0 +1,162 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""User-facing interfaces for the Beam State and Timer APIs.
+
+Experimental; no backwards-compatibility guarantees.
+"""
+
+from __future__ import absolute_import
+
+import types
+
+from apache_beam.coders import Coder
+from apache_beam.transforms.timeutil import TimeDomain
+
+
+class StateSpec(object):
+  """Specification for a user DoFn state cell."""
+
+  def __init__(self):
+    raise NotImplementedError
+
+  def __repr__(self):
+    return '%s(%s)' % (self.__class__.__name__, self.name)
+
+
+class BagStateSpec(StateSpec):
+  """Specification for a user DoFn bag state cell."""
+
+  def __init__(self, name, coder):
+    assert isinstance(name, str)
+    assert isinstance(coder, Coder)
+    self.name = name
+    self.coder = coder
+
+
+class CombiningValueStateSpec(StateSpec):
+  """Specification for a user DoFn combining value state cell."""
+
+  def __init__(self, name, coder, combiner):
+    # Avoid circular import.
+    from apache_beam.transforms.core import CombineFn
+
+    assert isinstance(name, str)
+    assert isinstance(coder, Coder)
+    assert isinstance(combiner, CombineFn)
+    self.name = name
+    self.coder = coder
+    self.combiner = combiner
+
+
+class TimerSpec(object):
+  """Specification for a user stateful DoFn timer."""
+
+  def __init__(self, name, time_domain):
+    self.name = name
+    if time_domain not in (TimeDomain.WATERMARK, TimeDomain.REAL_TIME):
+      raise ValueError('Unsupported TimeDomain: %r.' % (time_domain,))
+    self.time_domain = time_domain
+    self._attached_callback = None
+
+  def __repr__(self):
+    return '%s(%s)' % (self.__class__.__name__, self.name)
+
+
+def on_timer(timer_spec):
+  """Decorator for timer firing DoFn method.
+
+  This decorator allows a user to specify an on_timer processing method
+  in a stateful DoFn.  Sample usage::
+
+    class MyDoFn(DoFn):
+      TIMER_SPEC = TimerSpec('timer', TimeDomain.WATERMARK)
+
+      @on_timer(TIMER_SPEC)
+      def my_timer_expiry_callback(self):
+        logging.info('Timer expired!')
+  """
+
+  if not isinstance(timer_spec, TimerSpec):
+    raise ValueError('@on_timer decorator expected TimerSpec.')
+
+  def _inner(method):
+    if not callable(method):
+      raise ValueError('@on_timer decorator expected callable.')
+    if timer_spec._attached_callback:
+      raise ValueError(
+          'Multiple on_timer callbacks registered for %r.' % timer_spec)
+    timer_spec._attached_callback = method
+    return method
+
+  return _inner
+
+
+class UserStateUtils(object):
+
+  @staticmethod
+  def validate_stateful_dofn(dofn):
+    # Avoid circular import.
+    from apache_beam.runners.common import MethodWrapper
+    from apache_beam.transforms.core import _DoFnParam
+    from apache_beam.transforms.core import _StateDoFnParam
+    from apache_beam.transforms.core import _TimerDoFnParam
+
+    all_state_specs = set()
+    all_timer_specs = set()
+
+    # Validate params to process(), start_bundle(), finish_bundle() and to
+    # any on_timer callbacks.
+    for method_name in dir(dofn):
+      if not isinstance(getattr(dofn, method_name, None), types.MethodType):
+        continue
+      method = MethodWrapper(dofn, method_name)
+      param_ids = [d.param_id for d in method.defaults
+                   if isinstance(d, _DoFnParam)]
+      if len(param_ids) != len(set(param_ids)):
+        raise ValueError(
+            'DoFn %r has duplicate %s method parameters: %s.' % (
+                dofn, method_name, param_ids))
+      for d in method.defaults:
+        if isinstance(d, _StateDoFnParam):
+          all_state_specs.add(d.state_spec)
+        elif isinstance(d, _TimerDoFnParam):
+          all_timer_specs.add(d.timer_spec)
+
+    # Reject DoFns that have multiple state or timer specs with the same name.
+    if len(all_state_specs) != len(set(s.name for s in all_state_specs)):
+      raise ValueError(
+          'DoFn %r has multiple StateSpecs with the same name: %s.' % (
+              dofn, all_state_specs))
+    if len(all_timer_specs) != len(set(s.name for s in all_timer_specs)):
+      raise ValueError(
+          'DoFn %r has multiple TimerSpecs with the same name: %s.' % (
+              dofn, all_timer_specs))
+
+    # Reject DoFns that use timer specs without corresponding timer callbacks.
+    for timer_spec in all_timer_specs:
+      if not timer_spec._attached_callback:
+        raise ValueError(
+            ('DoFn %r has a TimerSpec without an associated on_timer '
+             'callback: %s.') % (dofn, timer_spec))
+      method_name = timer_spec._attached_callback.__name__
+      if (timer_spec._attached_callback !=
+          getattr(dofn, method_name, None).__func__):
+        raise ValueError(
+            ('The on_timer callback for %s is not the specified .%s method '
+             'for DoFn %r (perhaps it was overwritten?).') % (
+                 timer_spec, method_name, dofn))
diff --git a/sdks/python/apache_beam/transforms/userstate_test.py b/sdks/python/apache_beam/transforms/userstate_test.py
new file mode 100644
index 00000000000..8dbc9ce5e77
--- /dev/null
+++ b/sdks/python/apache_beam/transforms/userstate_test.py
@@ -0,0 +1,273 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit tests for the Beam State and Timer API interfaces."""
+
+import unittest
+
+import mock
+
+from apache_beam.coders import BytesCoder
+from apache_beam.coders import VarIntCoder
+from apache_beam.runners.common import DoFnSignature
+from apache_beam.transforms.combiners import TopCombineFn
+from apache_beam.transforms.core import DoFn
+from apache_beam.transforms.timeutil import TimeDomain
+from apache_beam.transforms.userstate import BagStateSpec
+from apache_beam.transforms.userstate import CombiningValueStateSpec
+from apache_beam.transforms.userstate import TimerSpec
+from apache_beam.transforms.userstate import UserStateUtils
+from apache_beam.transforms.userstate import on_timer
+
+
+class TestStatefulDoFn(DoFn):
+  """An example stateful DoFn with state and timers."""
+
+  BUFFER_STATE_1 = BagStateSpec('buffer', BytesCoder())
+  BUFFER_STATE_2 = BagStateSpec('buffer2', VarIntCoder())
+  EXPIRY_TIMER_1 = TimerSpec('expiry1', TimeDomain.WATERMARK)
+  EXPIRY_TIMER_2 = TimerSpec('expiry2', TimeDomain.WATERMARK)
+  EXPIRY_TIMER_3 = TimerSpec('expiry3', TimeDomain.WATERMARK)
+
+  def process(self, element, t=DoFn.TimestampParam,
+              buffer_1=DoFn.StateParam(BUFFER_STATE_1),
+              buffer_2=DoFn.StateParam(BUFFER_STATE_2),
+              timer_1=DoFn.TimerParam(EXPIRY_TIMER_1),
+              timer_2=DoFn.TimerParam(EXPIRY_TIMER_2)):
+    yield element
+
+  @on_timer(EXPIRY_TIMER_1)
+  def on_expiry_1(self,
+                  buffer=DoFn.StateParam(BUFFER_STATE_1),
+                  timer_1=DoFn.TimerParam(EXPIRY_TIMER_1),
+                  timer_2=DoFn.TimerParam(EXPIRY_TIMER_2),
+                  timer_3=DoFn.TimerParam(EXPIRY_TIMER_3)):
+    yield 'expired1'
+
+  @on_timer(EXPIRY_TIMER_2)
+  def on_expiry_2(self,
+                  buffer=DoFn.StateParam(BUFFER_STATE_2),
+                  timer_2=DoFn.TimerParam(EXPIRY_TIMER_2),
+                  timer_3=DoFn.TimerParam(EXPIRY_TIMER_3)):
+    yield 'expired2'
+
+  @on_timer(EXPIRY_TIMER_3)
+  def on_expiry_3(self,
+                  buffer_1=DoFn.StateParam(BUFFER_STATE_1),
+                  buffer_2=DoFn.StateParam(BUFFER_STATE_2),
+                  timer_3=DoFn.TimerParam(EXPIRY_TIMER_3)):
+    yield 'expired3'
+
+
+class InterfaceTest(unittest.TestCase):
+
+  def _validate_dofn(self, dofn):
+    # Construction of DoFnSignature performs validation of the given DoFn.
+    # In particular, it ends up calling UserStateUtils.validate_stateful_dofn.
+    # That behavior is explicitly tested below in test_validate_dofn().
+    DoFnSignature(dofn)
+
+  @mock.patch(
+      'apache_beam.transforms.userstate.UserStateUtils.validate_stateful_dofn')
+  def test_validate_dofn(self, unused_mock):
+    dofn = TestStatefulDoFn()
+    self._validate_dofn(dofn)
+    UserStateUtils.validate_stateful_dofn.assert_called_with(dofn)
+
+  def test_spec_construction(self):
+    BagStateSpec('statename', VarIntCoder())
+    with self.assertRaises(AssertionError):
+      BagStateSpec(123, VarIntCoder())
+    CombiningValueStateSpec('statename', VarIntCoder(), TopCombineFn(10))
+    with self.assertRaises(AssertionError):
+      CombiningValueStateSpec(123, VarIntCoder(), TopCombineFn(10))
+    with self.assertRaises(AssertionError):
+      CombiningValueStateSpec('statename', VarIntCoder(), object())
+    # BagStateSpec('bag', )
+    # TODO: add more spec tests
+    with self.assertRaises(ValueError):
+      DoFn.TimerParam(BagStateSpec('elements', BytesCoder()))
+
+    TimerSpec('timer', TimeDomain.WATERMARK)
+    TimerSpec('timer', TimeDomain.REAL_TIME)
+    with self.assertRaises(ValueError):
+      TimerSpec('timer', 'bogus_time_domain')
+    with self.assertRaises(ValueError):
+      DoFn.StateParam(TimerSpec('timer', TimeDomain.WATERMARK))
+
+  def test_param_construction(self):
+    with self.assertRaises(ValueError):
+      DoFn.StateParam(TimerSpec('timer', TimeDomain.WATERMARK))
+    with self.assertRaises(ValueError):
+      DoFn.TimerParam(BagStateSpec('elements', BytesCoder()))
+
+  def test_good_signatures(self):
+    class BasicStatefulDoFn(DoFn):
+      BUFFER_STATE = BagStateSpec('buffer', BytesCoder())
+      EXPIRY_TIMER = TimerSpec('expiry1', TimeDomain.WATERMARK)
+
+      def process(self, element, buffer=DoFn.StateParam(BUFFER_STATE),
+                  timer1=DoFn.TimerParam(EXPIRY_TIMER)):
+        yield element
+
+      @on_timer(EXPIRY_TIMER)
+      def expiry_callback(self, element, timer=DoFn.TimerParam(EXPIRY_TIMER)):
+        yield element
+
+    self._validate_dofn(BasicStatefulDoFn())
+    self._validate_dofn(TestStatefulDoFn())
+
+  def test_bad_signatures(self):
+    # (1) The same state parameter is duplicated on the process method.
+    class BadStatefulDoFn1(DoFn):
+      BUFFER_STATE = BagStateSpec('buffer', BytesCoder())
+
+      def process(self, element, b1=DoFn.StateParam(BUFFER_STATE),
+                  b2=DoFn.StateParam(BUFFER_STATE)):
+        yield element
+
+    with self.assertRaises(ValueError):
+      self._validate_dofn(BadStatefulDoFn1())
+
+    # (2) The same timer parameter is duplicated on the process method.
+    class BadStatefulDoFn2(DoFn):
+      TIMER = TimerSpec('timer', TimeDomain.WATERMARK)
+
+      def process(self, element, t1=DoFn.TimerParam(TIMER),
+                  t2=DoFn.TimerParam(TIMER)):
+        yield element
+
+    with self.assertRaises(ValueError):
+      self._validate_dofn(BadStatefulDoFn2())
+
+    # (3) The same state parameter is duplicated on the on_timer method.
+    class BadStatefulDoFn3(DoFn):
+      BUFFER_STATE = BagStateSpec('buffer', BytesCoder())
+      EXPIRY_TIMER_1 = TimerSpec('expiry1', TimeDomain.WATERMARK)
+      EXPIRY_TIMER_2 = TimerSpec('expiry2', TimeDomain.WATERMARK)
+
+      @on_timer(EXPIRY_TIMER_1)
+      def expiry_callback(self, element, b1=DoFn.StateParam(BUFFER_STATE),
+                          b2=DoFn.StateParam(BUFFER_STATE)):
+        yield element
+
+    with self.assertRaises(ValueError):
+      self._validate_dofn(BadStatefulDoFn3())
+
+    # (4) The same timer parameter is duplicated on the on_timer method.
+    class BadStatefulDoFn4(DoFn):
+      BUFFER_STATE = BagStateSpec('buffer', BytesCoder())
+      EXPIRY_TIMER_1 = TimerSpec('expiry1', TimeDomain.WATERMARK)
+      EXPIRY_TIMER_2 = TimerSpec('expiry2', TimeDomain.WATERMARK)
+
+      @on_timer(EXPIRY_TIMER_1)
+      def expiry_callback(self, element, t1=DoFn.TimerParam(EXPIRY_TIMER_2),
+                          t2=DoFn.TimerParam(EXPIRY_TIMER_2)):
+        yield element
+
+    with self.assertRaises(ValueError):
+      self._validate_dofn(BadStatefulDoFn4())
+
+  def test_validation_typos(self):
+    # (1) Here, the user mistakenly used the same timer spec twice for two
+    # different timer callbacks.
+    with self.assertRaisesRegexp(
+        ValueError,
+        r'Multiple on_timer callbacks registered for TimerSpec\(expiry1\).'):
+      class StatefulDoFnWithTimerWithTypo1(DoFn):  # pylint: disable=unused-variable
+        BUFFER_STATE = BagStateSpec('buffer', BytesCoder())
+        EXPIRY_TIMER_1 = TimerSpec('expiry1', TimeDomain.WATERMARK)
+        EXPIRY_TIMER_2 = TimerSpec('expiry2', TimeDomain.WATERMARK)
+
+        def process(self, element):
+          pass
+
+        @on_timer(EXPIRY_TIMER_1)
+        def on_expiry_1(self, buffer_state=DoFn.StateParam(BUFFER_STATE)):
+          yield 'expired1'
+
+        # Note that we mistakenly associate this with the first timer.
+        @on_timer(EXPIRY_TIMER_1)
+        def on_expiry_2(self, buffer_state=DoFn.StateParam(BUFFER_STATE)):
+          yield 'expired2'
+
+    # (2) Here, the user mistakenly used the same callback name and overwrote
+    # the first on_expiry_1 callback.
+    class StatefulDoFnWithTimerWithTypo2(DoFn):
+      BUFFER_STATE = BagStateSpec('buffer', BytesCoder())
+      EXPIRY_TIMER_1 = TimerSpec('expiry1', TimeDomain.WATERMARK)
+      EXPIRY_TIMER_2 = TimerSpec('expiry2', TimeDomain.WATERMARK)
+
+      def process(self, element,
+                  timer1=DoFn.TimerParam(EXPIRY_TIMER_1),
+                  timer2=DoFn.TimerParam(EXPIRY_TIMER_2)):
+        pass
+
+      @on_timer(EXPIRY_TIMER_1)
+      def on_expiry_1(self, buffer_state=DoFn.StateParam(BUFFER_STATE)):
+        yield 'expired1'
+
+      # Note that we mistakenly reuse the "on_expiry_1" name; this is valid
+      # syntactically in Python.
+      @on_timer(EXPIRY_TIMER_2)
+      def on_expiry_1(self, buffer_state=DoFn.StateParam(BUFFER_STATE)):
+        yield 'expired2'
+
+      # Use a stable string value for matching.
+      def __repr__(self):
+        return 'StatefulDoFnWithTimerWithTypo2'
+
+    dofn = StatefulDoFnWithTimerWithTypo2()
+    with self.assertRaisesRegexp(
+        ValueError,
+        (r'The on_timer callback for TimerSpec\(expiry1\) is not the '
+         r'specified .on_expiry_1 method for DoFn '
+         r'StatefulDoFnWithTimerWithTypo2 \(perhaps it was overwritten\?\).')):
+      UserStateUtils.validate_stateful_dofn(dofn)
+
+    # (3) Here, the user forgot to add an on_timer decorator for 'expiry2'.
+    class StatefulDoFnWithTimerWithTypo3(DoFn):
+      BUFFER_STATE = BagStateSpec('buffer', BytesCoder())
+      EXPIRY_TIMER_1 = TimerSpec('expiry1', TimeDomain.WATERMARK)
+      EXPIRY_TIMER_2 = TimerSpec('expiry2', TimeDomain.WATERMARK)
+
+      def process(self, element,
+                  timer1=DoFn.TimerParam(EXPIRY_TIMER_1),
+                  timer2=DoFn.TimerParam(EXPIRY_TIMER_2)):
+        pass
+
+      @on_timer(EXPIRY_TIMER_1)
+      def on_expiry_1(self, buffer_state=DoFn.StateParam(BUFFER_STATE)):
+        yield 'expired1'
+
+      def on_expiry_2(self, buffer_state=DoFn.StateParam(BUFFER_STATE)):
+        yield 'expired2'
+
+      # Use a stable string value for matching.
+      def __repr__(self):
+        return 'StatefulDoFnWithTimerWithTypo3'
+
+    dofn = StatefulDoFnWithTimerWithTypo3()
+    with self.assertRaisesRegexp(
+        ValueError,
+        (r'DoFn StatefulDoFnWithTimerWithTypo3 has a TimerSpec without an '
+         r'associated on_timer callback: TimerSpec\(expiry2\).')):
+      UserStateUtils.validate_stateful_dofn(dofn)
+
+
+if __name__ == '__main__':
+  unittest.main()
diff --git a/sdks/python/scripts/generate_pydoc.sh b/sdks/python/scripts/generate_pydoc.sh
index dfb87a13f11..0e5bfbbbb58 100755
--- a/sdks/python/scripts/generate_pydoc.sh
+++ b/sdks/python/scripts/generate_pydoc.sh
@@ -165,7 +165,11 @@ ignore_identifiers = [
   'WindowedTypeConstraint',  # apache_beam.typehints.typehints
 
   # stdlib classes without documentation
-  'unittest.case.TestCase'
+  'unittest.case.TestCase',
+
+  # DoFn param inner classes, due to a Sphinx misparsing of inner classes
+  '_StateDoFnParam',
+  '_TimerDoFnParam',
 ]
 
 # When inferring a base class it will use ':py:class'; if inferring a function


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 121019)
    Time Spent: 5h 20m  (was: 5h 10m)

> Implement Beam Python User State and Timer API
> ----------------------------------------------
>
>                 Key: BEAM-4594
>                 URL: https://issues.apache.org/jira/browse/BEAM-4594
>             Project: Beam
>          Issue Type: New Feature
>          Components: sdk-py-core
>            Reporter: Charles Chen
>            Assignee: Charles Chen
>            Priority: Major
>              Labels: portability
>          Time Spent: 5h 20m
>  Remaining Estimate: 0h
>
> This issue tracks the implementation of the Beam Python User State and Timer 
> API, described here: [https://s.apache.org/beam-python-user-state-and-timers].



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to