WeiZhong94 commented on a change in pull request #13462:
URL: https://github.com/apache/flink/pull/13462#discussion_r494714656



##########
File path: flink-python/pyflink/fn_execution/beam/beam_coder_impl_slow.py
##########
@@ -46,8 +46,7 @@ def __init__(self, field_coders):
         self._remaining_bits_num = (self._field_count + ROW_KIND_BIT_SIZE) % 8
         self.null_mask_search_table = self.generate_null_mask_search_table()
         self.null_byte_search_table = (0x80, 0x40, 0x20, 0x10, 0x08, 0x04, 0x02, 0x01)
-        self.row_kind_search_table = \
-            [i << (8 - ROW_KIND_BIT_SIZE) for i in range(2 ** ROW_KIND_BIT_SIZE)]
+        self.row_kind_search_table = [0x00, 0x80, 0x40, 0xC0]

Review comment:
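       The comprehension
'[i << (8 - ROW_KIND_BIT_SIZE) for i in range(2 ** ROW_KIND_BIT_SIZE)]'
does not generate the correct row_kind_search_table: for the four-entry table
it yields 0x40 and 0x80 in the opposite order from the expected
[0x00, 0x80, 0x40, 0xC0]. The correct table is therefore assigned directly.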
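
To make the difference concrete, here is a minimal sketch comparing the removed comprehension with the hard-coded table. It assumes ROW_KIND_BIT_SIZE is 2, which is inferred from the four entries in the new table. The helper `reverse_bits_into_high_end` is hypothetical: it shows one reading that happens to reproduce the hard-coded values (bit k of the row kind value lands at bit position 7 - k), not something the PR states.

```python
ROW_KIND_BIT_SIZE = 2  # assumption: inferred from the four-entry table in the diff

# Table produced by the removed comprehension: a plain left shift.
shifted = [i << (8 - ROW_KIND_BIT_SIZE) for i in range(2 ** ROW_KIND_BIT_SIZE)]

# Table assigned directly in the diff.
expected = [0x00, 0x80, 0x40, 0xC0]


def reverse_bits_into_high_end(value, width=ROW_KIND_BIT_SIZE):
    """Hypothetical helper: place bit k of `value` at bit position 7 - k."""
    result = 0
    for k in range(width):
        if value & (1 << k):
            result |= 0x80 >> k
    return result


reversed_table = [reverse_bits_into_high_end(i) for i in range(2 ** ROW_KIND_BIT_SIZE)]

print([hex(v) for v in shifted])         # the shift keeps the bit order
print([hex(v) for v in reversed_table])  # the reversal swaps the middle two entries
```

Under this reading, the two tables differ only in the entries for row kind values 1 and 2, which is consistent with the review comment's choice to spell the table out literally.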

##########
File path: flink-python/src/main/java/org/apache/flink/table/runtime/arrow/ArrowUtils.java
##########
@@ -749,7 +749,7 @@ private static boolean isAppendOnlyTable(Table table) throws Exception {
                                        OutputConversionModifyOperation.UpdateMode.APPEND);
                                tableEnv.getPlanner().translate(Collections.singletonList(modifyOperation));
                        } catch (Throwable t) {
-                               if (t.getMessage().contains("doesn't support consuming update changes") ||

Review comment:
       This change makes the check match the error message "xxx doesn't 
support consuming update and delete changes xxxxxx".
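
As a small illustration of why the removed substring check misses the quoted message, the sketch below tests the old pattern against the message shape from the comment. It is written in Python for brevity even though the code under review is Java. `common_prefix` is a hypothetical looser pattern used only for illustration; the replacement condition is not shown in the diff above and is not reproduced here.

```python
# Message shape quoted in the review comment; the "xxx" parts are placeholders.
message = "xxx doesn't support consuming update and delete changes xxxxxx"

# Pattern from the removed line of the diff.
old_pattern = "doesn't support consuming update changes"

# Hypothetical looser pattern, for illustration only; not taken from the PR.
common_prefix = "doesn't support consuming update"

old_matches = old_pattern in message      # the words "and delete" break the match
prefix_matches = common_prefix in message

print(old_matches, prefix_matches)
```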




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

