Hi Fabian,

Thanks for the response! I'll take a look at the CsvReaderFormat.

Our team is interested in contributing to the Parquet format. However, our
capacity for the current sprint is fully committed to other workstreams. I'll
put this issue on the backlog and see how it stacks up against our internal
priorities over the next few cycles.

I searched JIRA for an issue covering this file format restructure and didn't
find one. Do you know of an existing issue I can subscribe to? Otherwise, I
can create one for the Parquet change.

Regarding the "envisioned setup":

> My understanding so far is you have Parquet files with backfill
> data and want to read all files and then continue the reading from
> Kafka. Is that correct?
>
This is correct. The only modification is that we want the final
datastream type to be DataStream[T], where T is a Scala case class. The
user would provide T to the Hybrid Source at instantiation time, so
pseudocode would look roughly like:

long switchTimestamp = ...; // derive from file input paths

// Swap ParquetColumnarRowInputFormat for a generic ParquetInputFormat
FileSource<T> fileSource =
    FileSource.forBulkFileFormat(
            new ParquetColumnarRowInputFormat<T>(), Path.fromLocalFile(testDir))
        .build();

KafkaSource<T> kafkaSource =
    KafkaSource.<T>builder()
        .setStartingOffsets(OffsetsInitializer.timestamp(switchTimestamp + 1))
        .build();

HybridSource<T> hybridSource =
    HybridSource.builder(fileSource)
        .addSource(kafkaSource)
        .build();

DataStream<T> dataStream = env.fromSource(hybridSource, watermarkStrategy, name);
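
As a rough illustration of the handover described above, here is a small plain-Java sketch that uses no Flink classes. It replays a bounded backfill in full and then continues with stream records whose timestamp is at or after switchTimestamp + 1, so a record at the switch point is not read twice. The Event type, the readHybrid helper, and the sample values are all made up for illustration; how switchTimestamp is derived is left open, as in the pseudocode.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java illustration only; none of these types are Flink classes.
final class HandoverSketch {

    // Hypothetical record type standing in for the case class T.
    static final class Event {
        final long timestamp;
        final String payload;

        Event(long timestamp, String payload) {
            this.timestamp = timestamp;
            this.payload = payload;
        }
    }

    // Emits the whole backfill first, then only stream events whose
    // timestamp is at or after switchTimestamp + 1.
    static List<Event> readHybrid(
            List<Event> backfill, List<Event> stream, long switchTimestamp) {
        long startFrom = switchTimestamp + 1;
        List<Event> out = new ArrayList<>(backfill);
        for (Event e : stream) {
            if (e.timestamp >= startFrom) {
                out.add(e);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Sample data; the switch timestamp is passed in, not derived here.
        List<Event> backfill = Arrays.asList(new Event(100L, "a"), new Event(200L, "b"));
        List<Event> stream = Arrays.asList(new Event(200L, "b-again"), new Event(201L, "c"));
        for (Event e : readHybrid(backfill, stream, 200L)) {
            System.out.println(e.timestamp + " " + e.payload);
        }
    }
}
```

Passing the switch timestamp in as a parameter keeps the sketch independent of how the file paths encode it.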

Let me know if you have any questions!

Thanks,
Ryan van Huuksloot
Data Developer | Data Platform Engineering | Streaming Capabilities


On Mon, Feb 21, 2022 at 3:16 AM Fabian Paul <fp...@apache.org> wrote:

> Hi Ryan,
>
> Thanks for bringing up this topic. Currently, your analysis is
> correct, and reading parquet files outside the Table API is rather
> difficult. The community started an effort in Flink 1.15 to
> restructure some of the formats to make them better applicable to the
> DataStream and Table API. You can have a look at the CSV format
> implementation[1]. Obviously, implementing the Parquet format is more
> complicated since it is more performance-sensitive.
>
> If you are willing to work on it, that would be great. We can also
> assist with the design and offer guidance during the implementation.
>
> One question I'd still like to ask is about your exact envisioned
> setup. My understanding so far is you have Parquet files with backfill
> data and want to read all files and then continue the reading from
> Kafka. Is that correct?
>
> Best
> Fabian
>
> [1]
> https://github.com/apache/flink/blob/0f19c2472c54aac97e4067f5398731ab90036d1a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvReaderFormat.java#L71
>
>
> On Fri, Feb 18, 2022 at 11:22 PM Ryan van Huuksloot
> <ryan.vanhuuksl...@shopify.com> wrote:
> >
> > Hello,
> >
> >
> > Context:
> >
> > We are working on integrating Hybrid Sources with different Sources and
> > Sinks. I have been working on a Parquet source that allows users to load
> > the FileSource[T] so that the source can be used within Hybrid Sources
> > where the HybridSource is of Type[T].
> >
> > The environment is Scala 2.12 and we are using the DataStream API. The
> > generic type “T” used in the email would be a Scala case class.
> >
> >
> > Problem:
> >
> > Based on the documentation, it is recommended that you use the
> > ParquetColumnarRowInputFormat as an entrypoint to set up the Source. Given
> > that ParquetColumnarRowInputFormat hard codes RowData, your other sources
> > would then need to be of Type[RowData] to be used in HybridSource - from my
> > experience - and you can’t convert FileSource[RowData] -> FileSource[T].
> >
> > An alternative I looked into was extending ParquetVectorizedInputFormat
> > but found that the type restrictions were difficult to reconcile.
> >
> >
> > Potential Solution:
> >
> > Create a better AbstractParquetBulkFormat, similar to the
> > AbstractAvroBulkFormat added in 1.15. We would be available to contribute
> > but want to understand if this is a direction Flink is willing to go before
> > putting in the work!
> >
> >
> > Questions:
> >
> >
> > Based on the current implementation of Parquet within Flink, is it
> > correct to say that the only entry-point for parquet is
> > ParquetColumnarRowInputFormat?
> >
> > Is there any way to convert a FileSource[RowData] -> FileSource[T]?
> >
> > Would the potential solution listed above be an implementation that
> > Flink would be interested in integrating?
> >
> > If not, do you have an example of Parquet being used in a HybridSource
> > along with a Kafka Source?
> >
> >
> > Thanks!
> > Ryan van Huuksloot
> > Data Developer | Data Platform Engineering | Streaming Capabilities
>
