Repository: beam
Updated Branches:
  refs/heads/master e906fe9c3 -> c33e9b446


Fix WindowedValueCoder for large timestamps


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ee728f1b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ee728f1b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ee728f1b

Branch: refs/heads/master
Commit: ee728f1b2f617dac8e5cd729cacf1a46911021e0
Parents: e906fe9
Author: Vikas Kedigehalli <vika...@google.com>
Authored: Mon Jun 12 23:11:22 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Tue Jun 13 10:11:20 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/coders/coder_impl.py         | 4 ++++
 sdks/python/apache_beam/coders/coders_test_common.py | 8 ++++++++
 2 files changed, 12 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/ee728f1b/sdks/python/apache_beam/coders/coder_impl.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py
index 10298bf..2670250 100644
--- a/sdks/python/apache_beam/coders/coder_impl.py
+++ b/sdks/python/apache_beam/coders/coder_impl.py
@@ -710,6 +710,10 @@ class WindowedValueCoderImpl(StreamCoderImpl):
       timestamp = MAX_TIMESTAMP.micros
     else:
       timestamp *= 1000
+      if timestamp > MAX_TIMESTAMP.micros:
+        timestamp = MAX_TIMESTAMP.micros
+      if timestamp < MIN_TIMESTAMP.micros:
+        timestamp = MIN_TIMESTAMP.micros
 
     windows = self._windows_coder.decode_from_stream(in_stream, True)
     # Read PaneInfo encoded byte.

http://git-wip-us.apache.org/repos/asf/beam/blob/ee728f1b/sdks/python/apache_beam/coders/coders_test_common.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py
index c9b67b3..577c53a 100644
--- a/sdks/python/apache_beam/coders/coders_test_common.py
+++ b/sdks/python/apache_beam/coders/coders_test_common.py
@@ -23,6 +23,8 @@ import unittest
 
 import dill
 
+from apache_beam.transforms.window import GlobalWindow
+from apache_beam.utils.timestamp import MIN_TIMESTAMP
 import observable
 from apache_beam.transforms import window
 from apache_beam.utils import timestamp
@@ -287,6 +289,12 @@ class CodersTest(unittest.TestCase):
     # Test binary representation
     self.assertEqual('\x7f\xdf;dZ\x1c\xac\t\x00\x00\x00\x01\x0f\x01',
                      coder.encode(window.GlobalWindows.windowed_value(1)))
+
+    # Test decoding large timestamp
+    self.assertEqual(
+        coder.decode('\x7f\xdf;dZ\x1c\xac\x08\x00\x00\x00\x01\x0f\x00'),
+        windowed_value.create(0, MIN_TIMESTAMP.micros, (GlobalWindow(),)))
+
     # Test unnested
     self.check_coder(
         coders.WindowedValueCoder(coders.VarIntCoder()),

Reply via email to