Dear Flink team,

I am facing the following problem: I need to write events to Parquet files 
using the FileSink. Subsequently, I want to perform a further step in a global 
commit, for which I need the corresponding watermark. However, 
org.apache.flink.connector.file.sink.FileSink forces the type of the 
committables to be org.apache.flink.connector.file.sink.FileSinkCommittable, 
which cannot carry watermarks.

Details:

As far as I understand the idea of a two-phase commit with a global committer, 
the committables are used for passing information from the writer to the global 
committer. This is done by implementing two methods, one in the writer and one 
in the committer:

 1. Collection<CommT> 
    TwoPhaseCommittingSink.PrecommittingSinkWriter::prepareCommit(), which 
    returns a collection of committables of some type CommT, and
 2. void Committer::commit(Collection<CommitRequest<CommT>>), which consumes 
    this collection.

In general, the type CommT can be chosen freely. So, if the watermark is 
needed in the global commit, it is possible to use a custom committable class 
that contains a field for the watermark. However, if the class 
org.apache.flink.connector.file.sink.FileSink<IN> is used, the type of the 
committables is always org.apache.flink.connector.file.sink.FileSinkCommittable, 
which does not have a field that can be used for the watermark.
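
To illustrate what I mean by a custom committable, here is a minimal sketch in 
plain Java. It does not implement any Flink interface: the names 
WatermarkedCommittable and ToyWriter are made up for this example, and the 
toy writer only mimics the shape of prepareCommit(). I am assuming that a real 
writer would keep track of the latest watermark itself.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical committable type (not part of Flink) that carries a watermark.
final class WatermarkedCommittable {
    private final String pendingFile; // stand-in for whatever has to be committed
    private final long watermark;     // watermark known to the writer at pre-commit

    WatermarkedCommittable(String pendingFile, long watermark) {
        this.pendingFile = pendingFile;
        this.watermark = watermark;
    }

    String getPendingFile() {
        return pendingFile;
    }

    long getWatermark() {
        return watermark;
    }
}

// Toy writer (assumption: not a Flink SinkWriter) that stamps every
// committable with the most recent watermark it has seen.
final class ToyWriter {
    private final List<String> pendingFiles = new ArrayList<>();
    private long currentWatermark = Long.MIN_VALUE;

    void write(String pendingFile) {
        pendingFiles.add(pendingFile);
    }

    void writeWatermark(long watermark) {
        currentWatermark = watermark;
    }

    // Same shape as prepareCommit(): hand all pending work to the committer.
    Collection<WatermarkedCommittable> prepareCommit() {
        List<WatermarkedCommittable> committables = new ArrayList<>();
        for (String file : pendingFiles) {
            committables.add(new WatermarkedCommittable(file, currentWatermark));
        }
        pendingFiles.clear();
        return committables;
    }
}
```

With a type like this, the global committer could simply read getWatermark() 
from each committable it receives. This is exactly what is not possible with 
FileSinkCommittable.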

The only solution I found was to fork the Flink source code and change it in 
the following way:

  1. adding a field to FileSinkCommittable ("private long watermark;" with 
getter and setter)
  2. changing the FileSinkCommittableSerializer accordingly (this makes it 
necessary to define a new serializer version)
  3. in FileWriter::prepareCommit(), adding a loop over all committables to set 
the watermark
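
To make step 2 more concrete, here is a rough sketch of what I mean by a new 
serializer version. It is plain Java and not the actual 
FileSinkCommittableSerializer: the class names CommittableV2 and 
CommittableV2Serializer, the single pendingFile field, and the NO_WATERMARK 
default are assumptions for illustration only. The point is that version 2 
appends the watermark, while bytes written with version 1 can still be read.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical committable with the added watermark field.
final class CommittableV2 {
    final String pendingFile;
    final long watermark;

    CommittableV2(String pendingFile, long watermark) {
        this.pendingFile = pendingFile;
        this.watermark = watermark;
    }
}

// Hypothetical versioned serializer: version 1 = file only, version 2 = file + watermark.
final class CommittableV2Serializer {
    static final int VERSION = 2;
    // Assumed default for committables restored from version-1 bytes.
    static final long NO_WATERMARK = Long.MIN_VALUE;

    int getVersion() {
        return VERSION;
    }

    // Always writes the newest layout (version 2).
    byte[] serialize(CommittableV2 committable) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(committable.pendingFile);
            out.writeLong(committable.watermark);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Reads either layout, depending on the version stored next to the bytes.
    CommittableV2 deserialize(int version, byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            String pendingFile = in.readUTF();
            switch (version) {
                case 1:
                    return new CommittableV2(pendingFile, NO_WATERMARK);
                case 2:
                    return new CommittableV2(pendingFile, in.readLong());
                default:
                    throw new IOException("Unrecognized version: " + version);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Keeping the version-1 branch matters because committables stored in existing 
checkpoints or savepoints would otherwise no longer be readable after the 
change.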


Am I missing something? Is there an easier way to get the watermarks from the 
writer to the global committer? If not, would it be justified to open a feature 
request?

Best regards and thanks in advance
Tobias Fröhlich
