[ 
https://issues.apache.org/jira/browse/FLINK-24776?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther updated FLINK-24776:
---------------------------------
    Summary: Clarify semantics of DecodingFormat and its data type  (was: 
Clarify DecodingFormat)

> Clarify semantics of DecodingFormat and its data type
> -----------------------------------------------------
>
>                 Key: FLINK-24776
>                 URL: https://issues.apache.org/jira/browse/FLINK-24776
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>            Reporter: Francesco Guardiani
>            Priority: Major
>
> Today the {{org.apache.flink.table.connector.format.DecodingFormat}}
> interface has no clear requirements, which is confusing for implementers. In
> particular, it's unclear whether a format needs to support projection push
> down, and whether the {{DataType}} provided to
> {{createRuntimeDecoder}} is projected and includes partition keys. An
> example of such a misunderstanding is shown here:
> https://github.com/apache/flink/blob/991dd0466ff28995a22ded0727ef2a1706d9bddc/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSource.java#L107
> The PR https://github.com/apache/flink/pull/17544 partially addresses the
> issue, because it removes the need for BulkFormat implementations to
> handle partition keys. Nevertheless, it's still unclear whether
> formats support projections and, if so, whether they support nested
> projections.
> We should refactor {{DecodingFormat}} as follows:
> * We document that every {{DecodingFormat}} *MUST* support projections. This 
> is already the case for every format we have (see 
> https://github.com/apache/flink/pull/17544#issuecomment-953184692). A 
> {{DecodingFormat}} *MAY* also support nested projections, and this is 
> signaled by a new method {{DecodingFormat#supportsNestedProjection()}}
> * Add a new method {{createRuntimeDecoder(DynamicTableSource.Context context, 
> DataType physicalDataType, int[][] projections)}} that users should now 
> implement. The {{physicalDataType}} in this signature is the physical data 
> type from the table schema stripped of metadata columns and partition keys, 
> with fields in the order defined by the table schema. The user can compute 
> the final type with {{DataType.projectFields(physicalDataType, projections)}}
> * Deprecate the old {{createRuntimeDecoder}}
> * Provide default implementations of both the new and the old
> {{createRuntimeDecoder}} to ensure backward compatibility.
> As an alternative, we ([~twalthr] and I) explored the idea that formats might
> not support projection push down, but this is very unlikely and such a change
> would require several planner changes, including breaking the interface
> {{SupportsProjectionPushDown}}.
> We should also provide a {{RowData}} implementation that takes care of
> projection internally, so that a {{DecodingFormat}} implementer who doesn't
> want to handle projections in the format itself can just wrap each produced
> row like this: {{new ProjectedRowData(rowDataProducedByFormat, projections)}}.
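
The wrapper idea at the end of the description can be illustrated with a minimal sketch. It does not use Flink's actual {{RowData}} interface: {{SimpleRow}}, {{ArrayRow}} and {{ProjectedRowSketch}} are hypothetical names introduced only for this example, and the sketch assumes that each entry of {{projections}} is an index path into the (possibly nested) physical row, as in {{SupportsProjectionPushDown}}.

```java
/** Hypothetical stand-in for a row; Flink's RowData exposes typed getters instead. */
interface SimpleRow {
    int arity();

    Object get(int pos);
}

/** Array-backed row, playing the role of the row produced by the format. */
final class ArrayRow implements SimpleRow {
    private final Object[] fields;

    ArrayRow(Object... fields) {
        this.fields = fields;
    }

    @Override
    public int arity() {
        return fields.length;
    }

    @Override
    public Object get(int pos) {
        return fields[pos];
    }
}

/** View over a full row that exposes only the projected fields. */
final class ProjectedRowSketch implements SimpleRow {
    private final SimpleRow delegate;
    private final int[][] projections;

    ProjectedRowSketch(SimpleRow delegate, int[][] projections) {
        this.delegate = delegate;
        this.projections = projections;
    }

    @Override
    public int arity() {
        // One output field per projection entry.
        return projections.length;
    }

    @Override
    public Object get(int pos) {
        // Follow the index path: one step per nesting level.
        Object current = delegate;
        for (int index : projections[pos]) {
            if (current == null) {
                return null; // a null nested row yields a null field
            }
            current = ((SimpleRow) current).get(index);
        }
        return current;
    }
}
```

With a full row (1L, "alice", ("Berlin", "10115")) and projections {{2, 0}, {0}}, the wrapper exposes two fields: "Berlin" and 1L. The format keeps producing full rows, and no data is copied; the cost is an extra indirection on every field access.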

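The backward-compatibility bullet in the quoted description (default implementations for both the new and the old {{createRuntimeDecoder}}) could look roughly like the sketch below. This is not the Flink interface: {{DecodingFormatSketch}} and {{LegacyFormat}} are hypothetical, the {{DynamicTableSource.Context}} argument is omitted, and a list of field names stands in for {{DataType}} so that the example runs without Flink on the classpath.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified stand-in for DecodingFormat. */
interface DecodingFormatSketch<T> {

    /** Old entry point: receives the already projected fields. */
    @Deprecated
    default T createRuntimeDecoder(List<String> projectedFields) {
        throw new UnsupportedOperationException(
                "Implement one of the createRuntimeDecoder methods");
    }

    /** New entry point: receives the unprojected physical fields plus the projections. */
    default T createRuntimeDecoder(List<String> physicalFields, int[][] projections) {
        // By default, fall back to the old method with the projection already applied.
        return createRuntimeDecoder(projectTopLevel(physicalFields, projections));
    }

    /** Formats opt in to nested projections explicitly. */
    default boolean supportsNestedProjection() {
        return false;
    }

    /** Applies top-level projections only; nested paths are rejected in this sketch. */
    static List<String> projectTopLevel(List<String> physicalFields, int[][] projections) {
        List<String> projected = new ArrayList<>();
        for (int[] path : projections) {
            if (path.length != 1) {
                throw new IllegalArgumentException(
                        "Nested projection is not supported in this sketch");
            }
            projected.add(physicalFields.get(path[0]));
        }
        return projected;
    }
}

/** An existing format that overrides only the deprecated method keeps working. */
final class LegacyFormat implements DecodingFormatSketch<String> {
    @Override
    @Deprecated
    public String createRuntimeDecoder(List<String> projectedFields) {
        return "decoder" + projectedFields;
    }
}
```

In this sketch a caller always invokes the new two-argument method. A format that has not been migrated still receives the projected fields through the old method, while a migrated format overrides the new method and can ignore the deprecated one.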


--
This message was sent by Atlassian Jira
(v8.3.4#803005)
