[ https://issues.apache.org/jira/browse/BEAM-4858?focusedWorklogId=152625&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-152625 ]

ASF GitHub Bot logged work on BEAM-4858:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 09/Oct/18 10:54
            Start Date: 09/Oct/18 10:54
    Worklog Time Spent: 10m 
      Work Description: robertwb closed pull request #6375: [BEAM-4858] Clean up division in batch size estimator.
URL: https://github.com/apache/beam/pull/6375
 
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py
index 8a999691f03..067d4f74aaa 100644
--- a/sdks/python/apache_beam/transforms/util.py
+++ b/sdks/python/apache_beam/transforms/util.py
@@ -30,7 +30,6 @@
 from builtins import zip
 
 from future.utils import itervalues
-from past.utils import old_div
 
 from apache_beam import typehints
 from apache_beam.metrics import Metrics
@@ -213,6 +212,7 @@ def __init__(self,
                max_batch_size=1000,
                target_batch_overhead=.1,
                target_batch_duration_secs=1,
+               variance=0.25,
                clock=time.time):
     if min_batch_size > max_batch_size:
       raise ValueError("Minimum (%s) must not be greater than maximum (%s)" % (
@@ -230,6 +230,7 @@ def __init__(self,
     self._max_batch_size = max_batch_size
     self._target_batch_overhead = target_batch_overhead
     self._target_batch_duration_secs = target_batch_duration_secs
+    self._variance = variance
     self._clock = clock
     self._data = []
     self._ignore_next_timing = False
@@ -269,23 +270,63 @@ def record_time(self, batch_size):
         self._thin_data()
 
   def _thin_data(self):
-    sorted_data = sorted(self._data)
-    odd_one_out = [sorted_data[-1]] if len(sorted_data) % 2 == 1 else []
-    # Sort the pairs by how different they are.
-
-    def div_keys(kv1_kv2):
-      (x1, _), (x2, _) = kv1_kv2
-      return old_div(x2, x1) # TODO(BEAM-4858)
-
-    pairs = sorted(zip(sorted_data[::2], sorted_data[1::2]),
-                   key=div_keys)
-    # Keep the top 1/3 most different pairs, average the top 2/3 most similar.
-    threshold = 2 * len(pairs) // 3
-    self._data = (
-        list(sum(pairs[threshold:], ()))
-        + [((x1 + x2) / 2.0, (t1 + t2) / 2.0)
-           for (x1, t1), (x2, t2) in pairs[:threshold]]
-        + odd_one_out)
+    # Make sure we don't change the parity of len(self._data),
+    # as it's used below to alternate the jitter.
+    self._data.pop(random.randrange(len(self._data) // 4))
+    self._data.pop(random.randrange(len(self._data) // 2))
+
+  @staticmethod
+  def linear_regression_no_numpy(xs, ys):
+    # Least squares fit for y = a + bx over all points.
+    n = float(len(xs))
+    xbar = sum(xs) / n
+    ybar = sum(ys) / n
+    b = (sum([(x - xbar) * (y - ybar) for x, y in zip(xs, ys)])
+         / sum([(x - xbar)**2 for x in xs]))
+    a = ybar - b * xbar
+    return a, b
+
+  @staticmethod
+  def linear_regression_numpy(xs, ys):
+    # pylint: disable=wrong-import-order, wrong-import-position
+    import numpy as np
+    from numpy import sum
+    xs = np.asarray(xs, dtype=float)
+    ys = np.asarray(ys, dtype=float)
+
+    # First do a simple least squares fit for y = a + bx over all points.
+    b, a = np.polyfit(xs, ys, 1)
+
+    n = len(xs)
+    if n < 10:
+      return a, b
+    else:
+      # Refine this by throwing out outliers, according to Cook's distance.
+      # https://en.wikipedia.org/wiki/Cook%27s_distance
+      sum_x = sum(xs)
+      sum_x2 = sum(xs**2)
+      errs = a + b * xs - ys
+      s2 = sum(errs**2) / (n - 2)
+      if s2 == 0:
+        # It's an exact fit!
+        return a, b
+      h = (sum_x2 - 2 * sum_x * xs + n * xs**2) / (n * sum_x2 - sum_x**2)
+      cook_ds = 0.5 / s2 * errs**2 * (h / (1 - h)**2)
+
+      # Re-compute the regression, excluding those points with Cook's distance
+      # greater than 0.5, and weighting by the inverse of x to give a more
+      # stable y-intercept (as small batches have relatively more information
+      # about the fixed overhead).
+      weight = (cook_ds <= 0.5) / xs
+      b, a = np.polyfit(xs, ys, 1, w=weight)
+      return a, b
+
+  try:
+    # pylint: disable=wrong-import-order, wrong-import-position
+    import numpy as np
+    linear_regression = linear_regression_numpy
+  except ImportError:
+    linear_regression = linear_regression_no_numpy
 
   def next_batch_size(self):
     if self._min_batch_size == self._max_batch_size:
@@ -300,14 +341,14 @@ def next_batch_size(self):
               self._min_batch_size * self._MAX_GROWTH_FACTOR),
           self._min_batch_size + 1))
 
+    # There tends to be a lot of noise in the top quantile, which also
+    # has outsized influence in the regression.  If we have enough data,
+    # simply declare the top 20% to be outliers.
+    trimmed_data = sorted(self._data)[:max(20, len(self._data) * 4 // 5)]
+
     # Linear regression for y = a + bx, where x is batch size and y is time.
-    xs, ys = zip(*self._data)
-    n = float(len(self._data))
-    xbar = sum(xs) / n
-    ybar = sum(ys) / n
-    b = (sum([(x - xbar) * (y - ybar) for x, y in self._data])
-         / sum([(x - xbar)**2 for x in xs]))
-    a = ybar - b * xbar
+    xs, ys = zip(*trimmed_data)
+    a, b = self.linear_regression(xs, ys)
 
     # Avoid nonsensical or division-by-zero errors below due to noise.
     a = max(a, 1e-10)
@@ -316,17 +357,26 @@ def next_batch_size(self):
     last_batch_size = self._data[-1][0]
     cap = min(last_batch_size * self._MAX_GROWTH_FACTOR, self._max_batch_size)
 
+    target = self._max_batch_size
+
     if self._target_batch_duration_secs:
       # Solution to a + b*x = self._target_batch_duration_secs.
-      cap = min(cap, (self._target_batch_duration_secs - a) / b)
+      target = min(target, (self._target_batch_duration_secs - a) / b)
 
     if self._target_batch_overhead:
       # Solution to a / (a + b*x) = self._target_batch_overhead.
-      cap = min(cap, (a / b) * (1 / self._target_batch_overhead - 1))
+      target = min(target, (a / b) * (1 / self._target_batch_overhead - 1))
 
-    # Avoid getting stuck at min_batch_size.
+    # Avoid getting stuck at a single batch size (especially the minimal
+    # batch size), which would not allow us to extrapolate to other batch
+    # sizes.
+    # Jitter alternates between 0 and 1.
     jitter = len(self._data) % 2
-    return int(max(self._min_batch_size + jitter, cap))
+    # Smear our samples across a range centered at the target.
+    if len(self._data) > 10:
+      target += int(target * self._variance * 2 * (random.random() - .5))
+
+    return int(max(self._min_batch_size + jitter, min(target, cap)))
 
 
 class _GlobalWindowsBatchingDoFn(DoFn):
@@ -425,6 +475,9 @@ class BatchElements(PTransform):
         as used in the formula above
     target_batch_duration_secs: (optional) a target for total time per bundle,
         in seconds
+    variance: (optional) the permitted (relative) amount of deviation from the
+        (estimated) ideal batch size used to produce a wider base for
+        linear interpolation
     clock: (optional) an alternative to time.time for measuring the cost of
        downstream operations (mostly for testing)
   """
@@ -434,12 +487,14 @@ def __init__(self,
                max_batch_size=10000,
                target_batch_overhead=.05,
                target_batch_duration_secs=1,
+               variance=0.25,
                clock=time.time):
     self._batch_size_estimator = _BatchSizeEstimator(
         min_batch_size=min_batch_size,
         max_batch_size=max_batch_size,
         target_batch_overhead=target_batch_overhead,
         target_batch_duration_secs=target_batch_duration_secs,
+        variance=variance,
         clock=clock)
 
   def expand(self, pcoll):
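
For context on the Cook's-distance refinement in the util.py hunk above, the
sketch below (not part of this PR; the function name fit_with_rejection and
the synthetic data are illustrative only) reproduces the same fit, leverage,
and refit steps on standalone data, assuming numpy is available:

import numpy as np

def fit_with_rejection(xs, ys):
  xs, ys = np.asarray(xs, dtype=float), np.asarray(ys, dtype=float)
  b, a = np.polyfit(xs, ys, 1)      # first pass: plain least squares
  n = len(xs)
  errs = a + b * xs - ys            # residuals of the first fit
  s2 = np.sum(errs**2) / (n - 2)    # residual variance estimate
  if s2 == 0:
    return a, b                     # exact fit, nothing to reject
  sum_x, sum_x2 = np.sum(xs), np.sum(xs**2)
  # Leverage of each point, then Cook's distance (p = 2 parameters).
  h = (sum_x2 - 2 * sum_x * xs + n * xs**2) / (n * sum_x2 - sum_x**2)
  cook_ds = 0.5 / s2 * errs**2 * (h / (1 - h)**2)
  # Refit, dropping points whose Cook's distance exceeds 0.5 and
  # weighting the rest by 1/x to stabilize the intercept.
  b, a = np.polyfit(xs, ys, 1, w=(cook_ds <= 0.5) / xs)
  return a, b

# 100 points on y = 1 + 2x plus two gross outliers at (10, 30);
# the refit recovers a ~= 1, b ~= 2 despite them.
xs = list(np.linspace(1, 2, 100)) + [10, 10]
ys = [1 + 2 * x for x in xs[:100]] + [30, 30]
print(fit_with_rejection(xs, ys))

The diff continues with the accompanying test changes: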
diff --git a/sdks/python/apache_beam/transforms/util_test.py b/sdks/python/apache_beam/transforms/util_test.py
index 6cec4a5bf36..e592f938e17 100644
--- a/sdks/python/apache_beam/transforms/util_test.py
+++ b/sdks/python/apache_beam/transforms/util_test.py
@@ -20,6 +20,7 @@
 from __future__ import absolute_import
 
 import logging
+import random
 import time
 import unittest
 from builtins import object
@@ -125,6 +126,75 @@ def test_target_overhead(self):
         clock.sleep(batch_duration(actual_sizes[-1]))
     self.assertEqual(expected_sizes, actual_sizes)
 
+  def test_variance(self):
+    clock = FakeClock()
+    variance = 0.25
+    batch_estimator = util._BatchSizeEstimator(
+        target_batch_overhead=.05, target_batch_duration_secs=None,
+        variance=variance, clock=clock)
+    batch_duration = lambda batch_size: 1 + .7 * batch_size
+    expected_target = 27
+    actual_sizes = []
+    for _ in range(util._BatchSizeEstimator._MAX_DATA_POINTS - 1):
+      actual_sizes.append(batch_estimator.next_batch_size())
+      with batch_estimator.record_time(actual_sizes[-1]):
+        clock.sleep(batch_duration(actual_sizes[-1]))
+    # Check that we're testing a good range of values.
+    stable_set = set(actual_sizes[-20:])
+    self.assertGreater(len(stable_set), 3)
+    self.assertGreater(
+        min(stable_set), expected_target - expected_target * variance)
+    self.assertLess(
+        max(stable_set), expected_target + expected_target * variance)
+
+  def _run_regression_test(self, linear_regression_fn, test_outliers):
+    xs = [random.random() for _ in range(10)]
+    ys = [2*x + 1 for x in xs]
+    a, b = linear_regression_fn(xs, ys)
+    self.assertAlmostEqual(a, 1)
+    self.assertAlmostEqual(b, 2)
+
+    xs = [1 + random.random() for _ in range(100)]
+    ys = [7*x + 5 + 0.01 * random.random() for x in xs]
+    a, b = linear_regression_fn(xs, ys)
+    self.assertAlmostEqual(a, 5, delta=0.01)
+    self.assertAlmostEqual(b, 7, delta=0.01)
+
+    if test_outliers:
+      xs = [1 + random.random() for _ in range(100)]
+      ys = [2*x + 1 for x in xs]
+      a, b = linear_regression_fn(xs, ys)
+      self.assertAlmostEqual(a, 1)
+      self.assertAlmostEqual(b, 2)
+
+      # An outlier or two doesn't affect the result.
+      for _ in range(2):
+        xs += [10]
+        ys += [30]
+        a, b = linear_regression_fn(xs, ys)
+        self.assertAlmostEqual(a, 1)
+        self.assertAlmostEqual(b, 2)
+
+      # But enough of them, and they're no longer outliers.
+      xs += [10] * 10
+      ys += [30] * 10
+      a, b = linear_regression_fn(xs, ys)
+      self.assertLess(a, 0.5)
+      self.assertGreater(b, 2.5)
+
+  def test_no_numpy_regression(self):
+    self._run_regression_test(
+        util._BatchSizeEstimator.linear_regression_no_numpy, False)
+
+  def test_numpy_regression(self):
+    try:
+      # pylint: disable=wrong-import-order, wrong-import-position
+      import numpy as _
+    except ImportError:
+      self.skipTest('numpy not available')
+    self._run_regression_test(
+        util._BatchSizeEstimator.linear_regression_numpy, True)
+
 
 class IdentityWindowTest(unittest.TestCase):
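
The variance parameter introduced above deliberately smears the chosen batch
size around the estimated target, so the regression keeps seeing a spread of
batch sizes rather than collapsing onto a single point. A minimal sketch of
that smearing (not part of the PR; the target value is made up for
illustration):

import random

target, variance = 100, 0.25
# Mirrors: target += int(target * self._variance * 2 * (random.random() - .5))
samples = [target + int(target * variance * 2 * (random.random() - .5))
           for _ in range(1000)]
# Offsets are uniform over roughly (-25, 25), so samples span about 75-124.
print(min(samples), max(samples))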
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 152625)
    Time Spent: 5.5h  (was: 5h 20m)

> Clean up _BatchSizeEstimator in element-batching transform.
> -----------------------------------------------------------
>
>                 Key: BEAM-4858
>                 URL: https://issues.apache.org/jira/browse/BEAM-4858
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>            Reporter: Valentyn Tymofieiev
>            Assignee: Robert Bradshaw
>            Priority: Minor
>          Time Spent: 5.5h
>  Remaining Estimate: 0h
>
> Beam Python 3 conversion [exposed|https://github.com/apache/beam/pull/5729] 
> non-trivial performance-sensitive logic in element-batching transform. Let's 
> take a look at 
> [util.py#L271|https://github.com/apache/beam/blob/e98ff7c96afa2f72b3a98426dc1e9a47224da5c8/sdks/python/apache_beam/transforms/util.py#L271].
>  
> Due to Python 2 language semantics, the result of {{x2 / x1}} will depend on
> the type of the keys - whether they are integers or floats.
> The keys of the key-value pairs contained in {{self._data}} are added as
> integers
> [here|https://github.com/apache/beam/blob/d2ac08da2dccce8930432fae1ec7c30953880b69/sdks/python/apache_beam/transforms/util.py#L260];
> however, when we 'thin' the collected entries
> [here|https://github.com/apache/beam/blob/d2ac08da2dccce8930432fae1ec7c30953880b69/sdks/python/apache_beam/transforms/util.py#L279],
> the keys become floats. Surprisingly, using either integer or float
> division consistently
> [in the comparator|https://github.com/apache/beam/blob/e98ff7c96afa2f72b3a98426dc1e9a47224da5c8/sdks/python/apache_beam/transforms/util.py#L271]
> negatively affects the performance of a custom pipeline I was using to
> benchmark these changes. The performance impact likely comes from changes in
> the logic that depends on how division is evaluated, not from the
> performance of the division operation itself.
> In terms of the Python 3 conversion, the best course of action that avoids a
> regression seems to be to preserve the existing Python 2 behavior using
> {{old_div}} from {{past.utils}}; in the medium term we should clean up the
> logic. We may want to add a targeted microbenchmark to evaluate the
> performance of this code, and maybe cythonize it, since it seems to be
> performance-sensitive.
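
For readers unfamiliar with the division semantics in question, a minimal
illustration (not from the issue; requires the future package, which
provides old_div):

from past.utils import old_div

print(old_div(7, 2))    # 3    both ints: floor division, as in Python 2
print(old_div(7.0, 2))  # 3.5  a float operand: true division
print(7 / 2, 7 // 2)    # 3.5 3  in Python 3, '/' is always true division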



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
