Oscar Westra van Holthe - Kind created AVRO-4006:
----------------------------------------------------
Summary: [Java] DataFileReader does not correctly identify the last sync marker when reading/skipping blocks
Key: AVRO-4006
URL: https://issues.apache.org/jira/browse/AVRO-4006
Project: Apache Avro
Issue Type: Bug
Components: java
Affects Versions: 1.11.3
Reporter: Oscar Westra van Holthe - Kind
The following code demonstrates the problem:
{code:java}
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.file.SeekableFileInput;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.IndexedRecord;
import org.apache.avro.io.DatumReader;
import java.io.File;
import java.io.IOException;
public class AvroTest {
public static void main(String[] args) throws IOException {
File avroFile = new File("test.avro");
GenericData model = GenericData.get();
Schema simple =
SchemaBuilder.record("TestRecord").fields().requiredString("text").endRecord();
Schema.Field textField = simple.getField("text");
try (DataFileWriter<Object> writer = new DataFileWriter<>(new
GenericDatumWriter<>(null, model)).create(simple, avroFile)) {
for (int i = 1; i <= 1000; i++) {
Object record = model.newRecord(null, simple);
model.setField(record, textField.name(), textField.pos(), "i =
" + i);
writer.append(record);
if (i % 100 == 0) {
long syncPos = writer.sync();
System.out.printf("Synced %d records; file position %d%n",
i, syncPos);
}
}
}
IndexedRecord result;
DatumReader<IndexedRecord> datumReader = new
GenericDatumReader<>(simple, simple, model);
try (SeekableFileInput sfi = new SeekableFileInput(avroFile);
MyDataFileReader<IndexedRecord> reader = new
MyDataFileReader<>(sfi, datumReader)) {
// Find the start of the last block reading the entire file,
WITHOUT decoding any records.
// Note that this does decompress the data, but that's so fast
these days that it hardly affects reading speed.
long lastSyncPos = reader.previousSync();
while (reader.hasNext()) {
lastSyncPos = reader.previousSync();
System.out.printf("Sync marker at %d%n", lastSyncPos);
// Mark the block as read, so hasNext() will read the next block
reader.nextBlock();
}
System.out.printf("Sync marker at %d%n", reader.previousSync());
reader.seek(lastSyncPos);
IndexedRecord lastRecord1 = null;
int decoded = 0;
while (reader.hasNext()) {
lastRecord1 = reader.next(lastRecord1);
decoded++;
}
System.out.printf("Decoded %d records%n", decoded);
result = lastRecord1;
}
Object lastRecord = result;
System.out.printf("Last record: %s%n", lastRecord);
}
private static class MyDataFileReader<T> extends DataFileReader<T> {
public MyDataFileReader(SeekableFileInput sfi, DatumReader<T>
datumReader) throws IOException {
super(sfi, datumReader);
}
@Override
public void blockFinished() throws IOException {
super.blockFinished();
}
}
}
{code}
The actual output:
{noformat}
Synced 100 records; file position 828
Synced 200 records; file position 1648
Synced 300 records; file position 2468
Synced 400 records; file position 3288
Synced 500 records; file position 4108
Synced 600 records; file position 4928
Synced 700 records; file position 5748
Synced 800 records; file position 6568
Synced 900 records; file position 7388
Synced 1000 records; file position 8209
Sync marker at 116
Sync marker at 116
Sync marker at 116
Sync marker at 116
Sync marker at 116
Sync marker at 116
Sync marker at 116
Sync marker at 116
Sync marker at 116
Sync marker at 116
Sync marker at 116
Decoded 1000 records
Last record: {"text": "i = 1000"}
{noformat}
In the expected output, the detected sync markers should advance one block at a time. After seeking to the last one, only the 100 records of the last block should be decoded:
{noformat}
Synced 100 records; file position 828
Synced 200 records; file position 1648
Synced 300 records; file position 2468
Synced 400 records; file position 3288
Synced 500 records; file position 4108
Synced 600 records; file position 4928
Synced 700 records; file position 5748
Synced 800 records; file position 6568
Synced 900 records; file position 7388
Synced 1000 records; file position 8209
Sync marker at 116
Sync marker at 828
Sync marker at 1648
Sync marker at 2468
Sync marker at 3288
Sync marker at 4108
Sync marker at 4928
Sync marker at 5748
Sync marker at 6568
Sync marker at 7388
Sync marker at 8209
Decoded 100 records
Last record: {"text": "i = 1000"}
{noformat}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)