HuangXingBo commented on a change in pull request #13371:
URL: https://github.com/apache/flink/pull/13371#discussion_r488448820



##########
File path: flink-python/pyflink/fn_execution/beam/beam_coder_impl_slow.py
##########
@@ -75,62 +82,106 @@ def decode_from_stream(self, in_stream, nested):
             in_stream.read_var_int64()
             yield self._decode_one_row_from_stream(in_stream, nested)
 
-    def _decode_one_row_from_stream(self, in_stream: create_InputStream, 
nested: bool) -> List:
-        null_mask = self._read_null_mask(in_stream)
-        return [None if null_mask[idx] else 
self._field_coders[idx].decode_from_stream(
-            in_stream, nested) for idx in range(0, self._field_count)]
+    def _decode_one_row_from_stream(
+            self, in_stream: create_InputStream, nested: bool) -> List:
+        mask = self._read_mask(in_stream)
+        # ignore the row kind value as it is unnecessary for stateless 
operation
+        return [None if mask[idx + ROW_KIND_BIT_SIZE] else
+                self._field_coders[idx].decode_from_stream(
+                    in_stream, nested) for idx in range(0, self._field_count)]
 
-    def _write_null_mask(self, value, out_stream):
+    def _write_mask(self, value, out_stream, row_kind_value=0):
         field_pos = 0
-        null_byte_search_table = self.null_byte_search_table
+        mask_byte_search_table = self.mask_byte_search_table
         remaining_bits_num = self._remaining_bits_num
-        for _ in range(self._leading_complete_bytes_num):
+
+        # first byte contains the row kind bits
+        b = self.row_kind_byte_table[row_kind_value]
+        for i in range(0, 8 - ROW_KIND_BIT_SIZE):
+            if field_pos + i < len(value) and value[field_pos + i] is None:
+                b |= mask_byte_search_table[i + ROW_KIND_BIT_SIZE]
+        field_pos += 8 - ROW_KIND_BIT_SIZE
+        out_stream.write_byte(b)
+
+        for _ in range(1, self._leading_complete_bytes_num):
             b = 0x00
             for i in range(0, 8):
                 if value[field_pos + i] is None:
-                    b |= null_byte_search_table[i]
+                    b |= mask_byte_search_table[i]
             field_pos += 8
             out_stream.write_byte(b)
 
-        if remaining_bits_num:
+        if self._leading_complete_bytes_num >= 1 and remaining_bits_num:
             b = 0x00
             for i in range(remaining_bits_num):
                 if value[field_pos + i] is None:
-                    b |= null_byte_search_table[i]
+                    b |= mask_byte_search_table[i]
             out_stream.write_byte(b)
 
-    def _read_null_mask(self, in_stream):
-        null_mask = []
-        null_mask_search_table = self.null_mask_search_table
+    def _read_mask(self, in_stream):
+        mask = []
+        mask_search_table = self.mask_search_table
         remaining_bits_num = self._remaining_bits_num
         for _ in range(self._leading_complete_bytes_num):
             b = in_stream.read_byte()
-            null_mask.extend(null_mask_search_table[b])
+            mask.extend(mask_search_table[b])
 
         if remaining_bits_num:
             b = in_stream.read_byte()
-            null_mask.extend(null_mask_search_table[b][0:remaining_bits_num])
-        return null_mask
+            mask.extend(mask_search_table[b][0:remaining_bits_num])
+        return mask
 
     def __repr__(self):
         return 'FlattenRowCoderImpl[%s]' % ', '.join(str(c) for c in 
self._field_coders)
 
 
-class RowCoderImpl(FlattenRowCoderImpl):
+class FlattenRowCoderWithRowKindImpl(FlattenRowCoderImpl):

Review comment:
       Why do we need to introduce `FlattenRowCoderWithRowKindImpl`? What about 
moving the overriding method `_decode_one_row_from_stream` to `RowCoderImpl`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to