[jira] [Commented] (FLINK-21406) Add AvroParquetFileRecordFormat

2022-03-10 Thread Kevin Lam (Jira)


[ https://issues.apache.org/jira/browse/FLINK-21406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17504379#comment-17504379 ]

Kevin Lam commented on FLINK-21406:
----------------------------------------

Hi, is there a ticket for implementing splitting functionality for 
AvroParquetFileRecordFormat? If so, is someone working on it?
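
For context, split support would mostly mean making createReader honor 
splitOffset and splitLength. The following is a hypothetical sketch against the 
code quoted below, not something from the ticket: it assumes parquet-mr's 
ParquetReader.Builder#withFileRange(start, end), which restricts reading to the 
row groups whose midpoints fall inside the given byte range, and it reuses the 
MyReader and InputFileWrapper helpers from that snippet.

{code}
// Hypothetical sketch, not from the ticket: map each split onto whole Parquet
// row groups, assuming ParquetReader.Builder#withFileRange is available in the
// parquet-mr version on the classpath.
@Override
public Reader<GenericRecord> createReader(
        Configuration config, Path filePath, long splitOffset, long splitLength)
        throws IOException {
    final FileSystem fs = filePath.getFileSystem();
    final FileStatus status = fs.getFileStatus(filePath);
    final FSDataInputStream in = fs.open(filePath);
    return new MyReader(
            AvroParquetReader.<GenericRecord>builder(
                            new InputFileWrapper(in, status.getLen()))
                    .withDataModel(GenericData.get())
                    // read only the row groups whose midpoints fall in this range
                    .withFileRange(splitOffset, splitOffset + splitLength)
                    .build());
}

@Override
public boolean isSplittable() {
    // each split now reads a disjoint set of row groups
    return true;
}
{code}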

> Add AvroParquetFileRecordFormat
> -------------------------------
>
> Key: FLINK-21406
> URL: https://issues.apache.org/jira/browse/FLINK-21406
> Project: Flink
>  Issue Type: New Feature
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
> Reporter: Chesnay Schepler
> Assignee: Jing Ge
> Priority: Minor
> Labels: auto-deprioritized-major, pull-request-available
> Fix For: 1.15.0
>
>
> There is currently no easy way to read Avro GenericRecords from Parquet via 
> the new {{FileSource}}.
> While helping out a user I started writing a FileRecordFormat that could do 
> that, but it requires some finalization.
> The implementation is similar to our ParquetAvroWriters class, in that we 
> just wrap some Parquet classes and bridge our FileSystem with their IO 
> abstraction.
> The main goal was to have a format that reads data through our FileSystems, 
> rather than working directly against Hadoop, to prevent a ClassLoader leak 
> from the S3AFileSystem (threads in a thread pool can keep references to the 
> user classloader).
> According to the user it appears to be working, but it still needs some 
> cleanup: ideally support for specific records, support for checkpointing 
> (which should be fairly easy, I believe), maybe splitting files (not sure 
> whether this works properly with Parquet), and of course tests and 
> documentation.
> {code}
> public class ParquetAvroFileRecordFormat implements FileRecordFormat<GenericRecord> {
> 
>     private final transient Schema schema;
> 
>     public ParquetAvroFileRecordFormat(Schema schema) {
>         this.schema = schema;
>     }
> 
>     @Override
>     public Reader<GenericRecord> createReader(
>             Configuration config, Path filePath, long splitOffset, long splitLength)
>             throws IOException {
>         final FileSystem fs = filePath.getFileSystem();
>         final FileStatus status = fs.getFileStatus(filePath);
>         final FSDataInputStream in = fs.open(filePath);
>         return new MyReader(
>                 AvroParquetReader.<GenericRecord>builder(
>                                 new InputFileWrapper(in, status.getLen()))
>                         .withDataModel(GenericData.get())
>                         .build());
>     }
> 
>     @Override
>     public Reader<GenericRecord> restoreReader(
>             Configuration config,
>             Path filePath,
>             long restoredOffset,
>             long splitOffset,
>             long splitLength) {
>         // not called if checkpointing isn't used
>         return null;
>     }
> 
>     @Override
>     public boolean isSplittable() {
>         // let's not worry about this for now
>         return false;
>     }
> 
>     @Override
>     public TypeInformation<GenericRecord> getProducedType() {
>         return new GenericRecordAvroTypeInfo(schema);
>     }
> 
>     /** Forwards record reads to parquet-mr's ParquetReader. */
>     private static class MyReader implements FileRecordFormat.Reader<GenericRecord> {
>         private final ParquetReader<GenericRecord> parquetReader;
> 
>         private MyReader(ParquetReader<GenericRecord> parquetReader) {
>             this.parquetReader = parquetReader;
>         }
> 
>         @Nullable
>         @Override
>         public GenericRecord read() throws IOException {
>             return parquetReader.read();
>         }
> 
>         @Override
>         public void close() throws IOException {
>             parquetReader.close();
>         }
>     }
> 
>     /** Adapts Flink's FSDataInputStream to Parquet's InputFile abstraction. */
>     private static class InputFileWrapper implements InputFile {
>         private final FSDataInputStream inputStream;
>         private final long length;
> 
>         private InputFileWrapper(FSDataInputStream inputStream, long length) {
>             this.inputStream = inputStream;
>             this.length = length;
>         }
> 
>         @Override
>         public long getLength() {
>             return length;
>         }
> 
>         @Override
>         public SeekableInputStream newStream() {
>             return new SeekableInputStreamAdapter(inputStream);
>         }
>     }
> 
>     /** Exposes Flink's seekable stream through Parquet's stream interface. */
>     private static class SeekableInputStreamAdapter extends DelegatingSeekableInputStream {
>         private final FSDataInputStream inputStream;
> 
>         private SeekableInputStreamAdapter(FSDataInputStream inputStream) {
>             super(inputStream);
>             this.inputStream = inputStream;
>         }
> 
>         @Override
>         public long getPos() throws IOException {
>             return inputStream.getPos();
>         }
> 
>         @Override
>         public void seek(long newPos) throws IOException {
>             inputStream.seek(newPos);
>         }
>     }
> }
> {code}
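
For completeness, a minimal usage sketch showing how a format like this plugs 
into the new {{FileSource}}; it assumes the FileSource.forRecordFileFormat 
builder from flink-connector-files, and the schema variable, input path, and 
execution environment env below are placeholders:

{code}
// Hypothetical usage sketch; schema, the input path, and env are placeholders.
final FileSource<GenericRecord> source =
        FileSource.forRecordFileFormat(
                        new ParquetAvroFileRecordFormat(schema),
                        new Path("s3://bucket/input/"))
                .build();

final DataStream<GenericRecord> stream =
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "parquet-avro-source");
{code}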



--
This message was sent by Atlassian Jira

[jira] [Commented] (FLINK-21406) Add AvroParquetFileRecordFormat

2021-09-02 Thread Arvid Heise (Jira)


[ https://issues.apache.org/jira/browse/FLINK-21406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17409285#comment-17409285 ]

Arvid Heise commented on FLINK-21406:
----------------------------------------

Thank you very much. I assigned it to you.

> Add AvroParquetFileRecordFormat
> -------------------------------
>
> Key: FLINK-21406
> URL: https://issues.apache.org/jira/browse/FLINK-21406
> Project: Flink
>  Issue Type: New Feature
>  Components: API / DataStream, Formats (JSON, Avro, Parquet, ORC, 
> SequenceFile)
> Reporter: Chesnay Schepler
> Assignee: Jing Ge
> Priority: Minor
> Labels: auto-deprioritized-major
> Fix For: 1.15.0
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-21406) Add AvroParquetFileRecordFormat

2021-09-02 Thread Jing Ge (Jira)


[ https://issues.apache.org/jira/browse/FLINK-21406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17409283#comment-17409283 ]

Jing Ge commented on FLINK-21406:
----------------------------------------

I'd like to take this task. [~arvid]

> Add AvroParquetFileRecordFormat
> -------------------------------
>
> Key: FLINK-21406
> URL: https://issues.apache.org/jira/browse/FLINK-21406
> Project: Flink
>  Issue Type: New Feature
>  Components: API / DataStream, Formats (JSON, Avro, Parquet, ORC, 
> SequenceFile)
> Reporter: Chesnay Schepler
> Priority: Minor
> Labels: auto-deprioritized-major
> Fix For: 1.15.0
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-21406) Add AvroParquetFileRecordFormat

2021-04-22 Thread Flink Jira Bot (Jira)


[ https://issues.apache.org/jira/browse/FLINK-21406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17327329#comment-17327329 ]

Flink Jira Bot commented on FLINK-21406:
----------------------------------------

This major issue is unassigned, and neither it nor any of its sub-tasks has 
been updated for 30 days, so it has been labeled "stale-major". If this ticket 
is indeed "major", please either assign yourself or give an update, and then 
remove the label. In 7 days the issue will be deprioritized.

> Add AvroParquetFileRecordFormat
> -------------------------------
>
> Key: FLINK-21406
> URL: https://issues.apache.org/jira/browse/FLINK-21406
> Project: Flink
>  Issue Type: New Feature
>  Components: API / DataStream, Formats (JSON, Avro, Parquet, ORC, 
> SequenceFile)
> Reporter: Chesnay Schepler
> Priority: Major
> Labels: stale-major
> Fix For: 1.13.0
>