lukecwik commented on a change in pull request #12275:
URL: https://github.com/apache/beam/pull/12275#discussion_r456681011
##########
File path: sdks/python/apache_beam/runners/common.py
##########
@@ -845,52 +880,241 @@ def _invoke_process_per_window(self,
deferred_timestamp=deferred_timestamp)
return None
+ @staticmethod
+ def _try_split(fraction,
+ window_index, # type: Optional[int]
+ stop_window_index, # type: Optional[int]
+ windowed_value, # type: WindowedValue
+ restriction,
+ watermark_estimator_state,
+ restriction_provider, # type: RestrictionProvider
+ restriction_tracker, # type: RestrictionTracker
+ watermark_estimator, # type: WatermarkEstimator
+ ):
+ # type: (...) -> Optional[Tuple[Iterable[SplitResultPrimary], Iterable[SplitResultResidual], Optional[int]]]
+
+ """Try to split returning a primaries, residuals and a new stop index.
+
+ For non-window observing splittable DoFns we split the current restriction
+ and assign the primary and residual to all the windows.
+
+ For window observing splittable DoFns, we:
+ 1) return a split at a window boundary if the fraction lies outside of the
+ current window.
+ 2) attempt to split the current restriction, if successful then return
+ the primary and residual for the current window and an additional
+ primary and residual for any fully processed and fully unprocessed
+ windows.
+ 3) fall back to returning a split at the window boundary if possible
+
+ Args:
+ window_index: the current index of the window being processed or None
+ if the splittable DoFn is not window observing.
+ stop_window_index: the current index to stop processing at or None
+ if the splittable DoFn is not window observing.
+ windowed_value: the current windowed value
+ restriction: the initial restriction when processing was started.
+ watermark_estimator_state: the initial watermark estimator state when
+ processing was started.
+ restriction_provider: the DoFn's restriction provider
+ restriction_tracker: the current restriction tracker
+ watermark_estimator: the current watermark estimator
+
+ Returns:
+ A tuple containing (primaries, residuals, new_stop_index) or None if
+ splitting was not possible. new_stop_index will only be set if the
+ splittable DoFn is window observing otherwise it will be None.
+ """
+ def compute_whole_window_split(to_index, from_index):
+ restriction_size = restriction_provider.restriction_size(
+ windowed_value, restriction)
+ # The primary and residual both share the same value only differing
+ # by the set of windows they are in.
+ value = ((windowed_value.value, (restriction, watermark_estimator_state)),
+ restriction_size)
+ primary_restriction = SplitResultPrimary(
+ primary_value=WindowedValue(
+ value,
+ windowed_value.timestamp,
+ windowed_value.windows[:to_index])) if to_index > 0 else None
+ # Don't report any updated watermarks for the residual since they have
+ # not processed any part of the restriction.
+ residual_restriction = SplitResultResidual(
+ residual_value=WindowedValue(
+ value,
+ windowed_value.timestamp,
+ windowed_value.windows[from_index:stop_window_index]),
+ current_watermark=None,
+ deferred_timestamp=None) if from_index < stop_window_index else None
+ return (primary_restriction, residual_restriction)
+
+ primary_restrictions = []
+ residual_restrictions = []
+
+ window_observing = window_index is not None
+ # If we are processing each window separately and we aren't on the last
+ # window then compute whether the split lies within the current window
+ # or a future window.
+ if window_observing and window_index != stop_window_index - 1:
+ progress = restriction_tracker.current_progress()
+ if not progress:
+ # Assume no work has been completed for the current window if progress
+ # is unavailable.
+ from apache_beam.io.iobase import RestrictionProgress
+ progress = RestrictionProgress(completed=0, remaining=1)
+
+ scaled_progress = PerWindowInvoker._scale_progress(
+ progress, window_index, stop_window_index)
+ # Compute the fraction of the remainder relative to the scaled progress.
+ # If the value is greater than or equal to progress.remaining_work then we
+ # should split at the closest window boundary.
+ fraction_of_remainder = scaled_progress.remaining_work * fraction
+ if fraction_of_remainder >= progress.remaining_work:
+ # The fraction is outside of the current window and hence we will
+ # split at the closest window boundary. Favor a split and return the
+ # last window if we would have rounded up to the end of the window
+ # based upon the fraction.
+ new_stop_window_index = min(
+ stop_window_index - 1,
+ window_index + max(
+ 1,
+ int(
+ round((
+ progress.completed_work +
+ scaled_progress.remaining_work * fraction) /
+ progress.total_work))))
+ primary, residual = compute_whole_window_split(
+ new_stop_window_index, new_stop_window_index)
+ assert primary is not None
+ assert residual is not None
+ return ([primary], [residual], new_stop_window_index)
+ else:
+ # The fraction is within the current window being processed so compute
+ # the updated fraction based upon the number of windows being processed.
+ new_stop_window_index = window_index + 1
+ fraction = fraction_of_remainder / progress.remaining_work
+ # Attempt to split below, if we can't then we'll compute a split
+ # using only window boundaries
+ else:
+ # We aren't splitting within multiple windows so we don't change our
+ # stop index.
+ new_stop_window_index = stop_window_index
+
+ # Temporary workaround for [BEAM-7473]: get current_watermark before
+ # split, in case watermark gets advanced before getting split results.
+ # In worst case, current_watermark is always stale, which is ok.
+ current_watermark = (watermark_estimator.current_watermark())
+ current_estimator_state = (watermark_estimator.get_estimator_state())
+ split = restriction_tracker.try_split(fraction)
+ if split:
+ primary, residual = split
+ element = windowed_value.value
+ primary_size = restriction_provider.restriction_size(
+ windowed_value.value, primary)
+ residual_size = restriction_provider.restriction_size(
+ windowed_value.value, residual)
+ # We use the watermark estimator state for the original process call
+ # for the primary and the updated watermark estimator state for the
+ # residual for the split.
+ primary_split_value = ((element, (primary, watermark_estimator_state)),
+ primary_size)
+ residual_split_value = ((element, (residual, current_estimator_state)),
+ residual_size)
+ windows = (
+ windowed_value.windows[window_index],
+ ) if window_observing else windowed_value.windows
+ primary_restrictions.append(
+ SplitResultPrimary(
+ primary_value=WindowedValue(
+ primary_split_value, windowed_value.timestamp, windows)))
+ residual_restrictions.append(
+ SplitResultResidual(
+ residual_value=WindowedValue(
+ residual_split_value, windowed_value.timestamp, windows),
+ current_watermark=current_watermark,
+ deferred_timestamp=None))
+
+ if window_observing:
+ assert new_stop_window_index == window_index + 1
+ primary, residual = compute_whole_window_split(
+ window_index, window_index + 1)
+ if primary:
+ primary_restrictions.append(primary)
+ if residual:
+ residual_restrictions.append(residual)
+ return (
+ primary_restrictions, residual_restrictions, new_stop_window_index)
+ elif new_stop_window_index and new_stop_window_index != stop_window_index:
+ # If we failed to split but have a new stop index then return a split
+ # at the window boundary.
+ primary, residual = compute_whole_window_split(
+ new_stop_window_index, new_stop_window_index)
+ assert primary is not None
+ assert residual is not None
+ return ([primary], [residual], new_stop_window_index)
+ else:
+ return None
+
def try_split(self, fraction):
- # type: (...) -> Optional[Tuple[SplitResultPrimary, SplitResultResidual]]
+ # type: (...) -> Optional[Tuple[Iterable[SplitResultPrimary], Iterable[SplitResultResidual]]]
if not self.is_splittable:
return None
with self.splitting_lock:
+ if not self.threadsafe_restriction_tracker:
+ return None
+
# Make a local reference to member variables that change references during
# processing under lock before attempting to split so we have a consistent
# view of all the references.
- current_windowed_value = self.current_windowed_value
- threadsafe_restriction_tracker = self.threadsafe_restriction_tracker
- threadsafe_watermark_estimator = self.threadsafe_watermark_estimator
-
- if threadsafe_restriction_tracker:
- # Temporary workaround for [BEAM-7473]: get current_watermark before
- # split, in case watermark gets advanced before getting split results.
- # In worst case, current_watermark is always stale, which is ok.
- current_watermark = (threadsafe_watermark_estimator.current_watermark())
- estimator_state = (threadsafe_watermark_estimator.get_estimator_state())
- split = threadsafe_restriction_tracker.try_split(fraction)
- if split:
- primary, residual = split
- element = current_windowed_value.value
- restriction_provider = self.signature.get_restriction_provider()
- primary_size = restriction_provider.restriction_size(element, primary)
- residual_size = restriction_provider.restriction_size(element, residual)
- primary_value = ((element, (primary, None)), primary_size)
- residual_value = ((element, (residual, estimator_state)), residual_size)
- return (
- SplitResultPrimary(
- primary_value=current_windowed_value.with_value(primary_value)),
- SplitResultResidual(
- residual_value=current_windowed_value.with_value(
- residual_value),
- current_watermark=current_watermark,
- deferred_timestamp=None))
- return None
+ result = PerWindowInvoker._try_split(
+ fraction,
+ self.current_window_index,
+ self.stop_window_index,
+ self.current_windowed_value,
+ self.restriction,
+ self.watermark_estimator_state,
+ self.signature.get_restriction_provider(),
+ self.threadsafe_restriction_tracker,
+ self.threadsafe_watermark_estimator)
+ if not result:
+ return None
+
+ residuals, primaries, self.stop_window_index = result
+ return (residuals, primaries)
+
+ @staticmethod
+ def _scale_progress(progress, window_index, stop_window_index):
+ # We scale progress based upon the amount of work we will do for one
+ # window and have it apply for all windows.
+ completed = window_index * progress.total_work + progress.completed_work
+ remaining = (
+ stop_window_index -
+ (window_index + 1)) * progress.total_work + progress.remaining_work
+ from apache_beam.io.iobase import RestrictionProgress
+ return RestrictionProgress(completed=completed, remaining=remaining)
def current_element_progress(self):
# type: () -> Optional[RestrictionProgress]
- restriction_tracker = self.threadsafe_restriction_tracker
- if restriction_tracker:
- return restriction_tracker.current_progress()
- else:
+ if not self.is_splittable:
return None
+ with self.splitting_lock:
+ current_window_index = self.current_window_index
+ stop_window_index = self.stop_window_index
+ threadsafe_restriction_tracker = self.threadsafe_restriction_tracker
+
+ if not threadsafe_restriction_tracker:
+ return None
+
+ progress = threadsafe_restriction_tracker.current_progress()
+ if not current_window_index or not progress:
+ return progress
+
+ assert stop_window_index
Review comment:
Done
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]