Chesnay Schepler created FLINK-21406:
----------------------------------------

             Summary: Add AvroParquetFileRecordFormat
                 Key: FLINK-21406
                 URL: https://issues.apache.org/jira/browse/FLINK-21406
             Project: Flink
          Issue Type: New Feature
          Components: API / DataStream, Formats (JSON, Avro, Parquet, ORC, SequenceFile)
            Reporter: Chesnay Schepler
             Fix For: 1.13.0


There is currently no easy way to read Avro GenericRecords from Parquet files via the new {{FileSource}}.
While helping out a user I started writing a {{FileRecordFormat}} that can do that, but it still requires some finalization.
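
For context, this is roughly how I would expect the format to be wired up on the user side (just a sketch against the class below; the schema and path are placeholders):

{code}
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ParquetAvroSourceExample {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder schema; in practice this comes from the user's Avro schema definition.
        final Schema schema =
                new Schema.Parser()
                        .parse(
                                "{\"type\":\"record\",\"name\":\"Example\","
                                        + "\"fields\":[{\"name\":\"id\",\"type\":\"long\"}]}");

        final FileSource<GenericRecord> source =
                FileSource.forRecordFileFormat(
                                new ParquetAvroFileRecordFormat(schema),
                                new Path("s3://my-bucket/parquet-input/"))
                        .build();

        final DataStream<GenericRecord> records =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "parquet-avro-source");

        records.print();
        env.execute("parquet-avro-example");
    }
}
{code}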

The implementation is similar to our ParquetAvroWriters class, in that we just wrap some Parquet classes and bridge our FileSystem with their IO abstraction.

The main goal was to have a format that reads data through our FileSystems and does not work directly against Hadoop, in order to prevent a ClassLoader leak caused by the S3AFileSystem (threads in its thread pool can keep references to the user classloader).

According to the user it appears to be working, but it still needs some cleanup: ideally support for specific records, support for checkpointing (which I believe should be fairly easy; see the rough sketch after the code block), possibly support for splitting files (I am not sure whether this works properly with Parquet), and of course tests and documentation.

{code}
import java.io.IOException;

import javax.annotation.Nullable;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.file.src.reader.FileRecordFormat;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.io.DelegatingSeekableInputStream;
import org.apache.parquet.io.InputFile;
import org.apache.parquet.io.SeekableInputStream;

public class ParquetAvroFileRecordFormat implements FileRecordFormat<GenericRecord> {

    // Avro's Schema is not Serializable, hence the transient field; proper handling of the
    // schema when the format itself is serialized is part of the pending cleanup.
    private final transient Schema schema;

    public ParquetAvroFileRecordFormat(Schema schema) {
        this.schema = schema;
    }

    @Override
    public Reader<GenericRecord> createReader(
            Configuration config, Path filePath, long splitOffset, long splitLength)
            throws IOException {

        // Open the file through Flink's FileSystem abstraction and bridge it into Parquet's
        // InputFile, so that no Hadoop FileSystem is instantiated directly.
        final FileSystem fs = filePath.getFileSystem();
        final FileStatus status = fs.getFileStatus(filePath);
        final FSDataInputStream in = fs.open(filePath);

        return new MyReader(
                AvroParquetReader.<GenericRecord>builder(new InputFileWrapper(in, status.getLen()))
                        .withDataModel(GenericData.get())
                        .build());
    }

    @Override
    public Reader<GenericRecord> restoreReader(
            Configuration config,
            Path filePath,
            long restoredOffset,
            long splitOffset,
            long splitLength) {
        // not called if checkpointing isn't used
        return null;
    }

    @Override
    public boolean isSplittable() {
        // let's not worry about this for now
        return false;
    }

    @Override
    public TypeInformation<GenericRecord> getProducedType() {
        return new GenericRecordAvroTypeInfo(schema);
    }

    /** Thin wrapper around Parquet's reader that emits one record per {@link #read()} call. */
    private static class MyReader implements FileRecordFormat.Reader<GenericRecord> {

        private final ParquetReader<GenericRecord> parquetReader;

        private MyReader(ParquetReader<GenericRecord> parquetReader) {
            this.parquetReader = parquetReader;
        }

        @Nullable
        @Override
        public GenericRecord read() throws IOException {
            return parquetReader.read();
        }

        @Override
        public void close() throws IOException {
            parquetReader.close();
        }
    }

    /** Adapts a Flink {@link FSDataInputStream} to Parquet's {@link InputFile}. */
    private static class InputFileWrapper implements InputFile {

        private final FSDataInputStream inputStream;
        private final long length;

        private InputFileWrapper(FSDataInputStream inputStream, long length) {
            this.inputStream = inputStream;
            this.length = length;
        }

        @Override
        public long getLength() {
            return length;
        }

        @Override
        public SeekableInputStream newStream() {
            return new SeekableInputStreamAdapter(inputStream);
        }
    }

    /** Exposes a Flink {@link FSDataInputStream} as a Parquet {@link SeekableInputStream}. */
    private static class SeekableInputStreamAdapter extends DelegatingSeekableInputStream {

        private final FSDataInputStream inputStream;

        private SeekableInputStreamAdapter(FSDataInputStream inputStream) {
            super(inputStream);
            this.inputStream = inputStream;
        }

        @Override
        public long getPos() throws IOException {
            return inputStream.getPos();
        }

        @Override
        public void seek(long newPos) throws IOException {
            inputStream.seek(newPos);
        }
    }
}
{code}
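
Regarding the checkpointing point: since the reader cannot seek to an arbitrary byte offset in a Parquet file, my current thinking is to track how many records were emitted and report {{CheckpointedPosition.NO_OFFSET}} plus that count, letting the framework re-read the split and skip records on recovery. This is only a rough, untested sketch, and it assumes {{FileRecordFormat.Reader}} exposes the optional {{getCheckpointedPosition()}} hook the same way {{StreamFormat.Reader}} does:

{code}
// Rough, untested sketch: MyReader extended with position tracking (imports as above, plus
// org.apache.flink.connector.file.src.util.CheckpointedPosition).
private static class MyReader implements FileRecordFormat.Reader<GenericRecord> {

    private final ParquetReader<GenericRecord> parquetReader;
    private long recordsRead;

    private MyReader(ParquetReader<GenericRecord> parquetReader) {
        this.parquetReader = parquetReader;
    }

    @Nullable
    @Override
    public GenericRecord read() throws IOException {
        final GenericRecord record = parquetReader.read();
        if (record != null) {
            recordsRead++;
        }
        return record;
    }

    @Override
    public CheckpointedPosition getCheckpointedPosition() {
        // NO_OFFSET: on recovery the split is read again from the start and the framework
        // skips 'recordsRead' records before emitting new ones.
        return new CheckpointedPosition(CheckpointedPosition.NO_OFFSET, recordsRead);
    }

    @Override
    public void close() throws IOException {
        parquetReader.close();
    }
}
{code}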


