SuDewei created FLINK-35324:
-------------------------------

             Summary: Avro format can not perform projection pushdown for specific fields
                 Key: FLINK-35324
                 URL: https://issues.apache.org/jira/browse/FLINK-35324
             Project: Flink
          Issue Type: Bug
          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
    Affects Versions: 1.17.0
            Reporter: SuDewei


AvroFormatFactory.java#createDecodingFormat returns a ProjectableDecodingFormat, 
which implies that the Avro format deserializer can perform projection pushdown. 
In practice, however, the Avro format appears unable to perform projection 
pushdown for arbitrary fields.

For example, consider the following schema and sample data in Kafka:
{code:sql}
-- schema
CREATE TABLE kafka (
    `user_id` BIGINT,
    `name` STRING,
    `timestamp` TIMESTAMP(3) METADATA,
    `event_id` BIGINT,
    `payload` STRING NOT NULL
) WITH (
    'connector' = 'kafka',
    ...
)

-- sample data
(3, 'name 3', TIMESTAMP '2020-03-10 13:12:11.123', 102, 'payload 3')
{code}
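Because `timestamp` is declared as a METADATA column, it is read from the Kafka record rather than from the value bytes, so it is not part of the physical row; the projection indexes used below therefore refer to (user_id, name, event_id, payload). The following self-contained sketch models this with a hypothetical `Column` class; it is not Flink's schema API.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical model of the columns in the CREATE TABLE statement.
// Not Flink's schema API: it only shows which columns form the physical row.
public class PhysicalColumnsSketch {

    static final class Column {
        final String name;
        final boolean metadata;

        Column(String name, boolean metadata) {
            this.name = name;
            this.metadata = metadata;
        }
    }

    // METADATA columns are supplied by the source (here the Kafka record timestamp),
    // not decoded from the value bytes, so they are skipped.
    static List<String> physicalColumnNames(List<Column> columns) {
        List<String> names = new ArrayList<>();
        for (Column c : columns) {
            if (!c.metadata) {
                names.add(c.name);
            }
        }
        return names;
    }

    public static void main(String[] args) {
        List<Column> columns = List.of(
                new Column("user_id", false),
                new Column("name", false),
                new Column("timestamp", true),
                new Column("event_id", false),
                new Column("payload", false));

        // Prints each physical column with the index a projection would use.
        List<String> physical = physicalColumnNames(columns);
        for (int i = 0; i < physical.size(); i++) {
            System.out.println(i + " -> " + physical.get(i));
        }
    }
}
```

With this model, index 0 is `user_id`, 1 is `name`, 2 is `event_id` and 3 is `payload`.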
The data is deserialized successfully with this projection:
{code:java}
// Project the physical fields user_id, name and event_id (indexes 0, 1, 2)
Projection physicalProjections = Projection.of(new int[] {0, 1, 2});

// Data type produced by the projection
DataType physicalFormatDataType = physicalProjections.project(this.physicalDataType);

DeserializationSchema<RowData> deserializer =
    (DeserializationSchema<RowData>) ((ProjectableDecodingFormat) format)
        .createRuntimeDecoder(context, this.physicalDataType, physicalProjections.toNestedIndexes());
{code}
The resulting row is:
{code:java}
+I(3,name 3,102) {code}
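For reference, the result that a correct projection should produce can be sketched in plain Java. The helpers `toNestedIndexes` and `project` below are hypothetical and only mirror the idea behind `Projection.of(int[]).toNestedIndexes()` (each top-level index becomes a path of length one); they are not Flink code.

```java
import java.util.Arrays;

// Plain-Java illustration of projecting a flat physical row by top-level indexes.
// Hypothetical helpers, not Flink's Projection class.
public class ProjectionSketch {

    // {1, 2} -> {{1}, {2}}: each top-level index becomes a one-element path.
    static int[][] toNestedIndexes(int[] indexes) {
        int[][] nested = new int[indexes.length][];
        for (int i = 0; i < indexes.length; i++) {
            nested[i] = new int[] {indexes[i]};
        }
        return nested;
    }

    // Applies top-level paths to a flat row; nested paths are rejected in this sketch.
    static Object[] project(Object[] row, int[][] paths) {
        Object[] out = new Object[paths.length];
        for (int i = 0; i < paths.length; i++) {
            if (paths[i].length != 1) {
                throw new IllegalArgumentException("only top-level paths are supported here");
            }
            out[i] = row[paths[i][0]];
        }
        return out;
    }

    public static void main(String[] args) {
        // Physical row of the sample record: (user_id, name, event_id, payload).
        Object[] row = {3L, "name 3", 102L, "payload 3"};

        System.out.println(Arrays.toString(project(row, toNestedIndexes(new int[] {0, 1, 2}))));
        System.out.println(Arrays.toString(project(row, toNestedIndexes(new int[] {1, 2}))));
    }
}
```

This prints `[3, name 3, 102]` and `[name 3, 102]`; the second row is what the projection {1, 2} would be expected to yield.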
However, when the projection indexes do not start from 0, the data cannot be 
parsed. For example:
{code:java}
// Project only the physical fields name and event_id (indexes 1, 2)
Projection physicalProjections = Projection.of(new int[] {1, 2});

DataType physicalFormatDataType = physicalProjections.project(this.physicalDataType);

DeserializationSchema<RowData> deserializer =
    (DeserializationSchema<RowData>) ((ProjectableDecodingFormat) format)
        .createRuntimeDecoder(context, this.physicalDataType, physicalProjections.toNestedIndexes());
{code}
The following exception is thrown:
{code:java}
Caused by: java.lang.ArrayIndexOutOfBoundsException: -49
        at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:460)
        at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:283)
        at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:188)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161)
        at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:260)
        at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:248)
        at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:154)
        at org.apache.flink.formats.avro.AvroDeserializationSchema.deserialize(AvroDeserializationSchema.java:142)
        at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:103)
        ... 19 more
{code}
It seems that the Avro format does not support projection pushdown for arbitrary 
fields. Is my understanding correct?

If this is the case, then I think the Avro format should not implement the 
ProjectableDecodingFormat interface, since it can only provide very limited 
pushdown capabilities.

This problem may prevent connectors from implementing projection pushdown, 
because a connector decides whether projection pushdown can be performed by 
checking whether the format implements the ProjectableDecodingFormat interface.
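The decision described above can be sketched with simplified stand-in interfaces. The names `DecodingFormat` and `ProjectableDecodingFormat` below only echo Flink's interfaces; they, `PrefixOnlyFormat`, `FullRowFormat` and `read` are hypothetical and operate on already-decoded rows to keep the example self-contained. A format that advertises projection support but only handles projections starting at index 0 makes the pushed-down {1, 2} fail, while a format that does not advertise it lets the connector decode every field and project afterwards.

```java
import java.util.Arrays;

// Hypothetical stand-ins for a connector's format handling; not the Flink APIs.
public class PushdownDecisionSketch {

    interface DecodingFormat {
        // Returns all physical fields of a record.
        Object[] decode(Object[] record);
    }

    interface ProjectableDecodingFormat extends DecodingFormat {
        // Returns only the requested top-level fields, in the requested order.
        Object[] decode(Object[] record, int[] projection);
    }

    // Advertises projection support but only handles projections {0, 1, ..., n-1}.
    static final class PrefixOnlyFormat implements ProjectableDecodingFormat {
        public Object[] decode(Object[] record) {
            return record.clone();
        }

        public Object[] decode(Object[] record, int[] projection) {
            for (int i = 0; i < projection.length; i++) {
                if (projection[i] != i) {
                    throw new IllegalArgumentException("unsupported projection " + Arrays.toString(projection));
                }
            }
            return Arrays.copyOf(record, projection.length);
        }
    }

    // Does not advertise projection support.
    static final class FullRowFormat implements DecodingFormat {
        public Object[] decode(Object[] record) {
            return record.clone();
        }
    }

    // Pushes the projection into the format only if it implements ProjectableDecodingFormat;
    // otherwise decodes every field and projects in the connector.
    static Object[] read(DecodingFormat format, Object[] record, int[] projection) {
        if (format instanceof ProjectableDecodingFormat) {
            return ((ProjectableDecodingFormat) format).decode(record, projection);
        }
        Object[] full = format.decode(record);
        Object[] projected = new Object[projection.length];
        for (int i = 0; i < projection.length; i++) {
            projected[i] = full[projection[i]];
        }
        return projected;
    }

    public static void main(String[] args) {
        Object[] record = {3L, "name 3", 102L, "payload 3"};
        int[] projection = {1, 2};

        System.out.println(Arrays.toString(read(new FullRowFormat(), record, projection)));
        try {
            read(new PrefixOnlyFormat(), record, projection);
        } catch (IllegalArgumentException e) {
            System.out.println("pushdown failed: " + e.getMessage());
        }
    }
}
```

The fallback path prints `[name 3, 102]`, while the prefix-only format rejects {1, 2}, which mirrors why a partially projectable format can be worse for a connector than one that does not claim projection support at all.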

--
This message was sent by Atlassian Jira
(v8.20.10#820010)