Repository: beam
Updated Branches:
  refs/heads/master 4e0c8333c -> 4e0d5f596

[BEAM-662] Fix for allowing floating point periods in windows

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1bc1bdd3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1bc1bdd3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1bc1bdd3

Branch: refs/heads/master
Commit: 1bc1bdd33494b4123855e2e3c9fa823654b31998
Parents: 4e0c833
Author: Sourabh Bajaj <sourabhba...@google.com>
Authored: Wed Apr 19 18:20:11 2017 -0700
Committer: Robert Bradshaw <rober...@gmail.com>
Committed: Thu Apr 20 08:53:55 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/transforms/window.py      | 10 ++++++----
 sdks/python/apache_beam/transforms/window_test.py | 14 ++++++++++++++
 sdks/python/apache_beam/utils/timestamp.py        |  4 ----
 3 files changed, 20 insertions(+), 8 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/beam/blob/1bc1bdd3/sdks/python/apache_beam/transforms/window.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py
index 319a7b4..931a17d 100644
--- a/sdks/python/apache_beam/transforms/window.py
+++ b/sdks/python/apache_beam/transforms/window.py
@@ -388,13 +388,15 @@ class SlidingWindows(NonMergingWindowFn):
       raise ValueError('The size parameter must be strictly positive.')
     self.size = Duration.of(size)
     self.period = Duration.of(period)
-    self.offset = Timestamp.of(offset) % size
+    self.offset = Timestamp.of(offset) % period
 
   def assign(self, context):
     timestamp = context.timestamp
-    start = timestamp - (timestamp - self.offset) % self.period
-    return [IntervalWindow(Timestamp.of(s), Timestamp.of(s) + self.size)
-            for s in range(start, start - self.size, -self.period)]
+    start = timestamp - ((timestamp - self.offset) % self.period)
+    return [
+        IntervalWindow(Timestamp(micros=s), Timestamp(micros=s) + self.size)
+        for s in range(start.micros, timestamp.micros - self.size.micros,
+                       -self.period.micros)]
 
   def __eq__(self, other):
     if type(self) == type(other) == SlidingWindows:

http://git-wip-us.apache.org/repos/asf/beam/blob/1bc1bdd3/sdks/python/apache_beam/transforms/window_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/window_test.py b/sdks/python/apache_beam/transforms/window_test.py
index 1ac95e4..cbfd0b2 100644
--- a/sdks/python/apache_beam/transforms/window_test.py
+++ b/sdks/python/apache_beam/transforms/window_test.py
@@ -108,6 +108,20 @@ class WindowTest(unittest.TestCase):
     self.assertEqual(expected, windowfn.assign(context('v', 8)))
     self.assertEqual(expected, windowfn.assign(context('v', 11)))
 
+  def test_sliding_windows_assignment_fraction(self):
+    windowfn = SlidingWindows(size=3.5, period=2.5, offset=1.5)
+    self.assertEqual([IntervalWindow(1.5, 5.0), IntervalWindow(-1.0, 2.5)],
+                     windowfn.assign(context('v', 1.7)))
+    self.assertEqual([IntervalWindow(1.5, 5.0)],
+                     windowfn.assign(context('v', 3)))
+
+  def test_sliding_windows_assignment_fraction_large_offset(self):
+    windowfn = SlidingWindows(size=3.5, period=2.5, offset=4.0)
+    self.assertEqual([IntervalWindow(1.5, 5.0), IntervalWindow(-1.0, 2.5)],
+                     windowfn.assign(context('v', 1.7)))
+    self.assertEqual([IntervalWindow(4.0, 7.5), IntervalWindow(1.5, 5.0)],
+                     windowfn.assign(context('v', 4.5)))
+
   def test_sessions_merging(self):
     windowfn = Sessions(10)
 

http://git-wip-us.apache.org/repos/asf/beam/blob/1bc1bdd3/sdks/python/apache_beam/utils/timestamp.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/timestamp.py b/sdks/python/apache_beam/utils/timestamp.py
index 647f4bd..8b2ccda 100644
--- a/sdks/python/apache_beam/utils/timestamp.py
+++ b/sdks/python/apache_beam/utils/timestamp.py
@@ -167,10 +167,6 @@ class Duration(object):
     # Note that the returned value may have lost precision.
     return float(self.micros) / 1000000
 
-  def __int__(self):
-    # Note that the returned value may have lost precision.
-    return self.micros / 1000000
-
   def __cmp__(self, other):
     # Allow comparisons between Duration and Timestamp values.
     if not isinstance(other, Timestamp):