Hello,

Context:

We are working on integrating Hybrid Sources with different Sources and
Sinks. I have been working on a Parquet source that lets users build a
FileSource[T], so that the source can be used within a HybridSource[T].

The environment is Scala 2.12 and we are using the DataStream API. The
generic type “T” in this email is a Scala case class.

Problem:

Based on the documentation
<https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/formats/parquet/>,
it is recommended that you use the ParquetColumnarRowInputFormat
<https://github.com/apache/flink/blob/a060302379a7a53fe10f699625e36245e6a90d50/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetColumnarRowInputFormat.java>
as an entrypoint to set up the Source. Given that
ParquetColumnarRowInputFormat hard codes RowData
<https://github.com/apache/flink/blob/a060302379a7a53fe10f699625e36245e6a90d50/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetColumnarRowInputFormat.java#L48>,
your other sources would then need to be of type RowData to be used in a
HybridSource (at least in my experience), and you can’t convert
FileSource[RowData] -> FileSource[T].

An alternative I looked into was extending ParquetVectorizedInputFormat
<https://github.com/apache/flink/blob/a060302379a7a53fe10f699625e36245e6a90d50/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetVectorizedInputFormat.java>
but found that the type restrictions were difficult to reconcile.

Potential Solution:

Create a better AbstractParquetBulkFormat, similar to the
AbstractAvroBulkFormat
<https://github.com/apache/flink/blob/a060302379a7a53fe10f699625e36245e6a90d50/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AbstractAvroBulkFormat.java>
added in 1.15. We are available to contribute, but want to understand
whether this is a direction Flink is willing to take before putting in the
work!
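
One possible shape for such a base class, as a rough sketch only. It uses
stand-in types rather than Flink classes: Row, BulkReader,
AbstractParquetBulkFormatSketch, Event and EventParquetFormat are all
hypothetical names made up for illustration, and the real BulkFormat
interface has more to it (splits, restore, type information). The point is
only that the subclass supplies the row-to-T conversion, so the format is
already typed as T when it reaches the FileSource.

```scala
// Stand-in types only: none of these are Flink classes.
// Row models a RowData-like record as positional fields.
final case class Row(fields: Vector[Any])

// Minimal stand-in for a bulk reader that yields records of type T.
trait BulkReader[T] {
  def readBatch(): Iterator[T]
}

// Hypothetical shape of the proposed base class: subclasses supply only
// the Row => T conversion, so the reader is typed as T from the start.
abstract class AbstractParquetBulkFormatSketch[T](rows: Seq[Row])
    extends BulkReader[T] {
  protected def convert(row: Row): T

  override def readBatch(): Iterator[T] = rows.iterator.map(convert)
}

// Example user type: a Scala case class, as in our setup.
final case class Event(id: Long, name: String)

// Concrete format for Event; the field positions are an assumption.
final class EventParquetFormat(rows: Seq[Row])
    extends AbstractParquetBulkFormatSketch[Event](rows) {
  override protected def convert(row: Row): Event =
    Event(row.fields(0).asInstanceOf[Long], row.fields(1).asInstanceOf[String])
}

object Demo {
  val rows: Seq[Row] = Seq(Row(Vector(1L, "a")), Row(Vector(2L, "b")))
  val events: List[Event] = new EventParquetFormat(rows).readBatch().toList
}
```

With something along these lines, the Parquet side would produce Event
directly and could sit next to any other source of Event.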

Questions:

   1. Based on the current implementation of Parquet within Flink, is it
      correct to say that the only entry-point for parquet is
      ParquetColumnarRowInputFormat
      <https://github.com/apache/flink/blob/a060302379a7a53fe10f699625e36245e6a90d50/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetColumnarRowInputFormat.java>?
      1. Is there any way to convert a FileSource[RowData] -> FileSource[T]?
   2. Would the potential solution listed above be an implementation that
      Flink would be interested in integrating?
      1. If not, do you have an example of Parquet being used in a
         HybridSource along with a Kafka Source?


Thanks!
Ryan van Huuksloot
Data Developer | Data Platform Engineering | Streaming Capabilities