[ https://issues.apache.org/jira/browse/FLINK-35324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17849596#comment-17849596 ]
Weijie Guo commented on FLINK-35324:
------------------------------------
Does 1.20 have the same problem?
> Avro format can not perform projection pushdown for specific fields
> -------------------------------------------------------------------
>
> Key: FLINK-35324
> URL: https://issues.apache.org/jira/browse/FLINK-35324
> Project: Flink
> Issue Type: Bug
> Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
> Affects Versions: 1.17.0
> Reporter: SuDewei
> Priority: Blocker
>
> AvroFormatFactory.java#createDecodingFormat returns a
> ProjectableDecodingFormat, which means the Avro format deserializer should be
> able to perform projection pushdown. However, in practice the Avro
> format seems unable to perform projection pushdown for specific fields.
> For example, given the following schema and sample data in Kafka:
> {code:java}
> -- schema
> CREATE TABLE kafka (
> `user_id` BIGINT,
> `name` STRING,
> `timestamp` TIMESTAMP(3) METADATA,
> `event_id` BIGINT,
> `payload` STRING not null
> ) WITH (
> 'connector' = 'kafka',
> ...
> )
>
> -- sample data like
> (3, 'name 3', TIMESTAMP '2020-03-10 13:12:11.123', 102, 'payload 3') {code}
> The data can be successfully deserialized in this way:
> {code:java}
> Projection physicalProjections = Projection.of( new int[] {0,1,2} );
> DataType physicalFormatDataType =
> physicalProjections.project(this.physicalDataType);
> (DeserializationSchema<RowData>) ((ProjectableDecodingFormat) format)
> .createRuntimeDecoder(context, this.physicalDataType,
> physicalProjections.toNestedIndexes()); {code}
> The data would be:
> {code:java}
> +I(3,name 3,102) {code}
> However, when the projection index is replaced with values that do not start
> from 0, the data cannot be successfully deserialized, for example:
> {code:java}
> Projection physicalProjections = Projection.of( new int[] {1,2} );
> DataType physicalFormatDataType =
> physicalProjections.project(this.physicalDataType);
> (DeserializationSchema<RowData>) ((ProjectableDecodingFormat) format)
> .createRuntimeDecoder(context, this.physicalDataType,
> physicalProjections.toNestedIndexes()); {code}
> The exception looks like this:
> {code:java}
> Caused by: java.lang.ArrayIndexOutOfBoundsException: -49
> at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:460)
> at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:283)
> at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:188)
> at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161)
> at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:260)
> at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:248)
> at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180)
> at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161)
> at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:154)
> at org.apache.flink.formats.avro.AvroDeserializationSchema.deserialize(AvroDeserializationSchema.java:142)
> at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:103)
> ... 19 more {code}
> It seems that the Avro format does not support projection pushdown for
> arbitrary fields. Is my understanding correct?
> If so, I think the Avro format should not implement the
> ProjectableDecodingFormat interface, since it can only provide very limited
> pushdown capabilities.
> This problem may block connectors from implementing projection pushdown,
> since a connector decides whether projection pushdown can be performed by
> checking whether the format implements the ProjectableDecodingFormat
> interface.
>
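The capability check described in the last paragraph of the issue can be sketched in plain Java. This is only an illustration under stated assumptions: `SketchFormat` and `SketchProjectableFormat` are hypothetical stand-ins for Flink's DecodingFormat and ProjectableDecodingFormat, not the real interfaces, and rows are modelled as plain `Object[]` arrays. It shows a connector pushing the projection down when the format advertises support, and otherwise decoding the whole record and projecting afterwards, which works for any field indexes.

```java
import java.util.Arrays;

/** Hypothetical stand-in for a format that can only decode whole records. */
interface SketchFormat {
    /** Decodes one record into all physical fields, in schema order. */
    Object[] decodeAll(Object[] record);
}

/** Hypothetical stand-in for a format that claims to support projection pushdown. */
interface SketchProjectableFormat extends SketchFormat {
    /** Decodes only the fields at the given positions. */
    Object[] decodeProjected(Object[] record, int[] projection);
}

final class ProjectionFallbackSketch {

    /** Picks the requested field positions out of a fully decoded row. */
    static Object[] project(Object[] fullRow, int[] projection) {
        Object[] projected = new Object[projection.length];
        for (int i = 0; i < projection.length; i++) {
            projected[i] = fullRow[projection[i]];
        }
        return projected;
    }

    /** Pushes the projection down if the format supports it, else projects after decoding. */
    static Object[] read(SketchFormat format, Object[] record, int[] projection) {
        if (format instanceof SketchProjectableFormat) {
            return ((SketchProjectableFormat) format).decodeProjected(record, projection);
        }
        return project(format.decodeAll(record), projection);
    }

    public static void main(String[] args) {
        // Physical fields of the sample row: user_id, name, event_id, payload.
        Object[] record = {3L, "name 3", 102L, "payload 3"};
        // A format without pushdown support: it simply returns every field.
        SketchFormat wholeRecordOnly = r -> r.clone();
        // Projection {1, 2} does not start at 0; prints [name 3, 102]
        System.out.println(Arrays.toString(read(wholeRecordOnly, record, new int[] {1, 2})));
    }
}
```

Because the decision rests only on the `instanceof` check, a format that implements the projectable interface but handles only some projections gives the connector no way to fall back, which is the concern raised in the issue.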
--
This message was sent by Atlassian Jira
(v8.20.10#820010)