robertwb commented on code in PR #32600:
URL: https://github.com/apache/beam/pull/32600#discussion_r1795993058


##########
sdks/python/apache_beam/transforms/core.py:
##########
@@ -2493,6 +2494,68 @@ def as_result(self, error_post_processing=None):
       return self._pvalue
 
 
+class _DeferredStateUpdatingPool:
+  """
+  A class that submits a DoFn#process but defers updating counter metrics until
+  after the subprocess finishes execution.
+  """
+  def __init__(self, pool, timeout):
+    """
+    Args:
+      process_pool (ProcessPoolExecutor).
+      timeout (Optional[float]): The maximum time allowed for execution.
+    """
+    self._pool = pool
+    self._timeout = timeout
+
+  @staticmethod
+  def _wrapped_fn(fn, *args, **kwargs):
+    """Records thread scoped state modifications in the subprocess/thread and
+    replays them once the thread/subprocess returns"""
+    from apache_beam.runners.worker.statesampler_stub import StubStateSampler
+    stub_state_sampler = StubStateSampler()
+
+    from apache_beam.runners.worker.statesampler import set_current_tracker
+    set_current_tracker(stub_state_sampler)
+
+    results = fn(*args, **kwargs)
+    if results is not None:
+      # Ensure we iterate over the entire output list in the given amount of
+      # time.
+      results = list(results)
+    return (results, stub_state_sampler)
+
+  def submit(self, process_fn, *args, **kwargs):
+    """
+    Submits the process_fn for execution.
+
+    Args:
+        process_fn (Callable): DoFn#process function to be executed in a
+          subprocess or thread.
+        *args: Positional arguments to be passed to the wrapped method.
+        **kwargs: Keyword arguments to be passed to the wrapped method.
+
+    Returns:
+        Optional[list]: The results of the submitted_fn execution, or None if
+          no results.
+    """
+    results, stub_state_sampler = self._pool.submit(
+      functools.partial(self._wrapped_fn, process_fn),
+      *args, **kwargs).result(self._timeout)
+
+    from apache_beam.runners.worker.statesampler import get_current_tracker
+    tracker = get_current_tracker()
+
+    if tracker is not None:
+      for typed_metric_name, value in (
+        stub_state_sampler.get_recorded_calls().items()
+      ):
+        tracker.update_metric(typed_metric_name, value)
+    if results is None:

Review Comment:
   This logic is redundant with the logic in `_wrapped_fn`, right?



##########
sdks/python/apache_beam/runners/worker/statesampler_interface.py:
##########
@@ -0,0 +1,24 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from abc import ABC, abstractmethod
+
+
+class StateSamplerInterface(ABC):
+  @abstractmethod
+  def update_metric(self, typed_metric_name, value):

Review Comment:
   That's an interesting idea; however, I'd rather avoid dependencies on test 
classes like MagicMock (it may also have issues with the pickling required 
here). I do think that if we're going to introduce an interface here, we should 
list all the relevant methods and have the stub explicitly raise an error for 
the unsupported ones, to make clear that it's a partial implementation. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to