[
https://issues.apache.org/jira/browse/FLINK-30548?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
heyu dou updated FLINK-30548:
-----------------------------
Description:
When the Avro schema changes and a downstream consumer uses the old schema to read
data generated with the new schema, the buffer position of
AvroDeserializationSchema.decoder is not reset between records.
This results in misaligned reads: because AvroDeserializationSchema.decoder is
reused and its buffer position is never reset, bytes left unread from the previous
record are consumed first, whereas each read should start at the beginning of the
latest message.
[https://github.com/apache/flink/blob/99c2a415e9eeefafacf70762b6f54070f7911ceb/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java#L165]
This line should be changed to:
{code:java}
this.decoder = DecoderFactory.get().directBinaryDecoder(inputStream, null);
{code}
The following code reproduces the problem:
{code:java}
/**
 * Reproduction for FLINK-30548: records written with the newer {@code AvroNew} schema, which has
 * an extra trailing {@code click_id} field, are decoded by an {@link AvroDeserializationSchema}
 * created for the older {@code AvroOld} schema.
 *
 * <p>Every record is serialized on its own, Base64-encoded and appended as one line to {@code
 * avro_test.data}. {@link #testRead()} checks that each line decodes to the values that were
 * written, so it fails while bytes left over from one record are consumed by the next read.
 */
public class AvroChangeTest {

    /** Data file shared by the write and read steps. */
    private static final String DATA_FILE = "avro_test.data";

    /** Number of records written; record {@code i} has {@code ad_user_id == i}. */
    private static final int RECORD_COUNT = 10;

    /** {@code create_date} value stored in every record. */
    private static final String CREATE_DATE = "2023-01-03";

    /** Deserializer for the old schema, reused across all records in {@link #testRead()}. */
    private static final AvroDeserializationSchema<AvroOld> avroSchema =
            AvroDeserializationSchema.forSpecific(AvroOld.class);

    /** Writes {@link #RECORD_COUNT} records with the new schema to the data file. */
    @Test
    public void testWrite() throws IOException {
        writeNewSchemaRecords(new File(DATA_FILE));
    }

    /**
     * Reads the records back with the old-schema deserializer and verifies that every record
     * decodes to the {@code create_date} and {@code ad_user_id} that were written.
     *
     * @throws AssertionError if the record count differs or a record decodes to wrong values
     */
    @Test
    public void testRead() throws IOException { // avro deserialization
        File file = new File(DATA_FILE);
        // JUnit does not guarantee that testWrite runs before testRead, so create the input here.
        writeNewSchemaRecords(file);
        List<String> lines = FileUtils.readLines(file, java.nio.charset.StandardCharsets.UTF_8);
        if (lines.size() != RECORD_COUNT) {
            throw new AssertionError(
                    "expected " + RECORD_COUNT + " records but found " + lines.size());
        }
        for (int i = 0; i < lines.size(); i++) {
            byte[] data = Base64.decodeBase64(lines.get(i));
            AvroOld old = avroSchema.deserialize(data);
            System.out.println(old.toString());
            // A misaligned read shows up as a wrong id/date (or as an exception from deserialize).
            if (old.getAdUserId() != i
                    || !CREATE_DATE.equals(String.valueOf(old.getCreateDate()))) {
                throw new AssertionError("record " + i + " decoded incorrectly: " + old);
            }
        }
    }

    /**
     * Replaces {@code file} with {@link #RECORD_COUNT} records serialized with the new schema,
     * one Base64-encoded record per line.
     *
     * @param file destination file; deleted first if it already exists
     * @throws IOException if the file cannot be deleted or written
     */
    private static void writeNewSchemaRecords(File file) throws IOException {
        // Unlike File.delete(), this reports a failed deletion instead of silently ignoring it.
        java.nio.file.Files.deleteIfExists(file.toPath());
        DatumWriter<AvroNew> writer = new SpecificDatumWriter<>(AvroNew.getClassSchema());
        for (int i = 0; i < RECORD_COUNT; i++) { // avro serialization
            AvroNew taInfo = new AvroNew();
            taInfo.setCreateDate(CREATE_DATE);
            taInfo.setAdUserId(i);
            taInfo.setClickId("2" + i);
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
            writer.write(taInfo, encoder);
            encoder.flush();
            byte[] data = out.toByteArray();
            System.out.println(data.length + "\t" + taInfo.toString());
            FileUtils.writeStringToFile(
                    file,
                    Base64.encodeBase64String(data) + "\n",
                    java.nio.charset.StandardCharsets.UTF_8,
                    true);
        }
    }
}
{code}
{code:java}
{
"namespace": "com.qihoo.dw.model.avroModel",
"type": "record",
"name": "AvroOld",
"fields": [
{
"name": "create_date",
"type": ["string","null"],
"default": "",
"doc": "事件时间(yyyy-MM-dd)"
},
{
"name": "ad_user_id",
"type": "long",
"default": 0,
"doc": "广告主ID"
}
]
} {code}
{code:java}
{
"namespace": "com.qihoo.dw.model.avroModel",
"type": "record",
"name": "AvroNew",
"fields": [
{
"name": "create_date",
"type": ["string","null"],
"default": "",
"doc": "事件时间(yyyy-MM-dd)"
},
{
"name": "ad_user_id",
"type": "long",
"default": 0,
"doc": "广告主ID"
},
{
"name": "click_id",
"type": "string",
"default": ""
}
]
} {code}
was:
When the Avro schema changes and a downstream consumer uses the old schema to read
data generated with the new schema, the buffer position of
AvroDeserializationSchema.decoder is not reset between records.
This results in misaligned reads: because AvroDeserializationSchema.decoder is
reused and its buffer position is never reset, bytes left unread from the previous
record are consumed first, whereas each read should start at the beginning of the
latest message.
[https://github.com/apache/flink/blob/99c2a415e9eeefafacf70762b6f54070f7911ceb/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java#L165]
This line should be changed to:
{code:java}
this.decoder = DecoderFactory.get().directBinaryDecoder(inputStream, null);
{code}
> In some cases AvroDeserializationSchema buffer is not reset
> -----------------------------------------------------------
>
> Key: FLINK-30548
> URL: https://issues.apache.org/jira/browse/FLINK-30548
> Project: Flink
> Issue Type: Bug
> Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
> Reporter: heyu dou
> Priority: Major
>
> When the Avro schema changes and a downstream consumer uses the old schema to
> read data generated with the new schema, the buffer position of
> AvroDeserializationSchema.decoder is not reset between records.
> This results in misaligned reads: because AvroDeserializationSchema.decoder is
> reused and its buffer position is never reset, bytes left unread from the
> previous record are consumed first, whereas each read should start at the
> beginning of the latest message.
> [https://github.com/apache/flink/blob/99c2a415e9eeefafacf70762b6f54070f7911ceb/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java#L165]
> This line should be changed to:
> {code:java}
> this.decoder = DecoderFactory.get().directBinaryDecoder(inputStream, null);
> {code}
>
> The following code reproduces the problem:
> {code:java}
> public class AvroChangeTest {
> private static AvroDeserializationSchema<AvroOld> avroSchema =
> AvroDeserializationSchema.forSpecific(AvroOld.class);
> @Test
> public void testWrite() throws IOException {
> DatumWriter<AvroNew> writer = new
> SpecificDatumWriter<AvroNew>(AvroNew.getClassSchema());
> File file = new File("avro_test.data");
> if (file.exists()) {
> file.delete();
> }
> for (int i = 0; i < 10; i++) { // avro serialization
> AvroNew taInfo = new AvroNew();
> taInfo.setCreateDate("2023-01-03");
> taInfo.setAdUserId(i);
> taInfo.setClickId("2" + i);
> ByteArrayOutputStream out = new ByteArrayOutputStream();
> BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out,
> null);
> writer.write(taInfo, encoder);
> encoder.flush();
> out.close();
> byte[] data = out.toByteArray();
> System.out.println(data.length + "\t" + taInfo.toString());
> FileUtils.writeStringToFile(file, Base64.encodeBase64String(data)
> + "\n", true);
> }
> }
> @Test
> public void testRead() throws IOException { // avro deserialization
> File file = new File("avro_test.data");
> List<String> lines = FileUtils.readLines(file);
> for (String line : lines) {
> byte[] data = Base64.decodeBase64(line);
> AvroOld old = avroSchema.deserialize(data);
> System.out.println(old.toString());
> }
> }
> }
> {code}
> {code:java}
> {
> "namespace": "com.qihoo.dw.model.avroModel",
> "type": "record",
> "name": "AvroOld",
> "fields": [
> {
> "name": "create_date",
> "type": ["string","null"],
> "default": "",
> "doc": "事件时间(yyyy-MM-dd)"
> },
> {
> "name": "ad_user_id",
> "type": "long",
> "default": 0,
> "doc": "广告主ID"
> }
> ]
> } {code}
> {code:java}
> {
> "namespace": "com.qihoo.dw.model.avroModel",
> "type": "record",
> "name": "AvroNew",
> "fields": [
> {
> "name": "create_date",
> "type": ["string","null"],
> "default": "",
> "doc": "事件时间(yyyy-MM-dd)"
> },
> {
> "name": "ad_user_id",
> "type": "long",
> "default": 0,
> "doc": "广告主ID"
> },
> {
> "name": "click_id",
> "type": "string",
> "default": ""
> }
> ]
> } {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)