Stephan Ewen created FLINK-19161:
------------------------------------
Summary: Port File Sources to FLIP-27 API
Key: FLINK-19161
URL: https://issues.apache.org/jira/browse/FLINK-19161
Project: Flink
Issue Type: Sub-task
Components: Connectors / FileSystem
Reporter: Stephan Ewen
Assignee: Stephan Ewen
Fix For: 1.12.0
Porting the File sources to the FLIP-27 API means combining
- the FileInputFormat from the DataSet Batch API
- the Monitoring File Source from the DataStream API.
The two already share the same reader code and part of the enumeration
code.
*Structure*
The new File Source will have three components:
- File enumerators that discover the files.
- File split assigners that decide which reader gets which split.
- File Reader Formats, which deal with the decoding.
The main difference between the bounded (batch) version and the unbounded
(streaming) version is that the streaming version repeatedly invokes the file
enumerator to search for new files.
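As a rough illustration of how these pieces could fit together, here is a minimal sketch. All names in it (FileSourceStructureSketch, Split, Enumerator, Assigner, FifoAssigner, discover) are hypothetical and made up for this example; they are not the proposed Flink interfaces. The decoding side (File Reader Formats) is left out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;

/** Illustrative sketch only; these are not the actual Flink types. */
final class FileSourceStructureSketch {

    /** Hypothetical split: a byte range within one file. */
    static final class Split {
        final String path;
        final long offset;
        final long length;

        Split(String path, long offset, long length) {
            this.path = path;
            this.offset = offset;
            this.length = length;
        }
    }

    /** Discovers the files under the given paths and cuts them into splits. */
    interface Enumerator {
        List<Split> enumerate(Collection<String> paths);
    }

    /** Decides which split is handed to the next requesting reader. */
    interface Assigner {
        void add(Collection<Split> splits);

        Split next(); // returns null when nothing is pending
    }

    /** Simplest possible assigner: first in, first out. */
    static final class FifoAssigner implements Assigner {
        private final Queue<Split> pending = new ArrayDeque<>();

        @Override
        public void add(Collection<Split> splits) {
            pending.addAll(splits);
        }

        @Override
        public Split next() {
            return pending.poll();
        }
    }

    /**
     * One discovery round. A bounded source would call this once; an unbounded
     * source would call it periodically and forward only files not seen before.
     * Returns the number of newly added splits.
     */
    static int discover(Enumerator enumerator, Assigner assigner,
                        Collection<String> paths, Set<String> seenPaths) {
        List<Split> fresh = new ArrayList<>();
        Set<String> newPaths = new HashSet<>();
        for (Split split : enumerator.enumerate(paths)) {
            if (!seenPaths.contains(split.path)) {
                fresh.add(split);
                newPaths.add(split.path);
            }
        }
        // Mark paths as seen only after the loop so that all splits of a new file are kept.
        seenPaths.addAll(newPaths);
        assigner.add(fresh);
        return fresh.size();
    }
}
```

In this sketch the only difference between the two modes is how often discover is called, which mirrors the distinction described above.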
*Checkpointing Enumerators*
The enumerators need to checkpoint the not-yet-assigned splits and, if they
are in continuous discovery mode (streaming), the paths / timestamps already
processed.
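A minimal sketch of what such an enumerator checkpoint could hold is shown below. The class and field names (EnumeratorCheckpointSketch, PendingSplitsState, unassignedSplits, processedPaths) are hypothetical, splits are represented by plain string IDs for brevity, and the rule in isNew (treat a file as new if it is unknown or has a newer modification timestamp) is an assumption about how the paths / timestamps might be used.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative sketch only; not the actual Flink checkpoint type. */
final class EnumeratorCheckpointSketch {

    /** Immutable snapshot of what the enumerator must remember. */
    static final class PendingSplitsState {
        /** Splits discovered but not yet handed to any reader (split IDs here). */
        final List<String> unassignedSplits;
        /** Path to modification timestamp of processed files; empty in bounded mode. */
        final Map<String, Long> processedPaths;

        PendingSplitsState(Collection<String> unassignedSplits,
                           Map<String, Long> processedPaths) {
            // Defensive copies so later enumerator activity cannot alter the snapshot.
            this.unassignedSplits =
                    Collections.unmodifiableList(new ArrayList<>(unassignedSplits));
            this.processedPaths =
                    Collections.unmodifiableMap(new HashMap<>(processedPaths));
        }

        /** Assumed rule: a file is new if it is unknown or was modified since. */
        boolean isNew(String path, long modificationTime) {
            Long seen = processedPaths.get(path);
            return seen == null || seen < modificationTime;
        }
    }
}
```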
*Checkpointing Readers*
The new File Source needs to ensure that every reader can be checkpointed.
Some readers may be able to expose the position in the input file that
corresponds to the latest emitted record, but many will not be able to do that
due to
- storing compressed record batches
- using buffered decoders where exact position information is not accessible.
We therefore suggest exposing a mechanism that combines a seekable file offset
with a number of records to read and skip after that offset. In the extreme
cases, a format can work only with seekable positions or only with
records-to-skip. Some formats, like Avro, have periodic seek points (sync
markers) and count records-to-skip after these markers.
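The combination of offset and records-to-skip can be illustrated with a small sketch. It assumes newline-delimited records in an in-memory byte array and an offset that falls on a record boundary; the names ReaderPositionSketch, Position, NO_OFFSET and resume are hypothetical and chosen for this example only.

```java
import java.io.BufferedReader;
import java.io.StringReader;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.stream.Collectors;

/** Illustrative sketch only; not the actual Flink position type. */
final class ReaderPositionSketch {

    /** Marker for readers that cannot seek and rely on records-to-skip alone. */
    static final long NO_OFFSET = -1L;

    /** Checkpointed reader position: a seek point plus a record count after it. */
    static final class Position {
        final long offset;        // seekable byte offset, or NO_OFFSET
        final long recordsToSkip; // records to read and drop after that offset

        Position(long offset, long recordsToSkip) {
            this.offset = offset;
            this.recordsToSkip = recordsToSkip;
        }
    }

    /** Returns the records that remain to be emitted after restoring the position. */
    static List<String> resume(byte[] data, Position position) {
        // Step 1: seek to the offset (or start from the beginning if there is none).
        int start = position.offset == NO_OFFSET ? 0 : (int) position.offset;
        String rest = new String(data, start, data.length - start, StandardCharsets.UTF_8);
        // Step 2: read and discard the records that were already emitted.
        return new BufferedReader(new StringReader(rest))
                .lines()
                .skip(position.recordsToSkip)
                .collect(Collectors.toList());
    }
}
```

With the input "a\nb\nc\nd\n", the positions (NO_OFFSET, 2), (4, 0) and (2, 1) all resume at record "c", covering the skip-only case, the offset-only case, and the mixed case that a format with sync markers would use.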
*Efficient and Convenient Readers*
To balance efficiency (batch vectorized reading of ORC / Parquet for vectorized
query processing) and convenience (plugging in a third-party CSV decoder over a
stream), we offer three abstractions for record readers:
- Bulk Formats that run over a file Path and return an iterable batch at a
time _(most efficient)_.
- File Record Formats, which read files record-by-record. The source framework
hands over a pre-defined-size batch from Split Reader to Record Emitter.
- Stream Formats that decode an input stream and rely on the source framework
to decide how to batch record handover _(most convenient)_.
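To illustrate how the framework could take over batching for the record-by-record and stream-based formats, here is a minimal sketch of an adapter that pulls single records and hands them over in batches of a pre-defined size. The names BatchingAdapterSketch, RecordReader and nextBatch are hypothetical and not part of the proposal.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative sketch only; not the actual Flink reader interfaces. */
final class BatchingAdapterSketch {

    /** Record-at-a-time reader, as a simple format might provide it. */
    interface RecordReader<T> {
        T read(); // returns null at end of input
    }

    /**
     * Pulls up to batchSize records so that handover happens once per batch
     * rather than once per record. An empty list signals end of input.
     */
    static <T> List<T> nextBatch(RecordReader<T> reader, int batchSize) {
        List<T> batch = new ArrayList<>(batchSize);
        T record;
        while (batch.size() < batchSize && (record = reader.read()) != null) {
            batch.add(record);
        }
        return batch;
    }
}
```

A Bulk Format would skip this adapter and return its own batches directly, which is where the efficiency advantage for columnar formats would come from.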