hudi-bot opened a new issue, #15397:
URL: https://github.com/apache/hudi/issues/15397

   When reading a MOR table in Flink, we encountered an exception from the Flink 
runtime (shown in image 1), which complained that the table source should not 
emit a retract record.
   
   !image 1.png!
   
   I think the cause is in HoodieTableSource:
   {code:java}
   @Override
   public ChangelogMode getChangelogMode() {
     // when read as streaming and changelog mode is enabled, emit as FULL mode;
     // when all the changes are compacted or read as batch, emit as INSERT mode.
     return OptionsResolver.emitChangelog(conf) ? ChangelogModes.FULL : ChangelogMode.insertOnly();
   }
   {code}
   {code:java}
   private InputFormat<RowData, ?> getStreamInputFormat() {
     ...
     if (FlinkOptions.QUERY_TYPE_SNAPSHOT.equals(queryType)) {
       final HoodieTableType tableType = HoodieTableType.valueOf(this.conf.getString(FlinkOptions.TABLE_TYPE));
       boolean emitDelete = tableType == HoodieTableType.MERGE_ON_READ;
       return mergeOnReadInputFormat(rowType, requiredRowType, tableAvroSchema, rowDataType, Collections.emptyList(), emitDelete);
     }
     ...
   }
   {code}
   With these options:
   
   {{'table.type' = 'MERGE_ON_READ'}}
   
   {{'read.streaming.enabled' = 'true'}}
   
   the HoodieTableSource announces that it produces an INSERT-only changelog, 
   but MergeOnReadInputFormat will still emit delete records.
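
To make the suspected mismatch concrete, here is a minimal stand-alone sketch. It does not use Hudi or Flink classes: `Kind`, `TableType`, `declaredKinds`, `possibleEmittedKinds` and `isConsistent` are hypothetical names that only model the two snippets above. It assumes that `ChangelogModes.FULL` covers all four row kinds and that `OptionsResolver.emitChangelog(conf)` returns false for the options listed above.

```java
import java.util.EnumSet;
import java.util.Set;

/** Hypothetical stand-alone model; none of these types are Hudi or Flink classes. */
public class ChangelogMismatchSketch {

  /** Simplified stand-in for Flink's RowKind. */
  enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

  /** Simplified stand-in for HoodieTableType. */
  enum TableType { COPY_ON_WRITE, MERGE_ON_READ }

  /** Models getChangelogMode(): all kinds (assumed FULL) if the changelog is emitted, else INSERT only. */
  static Set<Kind> declaredKinds(boolean emitChangelog) {
    return emitChangelog ? EnumSet.allOf(Kind.class) : EnumSet.of(Kind.INSERT);
  }

  /** Models the snapshot branch of getStreamInputFormat(): a MOR table sets emitDelete = true. */
  static Set<Kind> possibleEmittedKinds(TableType tableType) {
    boolean emitDelete = tableType == TableType.MERGE_ON_READ;
    return emitDelete ? EnumSet.of(Kind.INSERT, Kind.DELETE) : EnumSet.of(Kind.INSERT);
  }

  /** The source is consistent only if every kind it may emit was also declared. */
  static boolean isConsistent(boolean emitChangelog, TableType tableType) {
    return declaredKinds(emitChangelog).containsAll(possibleEmittedKinds(tableType));
  }

  public static void main(String[] args) {
    // Streaming read of a MOR table without changelog emission (assumed for the options above).
    System.out.println(isConsistent(false, TableType.MERGE_ON_READ)); // false
    // Same table with changelog emission enabled.
    System.out.println(isConsistent(true, TableType.MERGE_ON_READ));  // true
  }
}
```

In this model the first case declares only INSERT while DELETE can still be emitted, which matches the runtime complaint about a retract record. It is only an illustration of the inconsistency, not a proposed patch.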
   
   ## JIRA info
   
   - Link: https://issues.apache.org/jira/browse/HUDI-4733
   - Type: Bug
   - Fix version(s):
     - 1.1.1
   - Attachment(s):
     - 29/Aug/22 08:47;nonggia;image 1.png;https://issues.apache.org/jira/secure/attachment/13048700/image+1.png


