[ https://issues.apache.org/jira/browse/FLINK-21406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17409283#comment-17409283 ]
Jing Ge commented on FLINK-21406:
---------------------------------

I'd like to take this task. [~arvid]

> Add AvroParquetFileRecordFormat
> -------------------------------
>
>                 Key: FLINK-21406
>                 URL: https://issues.apache.org/jira/browse/FLINK-21406
>             Project: Flink
>          Issue Type: New Feature
>          Components: API / DataStream, Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>            Reporter: Chesnay Schepler
>            Priority: Minor
>              Labels: auto-deprioritized-major
>             Fix For: 1.15.0
>
>
> There is currently no easy way to read Avro GenericRecords from Parquet via the new {{FileSource}}.
> While helping out a user I started writing a FileRecordFormat that could do that, but it requires some finalization.
> The implementation is similar to our ParquetAvroWriters class, in that we just wrap some Parquet classes and bridge our FileSystem with their IO abstraction.
> The main goal was to have a format that reads data through our FileSystems rather than working directly against Hadoop, to prevent a ClassLoader leak from the S3AFileSystem (threads in a thread pool can keep references to the user classloader).
> According to the user it appears to be working, but it will need some cleanup: ideally support for specific records, support for checkpointing (which should be fairly easy, I believe), maybe splitting files (not sure whether this works properly with Parquet), and of course tests + documentation.
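> On the "support for specific records" item: with the same AvroParquetReader builder used in the sketch, switching the data model from GenericData to SpecificData should be the main change. A minimal sketch, assuming a hypothetical generated SpecificRecord class named {{Address}} and an {{inputFile}} bridged the same way as in the GenericRecord variant:
>
> {code:java}
> // "Address" is a hypothetical Avro-generated SpecificRecord class.
> // The builder calls mirror the GenericRecord variant, swapping
> // GenericData.get() for SpecificData.get().
> ParquetReader<Address> reader =
>         AvroParquetReader.<Address>builder(inputFile)
>                 .withDataModel(SpecificData.get())
>                 .build();
>
> Address record;
> while ((record = reader.read()) != null) {
>     // process the strongly-typed record
> }
> reader.close();
> {code}
>
> The produced TypeInformation would then come from AvroTypeInfo rather than GenericRecordAvroTypeInfo, so no schema needs to be shipped with the format.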
> {code:java}
> // Imports added for completeness; the original sketch omitted them.
> import java.io.IOException;
>
> import javax.annotation.Nullable;
>
> import org.apache.avro.Schema;
> import org.apache.avro.generic.GenericData;
> import org.apache.avro.generic.GenericRecord;
>
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.connector.file.src.reader.FileRecordFormat;
> import org.apache.flink.core.fs.FSDataInputStream;
> import org.apache.flink.core.fs.FileStatus;
> import org.apache.flink.core.fs.FileSystem;
> import org.apache.flink.core.fs.Path;
> import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
>
> import org.apache.parquet.avro.AvroParquetReader;
> import org.apache.parquet.hadoop.ParquetReader;
> import org.apache.parquet.io.DelegatingSeekableInputStream;
> import org.apache.parquet.io.InputFile;
> import org.apache.parquet.io.SeekableInputStream;
>
> public class ParquetAvroFileRecordFormat implements FileRecordFormat<GenericRecord> {
>
>     // transient because Avro's Schema is not Serializable; part of the
>     // "cleanup" mentioned above
>     private final transient Schema schema;
>
>     public ParquetAvroFileRecordFormat(Schema schema) {
>         this.schema = schema;
>     }
>
>     @Override
>     public Reader<GenericRecord> createReader(
>             Configuration config, Path filePath, long splitOffset, long splitLength)
>             throws IOException {
>         // read through Flink's FileSystem abstraction, not Hadoop's
>         final FileSystem fs = filePath.getFileSystem();
>         final FileStatus status = fs.getFileStatus(filePath);
>         final FSDataInputStream in = fs.open(filePath);
>
>         return new MyReader(
>                 AvroParquetReader.<GenericRecord>builder(new InputFileWrapper(in, status.getLen()))
>                         .withDataModel(GenericData.get())
>                         .build());
>     }
>
>     @Override
>     public Reader<GenericRecord> restoreReader(
>             Configuration config,
>             Path filePath,
>             long restoredOffset,
>             long splitOffset,
>             long splitLength) {
>         // not called if checkpointing isn't used
>         return null;
>     }
>
>     @Override
>     public boolean isSplittable() {
>         // let's not worry about this for now
>         return false;
>     }
>
>     @Override
>     public TypeInformation<GenericRecord> getProducedType() {
>         return new GenericRecordAvroTypeInfo(schema);
>     }
>
>     private static class MyReader implements FileRecordFormat.Reader<GenericRecord> {
>
>         private final ParquetReader<GenericRecord> parquetReader;
>
>         private MyReader(ParquetReader<GenericRecord> parquetReader) {
>             this.parquetReader = parquetReader;
>         }
>
>         @Nullable
>         @Override
>         public GenericRecord read() throws IOException {
>             return parquetReader.read();
>         }
>
>         @Override
>         public void close() throws IOException {
>             parquetReader.close();
>         }
>     }
>
>     // bridges Flink's FSDataInputStream to Parquet's InputFile abstraction
>     private static class InputFileWrapper implements InputFile {
>
>         private final FSDataInputStream inputStream;
>         private final long length;
>
>         private InputFileWrapper(FSDataInputStream inputStream, long length) {
>             this.inputStream = inputStream;
>             this.length = length;
>         }
>
>         @Override
>         public long getLength() {
>             return length;
>         }
>
>         @Override
>         public SeekableInputStream newStream() {
>             return new SeekableInputStreamAdapter(inputStream);
>         }
>     }
>
>     private static class SeekableInputStreamAdapter extends DelegatingSeekableInputStream {
>
>         private final FSDataInputStream inputStream;
>
>         private SeekableInputStreamAdapter(FSDataInputStream inputStream) {
>             super(inputStream);
>             this.inputStream = inputStream;
>         }
>
>         @Override
>         public long getPos() throws IOException {
>             return inputStream.getPos();
>         }
>
>         @Override
>         public void seek(long newPos) throws IOException {
>             inputStream.seek(newPos);
>         }
>     }
> }
> {code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)