WeichengJi created FLINK-33794:
----------------------------------
Summary: After Flink Avro deserialization fails, subsequent
correct data cannot be deserialized correctly
Key: FLINK-33794
URL: https://issues.apache.org/jira/browse/FLINK-33794
Project: Flink
Issue Type: Improvement
Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Affects Versions: 1.18.0, 1.17.0
Environment: Flink 1.17.1 & Flink 1.18.0
Reporter: WeichengJi
Excuse me, this is my first time submitting a Flink Jira issue.
I found that when using the official AvroDeserializationSchema for Avro
deserialization, a failed deserialization causes the next, correct record to
fail as well: the bad record leaves the inputStream behind the decoder only
partially read, so stale buffered bytes are consumed ahead of the next
message. I think the deserialize method should be modified as in the
following code block.
The version of Flink I am using is 1.17.1.
{code:java}
/** Avro decoder that decodes binary data. */
private transient BinaryDecoder decoder;

@Override
public T deserialize(@Nullable byte[] message) throws IOException {
    if (message == null) {
        return null;
    }
    // read record
    checkAvroInitialized();
    checkAvroDecoder();
    inputStream.setBuffer(message);
    Schema readerSchema = getReaderSchema();
    GenericDatumReader<T> datumReader = getDatumReader();
    datumReader.setSchema(readerSchema);
    return datumReader.read(null, decoder);
}

/**
 * Recreates the decoder if the previous message was not fully consumed, so
 * leftover bytes from a failed deserialization cannot corrupt this message.
 * Must run before inputStream.setBuffer().
 */
void checkAvroDecoder() throws IOException {
    if (!decoder.isEnd()) {
        this.decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
    }
} {code}
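The failure mode can be reproduced without Avro at all. Below is a minimal, self-contained Java sketch of the same pattern: a buffered "decoder" reused across messages over a mutable byte source. The class and method names (StaleDecoderSketch, MutableByteSource, checkDecoder) are illustrative only, not Flink or Avro API; the checkDecoder() rebuild plays the role of the proposed checkAvroDecoder().

{code:java}
import java.io.*;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class StaleDecoderSketch {

    /** Stand-in for the reusable inputStream whose buffer is swapped per message. */
    static class MutableByteSource extends InputStream {
        private ByteArrayInputStream current = new ByteArrayInputStream(new byte[0]);
        void setBuffer(byte[] b) { current = new ByteArrayInputStream(b); }
        @Override public int read() { return current.read(); }
        @Override public int read(byte[] b, int off, int len) { return current.read(b, off, len); }
    }

    private final MutableByteSource source = new MutableByteSource();
    // Buffering reader reused across messages, like the cached BinaryDecoder.
    private DataInputStream decoder = newDecoder();

    private DataInputStream newDecoder() {
        return new DataInputStream(new BufferedInputStream(source));
    }

    /** Analogue of checkAvroDecoder(): rebuild if stale buffered bytes remain. */
    void checkDecoder() throws IOException {
        if (decoder.available() > 0) {
            decoder = newDecoder();
        }
    }

    /** Decodes one length-prefixed UTF-8 string; throws on a bogus length. */
    String deserialize(byte[] message) throws IOException {
        checkDecoder();            // must run before setBuffer()
        source.setBuffer(message);
        int len = decoder.readInt();
        if (len < 0 || len > 1024) {
            throw new IOException("corrupt length: " + len);
        }
        byte[] buf = new byte[len];
        decoder.readFully(buf);
        return new String(buf, StandardCharsets.UTF_8);
    }

    static byte[] encode(String s) {
        byte[] b = s.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + b.length).putInt(b.length).put(b).array();
    }

    public static void main(String[] args) throws IOException {
        StaleDecoderSketch schema = new StaleDecoderSketch();
        // Bad message: bogus length, then 4 bytes that are buffered but never consumed.
        byte[] bad = ByteBuffer.allocate(8).putInt(9999).putInt(0xDEAD).array();
        try {
            schema.deserialize(bad);
        } catch (IOException expected) {
            // the failure leaves 4 stale bytes in the decoder's buffer
        }
        // Thanks to checkDecoder(), the next valid message still decodes correctly.
        System.out.println(schema.deserialize(encode("hello"))); // prints "hello"
    }
} {code}
Without the checkDecoder() call, the second deserialize() would read the 4 stale bytes as the length prefix of the "hello" message and fail, which is exactly the behavior reported above.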
--
This message was sent by Atlassian Jira
(v8.20.10#820010)