Repository: beam Updated Branches: refs/heads/master e906fe9c3 -> c33e9b446
Fix WindowValueCoder for large timestamps Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ee728f1b Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ee728f1b Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ee728f1b Branch: refs/heads/master Commit: ee728f1b2f617dac8e5cd729cacf1a46911021e0 Parents: e906fe9 Author: Vikas Kedigehalli <vika...@google.com> Authored: Mon Jun 12 23:11:22 2017 -0700 Committer: Ahmet Altay <al...@google.com> Committed: Tue Jun 13 10:11:20 2017 -0700 ---------------------------------------------------------------------- sdks/python/apache_beam/coders/coder_impl.py | 4 ++++ sdks/python/apache_beam/coders/coders_test_common.py | 8 ++++++++ 2 files changed, 12 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/ee728f1b/sdks/python/apache_beam/coders/coder_impl.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py index 10298bf..2670250 100644 --- a/sdks/python/apache_beam/coders/coder_impl.py +++ b/sdks/python/apache_beam/coders/coder_impl.py @@ -710,6 +710,10 @@ class WindowedValueCoderImpl(StreamCoderImpl): timestamp = MAX_TIMESTAMP.micros else: timestamp *= 1000 + if timestamp > MAX_TIMESTAMP.micros: + timestamp = MAX_TIMESTAMP.micros + if timestamp < MIN_TIMESTAMP.micros: + timestamp = MIN_TIMESTAMP.micros windows = self._windows_coder.decode_from_stream(in_stream, True) # Read PaneInfo encoded byte. http://git-wip-us.apache.org/repos/asf/beam/blob/ee728f1b/sdks/python/apache_beam/coders/coders_test_common.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py index c9b67b3..577c53a 100644 --- a/sdks/python/apache_beam/coders/coders_test_common.py +++ b/sdks/python/apache_beam/coders/coders_test_common.py @@ -23,6 +23,8 @@ import unittest import dill +from apache_beam.transforms.window import GlobalWindow +from apache_beam.utils.timestamp import MIN_TIMESTAMP import observable from apache_beam.transforms import window from apache_beam.utils import timestamp @@ -287,6 +289,12 @@ class CodersTest(unittest.TestCase): # Test binary representation self.assertEqual('\x7f\xdf;dZ\x1c\xac\t\x00\x00\x00\x01\x0f\x01', coder.encode(window.GlobalWindows.windowed_value(1))) + + # Test decoding large timestamp + self.assertEqual( + coder.decode('\x7f\xdf;dZ\x1c\xac\x08\x00\x00\x00\x01\x0f\x00'), + windowed_value.create(0, MIN_TIMESTAMP.micros, (GlobalWindow(),))) + # Test unnested self.check_coder( coders.WindowedValueCoder(coders.VarIntCoder()),