This is an automated email from the ASF dual-hosted git repository.

bhulette pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
     new cb5913a  [BEAM-13081] Fixes a compatible issue of decoding null-value bitmap between JVM coder and Python Coder (#15829)
cb5913a is described below

commit cb5913aa51243ed3fedc6030ad33bf2d52f52bc8
Author: Jiayang Wu <530081...@qq.com>
AuthorDate: Thu Nov 4 13:51:43 2021 -0700

    [BEAM-13081] Fixes a compatible issue of decoding null-value bitmap between JVM coder and Python Coder (#15829)

    * tests passed

    * make one line shorter

    * reformat

    * reformat again

    * reformat for the last time

    Co-authored-by: Jiayang Wu <jiaya...@twitter.com>
---
 sdks/python/apache_beam/coders/row_coder.py      |  4 +++-
 sdks/python/apache_beam/coders/row_coder_test.py | 26 ++++++++++++++++++++++++
 2 files changed, 29 insertions(+), 1 deletion(-)

diff --git a/sdks/python/apache_beam/coders/row_coder.py b/sdks/python/apache_beam/coders/row_coder.py
index ec3778d..20fa867 100644
--- a/sdks/python/apache_beam/coders/row_coder.py
+++ b/sdks/python/apache_beam/coders/row_coder.py
@@ -198,7 +198,9 @@ class RowCoderImpl(StreamCoderImpl):
     words.frombytes(self.NULL_MARKER_CODER.decode_from_stream(in_stream, True))
 
     if words:
-      nulls = ((words[i // 8] >> (i % 8)) & 0x01 for i in range(nvals))
+      nulls = (
+          0 if i // 8 >= len(words) else ((words[i // 8] >> (i % 8)) & 0x01)
+          for i in range(nvals))
     else:
       nulls = itertools.repeat(False, nvals)
 
diff --git a/sdks/python/apache_beam/coders/row_coder_test.py b/sdks/python/apache_beam/coders/row_coder_test.py
index 331f824..2fdf7f8 100644
--- a/sdks/python/apache_beam/coders/row_coder_test.py
+++ b/sdks/python/apache_beam/coders/row_coder_test.py
@@ -47,6 +47,15 @@ Person = typing.NamedTuple(
         ("favorite_time", Timestamp),
     ])
 
+NullablePerson = typing.NamedTuple(
+    "NullablePerson",
+    [("name", typing.Optional[str]), ("age", np.int32),
+     ("address", typing.Optional[str]), ("aliases", typing.List[str]),
+     ("knows_javascript", bool), ("payload", typing.Optional[bytes]),
+     ("custom_metadata", typing.Mapping[str, int]),
+     ("favorite_time", typing.Optional[Timestamp]),
+     ("one_more_field", typing.Optional[str])])
+
 coders_registry.register_coder(Person, RowCoder)
 
 
@@ -82,6 +91,23 @@ class RowCoderTest(unittest.TestCase):
             Timestamp.from_rfc3339('2020-08-12T15:51:00.032Z'))
     ]
 
+  def test_row_accepts_trailing_zeros_truncated(self):
+    expected_coder = RowCoder(
+        typing_to_runner_api(NullablePerson).row_type.schema)
+    person = NullablePerson(
+        None,
+        np.int32(25),
+        "Westeros", ["Mother of Dragons"],
+        False,
+        None, {"dragons": 3},
+        None,
+        "NotNull")
+    out = expected_coder.encode(person)
+    # 9 fields, 1 null byte, field 0, 5, 7 are null
+    new_payload = bytes([9, 1, 1 | 1 << 5 | 1 << 7]) + out[4:]
+    new_value = expected_coder.decode(new_payload)
+    self.assertEqual(person, new_value)
+
   def test_create_row_coder_from_named_tuple(self):
     expected_coder = RowCoder(typing_to_runner_api(Person).row_type.schema)
     real_coder = coders_registry.get_coder(Person)