SuDewei created FLINK-35324:
-------------------------------
Summary: Avro format can not perform projection pushdown for
specific fields
Key: FLINK-35324
URL: https://issues.apache.org/jira/browse/FLINK-35324
Project: Flink
Issue Type: Bug
Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Affects Versions: 1.17.0
Reporter: SuDewei
AvroFormatFactory.java#createDecodingFormat returns a
ProjectableDecodingFormat, which implies that the Avro format deserializer can
perform projection pushdown. In practice, however, the Avro format appears
unable to perform projection pushdown for some field selections.
For example, consider the following table schema and sample data in Kafka:
{code:sql}
-- schema
CREATE TABLE kafka (
`user_id` BIGINT,
`name` STRING,
`timestamp` TIMESTAMP(3) METADATA,
`event_id` BIGINT,
`payload` STRING NOT NULL
) WITH (
'connector' = 'kafka',
...
)
-- sample data like
(3, 'name 3', TIMESTAMP '2020-03-10 13:12:11.123', 102, 'payload 3') {code}
The data can be deserialized successfully with a projection that starts at index 0:
{code:java}
// Project the first three physical fields (indexes 0, 1, 2).
Projection physicalProjections = Projection.of(new int[] {0, 1, 2});
DataType physicalFormatDataType =
        physicalProjections.project(this.physicalDataType);
DeserializationSchema<RowData> deserializer =
        ((ProjectableDecodingFormat<DeserializationSchema<RowData>>) format)
                .createRuntimeDecoder(context, this.physicalDataType,
                        physicalProjections.toNestedIndexes()); {code}
The decoded row is:
{code:java}
+I(3,name 3,102) {code}
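To make the index mapping explicit: `timestamp` is declared as METADATA, so it should not be part of the physical row type, and the projection indexes count only the four physical columns. The sketch below assumes that this.physicalDataType holds exactly the non-metadata columns in declaration order. It is plain Java with hypothetical class and method names and does not use Flink's Projection class.

```java
import java.util.ArrayList;
import java.util.List;

public class ProjectionIndexSketch {
    // Assumed physical columns of the `kafka` table, in declaration order.
    // `timestamp` is a METADATA column and is therefore left out.
    static final List<String> PHYSICAL_FIELDS =
            List.of("user_id", "name", "event_id", "payload");

    // Returns the field names selected by top-level projection indexes.
    static List<String> project(int[] indexes) {
        List<String> selected = new ArrayList<>();
        for (int index : indexes) {
            selected.add(PHYSICAL_FIELDS.get(index));
        }
        return selected;
    }

    public static void main(String[] args) {
        // prints [user_id, name, event_id], matching the row +I(3,name 3,102)
        System.out.println(project(new int[] {0, 1, 2}));
    }
}
```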
However, when the projection indexes do not start from 0, the data cannot be
parsed. For example:
{code:java}
// Project physical fields 1 and 2 only; index 0 is skipped.
Projection physicalProjections = Projection.of(new int[] {1, 2});
DataType physicalFormatDataType =
        physicalProjections.project(this.physicalDataType);
DeserializationSchema<RowData> deserializer =
        ((ProjectableDecodingFormat<DeserializationSchema<RowData>>) format)
                .createRuntimeDecoder(context, this.physicalDataType,
                        physicalProjections.toNestedIndexes()); {code}
The resulting exception looks like this:
{code:java}
Caused by: java.lang.ArrayIndexOutOfBoundsException: -49
    at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:460)
    at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:283)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:188)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161)
    at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:260)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:248)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:154)
    at org.apache.flink.formats.avro.AvroDeserializationSchema.deserialize(AvroDeserializationSchema.java:142)
    at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:103)
    ... 19 more {code}
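The -49 in the trace can be reproduced by hand under a set of assumptions that this report does not confirm: the record was written with the full four-field schema, nullable columns are encoded as ["null", T] unions with null as branch 0, and the projected schema {name, event_id} is used to read the bytes from the start as if it were also the writer schema. The sketch below is plain Java with a hand-rolled zig-zag varint codec following the Avro binary encoding rules. It does not call Avro's API, and all class and method names are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

public class AvroMisalignmentSketch {
    // Avro binary encoding writes int/long as zig-zag variable-length integers.
    static void writeLong(ByteArrayOutputStream out, long value) {
        long z = (value << 1) ^ (value >> 63);
        while ((z & ~0x7FL) != 0) {
            out.write((int) ((z & 0x7F) | 0x80));
            z >>>= 7;
        }
        out.write((int) z);
    }

    // A string is its UTF-8 byte length (as a long) followed by the bytes.
    static void writeString(ByteArrayOutputStream out, String s) {
        byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
        writeLong(out, utf8.length);
        out.write(utf8, 0, utf8.length);
    }

    // Sample record in writer order. Assumption: nullable columns are
    // ["null", T] unions, so a non-null value is prefixed with branch index 1.
    static byte[] encodeSample() {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeLong(out, 1); writeLong(out, 3L);          // user_id = 3
        writeLong(out, 1); writeString(out, "name 3");  // name = 'name 3'
        writeLong(out, 1); writeLong(out, 102L);        // event_id = 102
        writeString(out, "payload 3");                  // payload (NOT NULL, no union)
        return out.toByteArray();
    }

    static final class Cursor {
        final byte[] buf;
        int pos;
        Cursor(byte[] buf) { this.buf = buf; }

        long readLong() {
            long z = 0;
            int shift = 0;
            int b;
            do {
                b = buf[pos++] & 0xFF;
                z |= (long) (b & 0x7F) << shift;
                shift += 7;
            } while ((b & 0x80) != 0);
            return (z >>> 1) ^ -(z & 1); // undo zig-zag
        }
    }

    // Reads the bytes as if they had been written with {name, event_id}
    // and returns the union branch index that is seen for event_id.
    static long misreadEventIdUnionIndex(byte[] bytes) {
        Cursor c = new Cursor(bytes);
        c.readLong();                     // taken as name's branch; really user_id's branch (1)
        long nameLength = c.readLong();   // taken as name's length; really user_id's value (3)
        c.pos += (int) nameLength;        // skips 3 bytes: 0x02, 0x0C, 'n'
        return c.readLong();              // reads 'a' (0x61) as a zig-zag varint
    }

    public static void main(String[] args) {
        System.out.println(misreadEventIdUnionIndex(encodeSample())); // prints -49
    }
}
```

Under these assumptions the byte 'a' from "name 3" is decoded as a union branch index of -49, which matches the index in the ArrayIndexOutOfBoundsException. A projection starting at index 0 would not hit this, because the projected fields are then a prefix of the written record.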
It seems that the Avro format does not support projection pushdown for
arbitrary fields. Is my understanding correct?
If so, I think the Avro format should not implement the
ProjectableDecodingFormat interface, since it provides only very limited
pushdown capability.
This problem may block connectors from implementing projection pushdown,
because a connector decides whether projection pushdown is possible by
checking whether the format implements the ProjectableDecodingFormat
interface.
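As an illustration only, one conservative option for a connector that cannot rely on the format's pushdown is to decode every physical field and apply the projection afterwards. The sketch below is plain Java with hypothetical stand-ins (a Function from bytes to Object[] instead of Flink's DeserializationSchema and RowData); it is not taken from Flink or from any existing connector.

```java
import java.util.Arrays;
import java.util.function.Function;

public class PostDecodeProjectionSketch {
    // Wraps a decoder that returns all physical fields so that the
    // projection is applied to the decoded row instead of inside the format.
    static Function<byte[], Object[]> projectAfterDecode(
            Function<byte[], Object[]> fullRowDecoder, int[] projection) {
        return bytes -> {
            Object[] fullRow = fullRowDecoder.apply(bytes);
            Object[] projected = new Object[projection.length];
            for (int i = 0; i < projection.length; i++) {
                projected[i] = fullRow[projection[i]];
            }
            return projected;
        };
    }

    public static void main(String[] args) {
        // Stub decoder standing in for a full-schema Avro decoder.
        Function<byte[], Object[]> stub =
                bytes -> new Object[] {3L, "name 3", 102L, "payload 3"};
        Object[] row = projectAfterDecode(stub, new int[] {1, 2}).apply(new byte[0]);
        System.out.println(Arrays.toString(row)); // prints [name 3, 102]
    }
}
```

This gives up the decoding savings that pushdown is meant to provide, so it is a fallback rather than a fix.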