lukecwik commented on a change in pull request #12016:
URL: https://github.com/apache/beam/pull/12016#discussion_r448096760



##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
##########
@@ -91,7 +91,8 @@ def __init__(
       bundle_repeat=0,
       use_state_iterables=False,
       provision_info=None,  # type: Optional[ExtendedProvisionInfo]
-      progress_request_frequency=None):
+      progress_request_frequency=None,
+      is_drain = False):

Review comment:
       ```suggestion
         is_drain=False):
   ```

##########
File path: sdks/python/apache_beam/runners/portability/portable_runner_test.py
##########
@@ -242,6 +242,15 @@ def process(self, kv, index=beam.DoFn.StateParam(index_state_spec)):
 
   # Inherits all other tests from fn_api_runner_test.FnApiRunnerTest
 
+  def test_sdf_default_truncate_when_bounded(self):
+    raise unittest.SkipTest("Portable runners don't support drain now")
+
+  def test_sdf_default_truncate_when_unbounded(self):
+    raise unittest.SkipTest("Portable runners don't support drain now")
+
+  def test_sdf_with_truncate(self):
+    raise unittest.SkipTest("Portable runners don't support drain now")

Review comment:
       ```suggestion
       raise unittest.SkipTest("Portable runners don't yet support drain")
   
     def test_sdf_default_truncate_when_unbounded(self):
       raise unittest.SkipTest("Portable runners don't yet support drain")
   
     def test_sdf_with_truncate(self):
       raise unittest.SkipTest("Portable runners don't yet support drain")
   ```
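For readers unfamiliar with the pattern in this hunk: the portable runner test class inherits every test from `fn_api_runner_test.FnApiRunnerTest` and overrides only the drain-dependent ones to raise `unittest.SkipTest`. Below is a minimal, self-contained sketch of that pattern using only the standard library. `BaseRunnerTest` and `PortableRunnerTest` are hypothetical stand-ins, not the actual Beam test classes.

```python
import unittest


class BaseRunnerTest(unittest.TestCase):
  """Hypothetical stand-in for the shared suite (FnApiRunnerTest)."""

  def test_basic(self):
    self.assertEqual(1 + 1, 2)

  def test_sdf_with_truncate(self):
    # In a runner that supports drain, this would exercise truncation.
    self.assertTrue(True)


class PortableRunnerTest(BaseRunnerTest):
  """Inherits test_basic unchanged; skips the drain-dependent test."""

  def test_sdf_with_truncate(self):
    raise unittest.SkipTest("Portable runners don't yet support drain")


def run_suite():
  # Load inherited and overridden tests, then run them into a result object.
  suite = unittest.defaultTestLoader.loadTestsFromTestCase(PortableRunnerTest)
  result = unittest.TestResult()
  suite.run(result)
  return result
```

Because the skip is raised inside the overriding method, the test still appears in reports as skipped with the given reason instead of silently disappearing, which makes it easy to find once portable runners gain drain support.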

##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
##########
@@ -1613,6 +1672,40 @@ def restriction_size(self, element, restriction):
     return restriction.size()
 
 
+class SimpleUnboundedOffsetRangeRestrictionTracker(

Review comment:
       ```suggestion
   class UnboundedOffsetRestrictionTracker(
   ```
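The body of the renamed test helper is not shown in this hunk. As a rough illustration of what an unbounded offset tracker does, here is a hypothetical, framework-free sketch: it claims strictly increasing offsets from a start position, never runs out of work by itself, and reports `is_bounded()` as `False`. It does not subclass Beam's `RestrictionTracker`, and the `checkpoint()` helper is an assumption made for illustration (it is not a Beam method).

```python
class UnboundedOffsetRestrictionTracker(object):
  """Hypothetical tracker over the offset range [start, infinity)."""

  def __init__(self, start):
    self._start = start
    self._last_claimed = None
    self._stopped = False

  def try_claim(self, position):
    # Offsets must be claimed in strictly increasing order.
    if self._last_claimed is not None and position <= self._last_claimed:
      raise ValueError('Positions must be claimed in increasing order')
    if position < self._start:
      raise ValueError('Position precedes the start of the restriction')
    if self._stopped:
      # After a checkpoint, remaining offsets belong to the residual.
      return False
    self._last_claimed = position
    return True

  def checkpoint(self):
    # Hypothetical helper: stop claiming and return where the residual starts.
    self._stopped = True
    if self._last_claimed is None:
      return self._start
    return self._last_claimed + 1

  def is_bounded(self):
    # There is no end offset, so the amount of work is not finite.
    return False
```

Since `try_claim` only fails after a checkpoint, processing of such a restriction ends only when the runner checkpoints it, which is exactly why drain needs a truncate step for unbounded restrictions.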

##########
File path: sdks/python/apache_beam/io/iobase.py
##########
@@ -1243,6 +1243,19 @@ def try_claim(self, position):
     """
     raise NotImplementedError
 
+  def is_bounded(self):
+    """Identify whether the output produced by the current restriction is
+    bounded.
+
+    The value is important for the default behavior of truncate when the
+    pipeline starts to drain. If the current restriction is
+    bounded, it will be processed completely by default. If the restriction is
+    unbounded, it will be truncated into null and finish processing immediately.
+
+    The API is required to be implemented.

Review comment:
       ```suggestion
       """Returns whether the amount of work represented by the current
       restriction is bounded.

       The boundedness of the restriction is used to determine the default
       behavior of how to truncate restrictions when a pipeline is being
       `drained <https://docs.google.com/document/d/1NExwHlj-2q2WUGhSO4jTu8XGhDPmm3cllSN8IMmWci8/edit#>`_.
       If the restriction is bounded, then the entire restriction will be
       processed; otherwise, the restriction will be processed until a
       checkpoint is possible.

       The API is required to be implemented.

       Returns: ``True`` if the restriction represents a finite amount of work.
       Otherwise, returns ``False``.
   ```
   
   I think what I suggested for the link is correct based upon https://devguide.python.org/documenting/#external-links
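To make the suggested contract concrete, here is a minimal sketch of an `is_bounded()` implementation that carries the suggested docstring. `SimpleOffsetRestrictionTracker` is a hypothetical, framework-free class (it does not subclass `apache_beam.io.iobase.RestrictionTracker`), and a `stop` of `None` is an assumed convention standing in for an unbounded range.

```python
class SimpleOffsetRestrictionTracker(object):
  """Hypothetical tracker over [start, stop); stop=None means unbounded."""

  def __init__(self, start, stop=None):
    self._start = start
    self._stop = stop
    self._last_claimed = None

  def try_claim(self, position):
    # Offsets must be claimed in strictly increasing order.
    if self._last_claimed is not None and position <= self._last_claimed:
      raise ValueError('Positions must be claimed in increasing order')
    if position < self._start:
      raise ValueError('Position precedes the start of the restriction')
    # A bounded range refuses positions at or past its end, so work finishes.
    if self._stop is not None and position >= self._stop:
      return False
    self._last_claimed = position
    return True

  def is_bounded(self):
    """Returns whether the amount of work represented by the current
    restriction is bounded.

    Returns: ``True`` if the restriction represents a finite amount of work.
    Otherwise, returns ``False``.
    """
    return self._stop is not None
```

The boundedness here follows directly from whether an end offset exists, which matches the docstring's framing in terms of the amount of work rather than the amount of output.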

##########
File path: sdks/python/apache_beam/transforms/core.py
##########
@@ -320,6 +320,19 @@ def split_and_size(self, element, restriction):
     for part in self.split(element, restriction):
       yield part, self.restriction_size(element, part)
 
+  def truncate(self, element, restriction):
+    """Truncate the given restriction into finite amount of work when the
+    pipeline starts to drain.
+
+    By default, if the restriction is bounded, it will return the entire
+    restriction. If the restriction is unbounded, it will not return anything.
+
+    It's recommended to implement this API if more granularity is required.

Review comment:
       ```suggestion
       """Truncates the provided restriction into a restriction representing a
       finite amount of work when the pipeline is
       `draining <https://docs.google.com/document/d/1NExwHlj-2q2WUGhSO4jTu8XGhDPmm3cllSN8IMmWci8/edit#>`_.
       See the linked document for additional details about drain.

       By default, if the restriction is bounded, then the restriction will be
       returned; otherwise, ``None`` will be returned.

       This API is optional and should only be implemented if more granularity
       is required.

       Return a truncated finite restriction if further processing is required;
       otherwise, return ``None`` to represent that no further processing of
       this restriction is required.
   ```
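Both the original docstring and the suggested rewrite describe the same default: a bounded restriction is returned unchanged and an unbounded one yields `None`. The sketch below shows that rule next to an optional override with finer granularity. It is plain Python with hypothetical names (`OffsetRange`, `default_truncate`, `truncate_to_limit`); in the PR, boundedness is queried on the restriction tracker, whereas here `is_bounded()` is folded into the restriction for brevity.

```python
class OffsetRange(object):
  """Hypothetical restriction [start, stop); stop=None means unbounded."""

  def __init__(self, start, stop=None):
    self.start = start
    self.stop = stop

  def is_bounded(self):
    return self.stop is not None


def default_truncate(element, restriction):
  """Default drain behavior: keep bounded work, drop unbounded work."""
  if restriction.is_bounded():
    return restriction
  return None


def truncate_to_limit(element, restriction, limit=100):
  """Example override: cap any restriction at `limit` offsets of work."""
  if restriction.is_bounded():
    end = min(restriction.stop, restriction.start + limit)
  else:
    end = restriction.start + limit
  if end <= restriction.start:
    # Nothing left to process for this restriction.
    return None
  return OffsetRange(restriction.start, end)
```

An override like `truncate_to_limit` is the kind of "more granularity" the docstring refers to: instead of dropping an unbounded restriction outright during drain, it converts it into a finite amount of remaining work.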




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

