This is an automated email from the ASF dual-hosted git repository.

bhulette pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new cb5913a  [BEAM-13081] Fixes a compatible issue of decoding null-value bitmap between JVM coder and Python Coder (#15829)
cb5913a is described below

commit cb5913aa51243ed3fedc6030ad33bf2d52f52bc8
Author: Jiayang Wu <530081...@qq.com>
AuthorDate: Thu Nov 4 13:51:43 2021 -0700

    [BEAM-13081] Fixes a compatible issue of decoding null-value bitmap between JVM coder and Python Coder (#15829)
    
    * tests passed
    
    * make one line shorter
    
    * reformat
    
    * reformat again
    
    * reformat for the last time
    
    Co-authored-by: Jiayang Wu <jiaya...@twitter.com>
---
 sdks/python/apache_beam/coders/row_coder.py      |  4 +++-
 sdks/python/apache_beam/coders/row_coder_test.py | 26 ++++++++++++++++++++++++
 2 files changed, 29 insertions(+), 1 deletion(-)

diff --git a/sdks/python/apache_beam/coders/row_coder.py b/sdks/python/apache_beam/coders/row_coder.py
index ec3778d..20fa867 100644
--- a/sdks/python/apache_beam/coders/row_coder.py
+++ b/sdks/python/apache_beam/coders/row_coder.py
@@ -198,7 +198,9 @@ class RowCoderImpl(StreamCoderImpl):
     words.frombytes(self.NULL_MARKER_CODER.decode_from_stream(in_stream, True))
 
     if words:
-      nulls = ((words[i // 8] >> (i % 8)) & 0x01 for i in range(nvals))
+      nulls = (
+          0 if i // 8 >= len(words) else ((words[i // 8] >> (i % 8)) & 0x01)
+          for i in range(nvals))
     else:
       nulls = itertools.repeat(False, nvals)
 
diff --git a/sdks/python/apache_beam/coders/row_coder_test.py b/sdks/python/apache_beam/coders/row_coder_test.py
index 331f824..2fdf7f8 100644
--- a/sdks/python/apache_beam/coders/row_coder_test.py
+++ b/sdks/python/apache_beam/coders/row_coder_test.py
@@ -47,6 +47,15 @@ Person = typing.NamedTuple(
         ("favorite_time", Timestamp),
     ])
 
+NullablePerson = typing.NamedTuple(
+    "NullablePerson",
+    [("name", typing.Optional[str]), ("age", np.int32),
+     ("address", typing.Optional[str]), ("aliases", typing.List[str]),
+     ("knows_javascript", bool), ("payload", typing.Optional[bytes]),
+     ("custom_metadata", typing.Mapping[str, int]),
+     ("favorite_time", typing.Optional[Timestamp]),
+     ("one_more_field", typing.Optional[str])])
+
 coders_registry.register_coder(Person, RowCoder)
 
 
@@ -82,6 +91,23 @@ class RowCoderTest(unittest.TestCase):
           Timestamp.from_rfc3339('2020-08-12T15:51:00.032Z'))
   ]
 
+  def test_row_accepts_trailing_zeros_truncated(self):
+    expected_coder = RowCoder(
+        typing_to_runner_api(NullablePerson).row_type.schema)
+    person = NullablePerson(
+        None,
+        np.int32(25),
+        "Westeros", ["Mother of Dragons"],
+        False,
+        None, {"dragons": 3},
+        None,
+        "NotNull")
+    out = expected_coder.encode(person)
+    # 9 fields, 1 null byte, field 0, 5, 7 are null
+    new_payload = bytes([9, 1, 1 | 1 << 5 | 1 << 7]) + out[4:]
+    new_value = expected_coder.decode(new_payload)
+    self.assertEqual(person, new_value)
+
   def test_create_row_coder_from_named_tuple(self):
     expected_coder = RowCoder(typing_to_runner_api(Person).row_type.schema)
     real_coder = coders_registry.get_coder(Person)

Reply via email to