Francesco Guardiani created FLINK-24776:
-------------------------------------------

             Summary: Clarify DecodingFormat
                 Key: FLINK-24776
                 URL: https://issues.apache.org/jira/browse/FLINK-24776
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
            Reporter: Francesco Guardiani


Today the {{org.apache.flink.table.connector.format.DecodingFormat}} interface 
does not have clear requirements, which is confusing for implementers. In 
particular, it's unclear whether the format needs to support projection push down 
or not, and whether the {{DataType}} provided to {{createRuntimeDecoder}} is 
projected and includes partition keys or not. An example of such a 
misunderstanding can be seen here: 
https://github.com/apache/flink/blob/991dd0466ff28995a22ded0727ef2a1706d9bddc/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSource.java#L107

The PR https://github.com/apache/flink/pull/17544 partially addresses the 
issue, because it removes the need for {{BulkFormat}} implementations to handle 
partition keys. Nevertheless, it's still unclear whether formats support 
projections and, if they do, whether they support nested projections.

We should refactor {{DecodingFormat}} as follows:

* We document that every {{DecodingFormat}} *MUST* support projections. This is 
already the case for every format we have (see 
https://github.com/apache/flink/pull/17544#issuecomment-953184692). A 
{{DecodingFormat}} *MAY* also support nested projections, and this is signaled 
by a new method {{DecodingFormat#supportsNestedProjection()}}.
* Add a new method {{createRuntimeDecoder(DynamicTableSource.Context context, 
DataType physicalDataType, int[][] projections)}} that format implementers 
should now implement. The {{physicalDataType}} in this signature is the physical 
data type from the table schema stripped of metadata columns and partition keys, 
with fields in the order defined by the table schema. Implementers can compute 
the final projected type with {{DataType.projectFields(physicalDataType, projections)}}.
* Deprecate the old {{createRuntimeDecoder}}.
* Provide default implementations of both the new and the old 
{{createRuntimeDecoder}} to ensure backward compatibility.
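To make the backward-compatibility requirement concrete, here is a minimal, self-contained Java sketch of how the two default methods could delegate to each other. It is not Flink code: {{RowTypeSketch}} (a row type reduced to a list of top-level field names), {{DecodingFormatSketch}}, {{LegacyFormat}} and {{NewFormat}} are hypothetical stand-ins, {{DynamicTableSource.Context}} is omitted, and only top-level projections are modeled.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for DataType: a row type modeled as a list of
// top-level field names. Nested projections are not modeled.
final class RowTypeSketch {
    final List<String> fieldNames;

    RowTypeSketch(List<String> fieldNames) {
        this.fieldNames = fieldNames;
    }

    // Same idea as projecting the physical type: projections[i] is the
    // path of output field i; only one-element (top-level) paths are accepted.
    static RowTypeSketch projectFields(RowTypeSketch type, int[][] projections) {
        List<String> projected = new ArrayList<>();
        for (int[] path : projections) {
            if (path.length != 1) {
                throw new IllegalArgumentException(
                        "nested projection not modeled: " + Arrays.toString(path));
            }
            projected.add(type.fieldNames.get(path[0]));
        }
        return new RowTypeSketch(projected);
    }

    // Identity projection {0}, {1}, ..., {n-1} over all top-level fields.
    static int[][] identity(RowTypeSketch type) {
        int[][] projections = new int[type.fieldNames.size()][];
        for (int i = 0; i < projections.length; i++) {
            projections[i] = new int[] {i};
        }
        return projections;
    }
}

// Hypothetical, simplified shape of the refactored interface (Context omitted).
interface DecodingFormatSketch<I> {
    // Formats MAY support nested projections; this sketch defaults to "no".
    default boolean supportsNestedProjection() {
        return false;
    }

    // New method: physicalDataType excludes metadata columns and partition keys.
    // Default: project the type and delegate to the legacy method, so existing
    // formats that only override the legacy method keep working.
    default I createRuntimeDecoder(RowTypeSketch physicalDataType, int[][] projections) {
        return createRuntimeDecoder(RowTypeSketch.projectFields(physicalDataType, projections));
    }

    // Legacy method: receives the already projected type.
    // Default: forward with an identity projection, so new formats only need to
    // override the new method. A format must override at least one of the two
    // methods, otherwise these defaults call each other forever.
    @Deprecated
    default I createRuntimeDecoder(RowTypeSketch projectedPhysicalDataType) {
        return createRuntimeDecoder(
                projectedPhysicalDataType, RowTypeSketch.identity(projectedPhysicalDataType));
    }
}

// An existing format that only overrides the legacy method.
final class LegacyFormat implements DecodingFormatSketch<String> {
    @Override
    public String createRuntimeDecoder(RowTypeSketch projectedPhysicalDataType) {
        return "legacy decoder for " + projectedPhysicalDataType.fieldNames;
    }
}

// A format written against the new method only.
final class NewFormat implements DecodingFormatSketch<String> {
    @Override
    public String createRuntimeDecoder(RowTypeSketch physicalDataType, int[][] projections) {
        return "new decoder for "
                + RowTypeSketch.projectFields(physicalDataType, projections).fieldNames;
    }
}
```

For example, calling the new method on {{LegacyFormat}} with physical fields id, name, ts and the projection [[2], [0]] reaches the legacy method with the projected fields ts, id. The caveat of mutually delegating defaults is that a format overriding neither method recurses forever, so the Javadoc would need to state that at least one must be implemented.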

As an alternative, we ([~twalthr] and I) explored the idea that formats might 
not support projection push down, but this case is very unlikely and such a 
change would require several planner changes, including breaking the 
{{SupportsProjectionPushDown}} interface.

We should also provide a {{RowData}} implementation that takes care of 
projection internally, so that a {{DecodingFormat}} implementer who doesn't want 
to handle projections can simply wrap the rows produced by the format: {{new 
ProjectedRowData(rowDataProducedByFormat, projections)}}.
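A minimal Java sketch of such a wrapper follows. It is not the Flink {{RowData}} API: {{RowSketch}}, {{ArrayRow}} and {{ProjectedRowSketch}} are hypothetical simplified types, and each {{projections[i]}} is assumed to be the field-index path of output field i, e.g. [2] for top-level field 2 or [1, 0] for field 0 of the nested row stored at top-level field 1. The wrapper reads through to the underlying row instead of copying it.

```java
// Hypothetical minimal stand-in for RowData: read-only positional access.
interface RowSketch {
    int getArity();

    Object getField(int pos);
}

// Array-backed row, used both for format output and for nested rows.
final class ArrayRow implements RowSketch {
    private final Object[] fields;

    ArrayRow(Object... fields) {
        this.fields = fields;
    }

    @Override
    public int getArity() {
        return fields.length;
    }

    @Override
    public Object getField(int pos) {
        return fields[pos];
    }
}

// Hypothetical wrapper in the spirit of the proposed ProjectedRowData:
// exposes the projected fields of the wrapped row without copying them.
final class ProjectedRowSketch implements RowSketch {
    private final RowSketch row;
    private final int[][] projections;

    ProjectedRowSketch(RowSketch row, int[][] projections) {
        this.row = row;
        this.projections = projections;
    }

    @Override
    public int getArity() {
        return projections.length;
    }

    @Override
    public Object getField(int pos) {
        Object current = row;
        for (int index : projections[pos]) {
            if (current == null) {
                return null; // a null nested row projects to null
            }
            // Descend one level: the value reached so far must be a (nested) row.
            current = ((RowSketch) current).getField(index);
        }
        return current;
    }
}
```

With this design a format that ignores projections decodes full rows and wraps each one, paying one indirection per field access instead of a copy.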



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
