Re: Metrics or runtimeContext in global commit
Dear Yuxia,

thank you for your answer! This is also our conclusion, and my colleague has already proposed this feature.

Best regards,
Tobias

- Original message -
From: "yuxia"
To: "Dr. Tobias Fröhlich"
CC: "User", "dev"
Sent: Monday, 20 February 2023 03:31:22
Subject: Re: Metrics or runtimeContext in global commit

It seems there is no other way to get the runtimeContext in a global commit. I think it is reasonable to propose the feature. I have added the Flink dev mailing list for more attention and discussion among the Flink developers.

Best regards,
Yuxia

- Original message -
From: "Tobias Fröhlich"
To: "User"
Sent: Tuesday, 14 February 2023, 9:26:34 PM
Subject: Metrics or runtimeContext in global commit

Dear Flink team,

I would like to use metrics (which are then written to an InfluxDB) in the method org.apache.flink.api.connector.sink2.Committer::commit(Collection<CommitRequest<CommT>> committables) that I use for the global commit. I use the helper method StandardSinkTopologies.addGlobalCommitter(...) to define the post-commit topology.

The problem is: when I implement the interface Committer, I cannot get the runtimeContext that I need for the metrics, because a Committer is not an operator.

The only solution I found was to clone the Flink source code and amend it in the following way:

1. Declare an abstract class "CommitterWithRuntimeContext" that implements Committer and has:
   - an additional field for the runtimeContext
   - a setter and a getter for this field
   - an abstract method "void init()"
2. In the setup() method of GlobalCommitterOperator (which is an operator and thus has a runtimeContext), add the following lines at the end:

   if (committer instanceof CommitterWithRuntimeContext) {
       ((CommitterWithRuntimeContext) committer).setRuntimeContext(getRuntimeContext());
       ((CommitterWithRuntimeContext) committer).init();
   }

I can then implement CommitterWithRuntimeContext::init() in our code and call CommitterWithRuntimeContext::getRuntimeContext() whenever I need the runtimeContext.

Is there another way to get the runtimeContext in a global commit? If not, would it be justified to propose a feature request for a future release in which the global commit method can be implemented such that the user has access to the runtimeContext?

Best regards and thanks in advance,
Tobias Fröhlich
Re: Watermark in global commit
Dear Jan,

thank you for your answer! The logic that ensures consistency should already be implemented in the TwoPhaseCommittingSink and the WithPostCommitTopology, so I would rather use these well-tested classes than implement my own logic for this.

Best regards,
Tobias

Hi,

I'm not an expert on Flink specifically, but your problem might be easier to solve when broken down into two steps: first create a "stable" input for downstream processing (this might include a specific watermark), then run the downstream processing on that input. In Flink, the "stability" of input for downstream processing is ensured by a checkpoint. You would therefore need to wait for a checkpoint, buffering intermediate data in state (and emitting the particular watermark as a data element, because watermarks in general need not be "stable"). Once a checkpoint is completed, you would flush the buffer to the downstream operators: one would create the Parquet files, the other would perform whatever action needs to be taken based on the watermark. The checkpoint ensures that the two tasks are eventually consistent (if this is sufficient for your case). In Apache Beam, we call such an operation a transform that '@RequiresStableInput' [1]; the implementation in Flink is as I described above.

Jan

[1] https://beam.apache.org/releases/javadoc/2.44.0/org/apache/beam/sdk/transforms/DoFn.RequiresStableInput.html

On 2/14/23 13:23, Tobias Fröhlich wrote:
> Dear Flink team,
>
> I am facing the following problem: I need to write events to Parquet
> files using the FileSink. Subsequently, I want to do something else in a
> global commit where I need the corresponding watermark. However,
> org.apache.flink.connector.file.sink.FileSink forces the type of the
> committables to be org.apache.flink.connector.file.sink.FileSinkCommittable,
> which cannot carry watermarks.
>
> Details:
>
> As far as I understand the idea of a two-phase commit with a global
> committer, the committables are used for passing information from the writer
> to the global committer. This is done by implementing two methods in the
> writer and the committer, respectively:
>
> 1. Collection<CommT> TwoPhaseCommittingSink.PrecommittingSinkWriter::prepareCommit(),
>    which returns a collection of committables of some type CommT, and
> 2. void Committer::commit(Collection<CommitRequest<CommT>>), which uses this
>    collection.
>
> In general, the type CommT can be chosen arbitrarily. So, if the watermark is
> needed in the global commit, it is possible to use a custom object that
> contains a field for the watermark. However, if the class
> org.apache.flink.connector.file.sink.FileSink is used, the type of the
> committables is always
> org.apache.flink.connector.file.sink.FileSinkCommittable, which does not have
> a field that can be used for the watermark.
>
> The only solution I found was to fork the Flink source code and
> augment it in the following way:
>
> 1. adding a field to FileSinkCommittable ("private long watermark;" with
>    getter and setter)
> 2. changing the FileSinkCommittableSerializer accordingly (this makes it
>    necessary to define a new version)
> 3. in FileWriter::prepareCommit(), adding a loop over all committables to
>    set the watermark
>
> Am I missing something? Is there an easier way to get the watermarks from the
> writer to the global committer? If not, would it be justified to propose a
> feature request?
>
> Best regards and thanks in advance
> Tobias Fröhlich
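Jan's "stable input" idea (buffer data, seal it when a checkpoint is taken, release it only once that checkpoint is complete) can be modelled roughly as below. This is a minimal in-memory sketch, not Flink code: the class and method names (`StableInputBuffer`, `onCheckpoint`, `onCheckpointComplete`) are hypothetical. In a real operator the two buffers would have to live in checkpointed operator state, and the two callbacks would correspond to taking a snapshot and to Flink's checkpoint-complete notification.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// In-memory model of "stable input". In a real Flink operator, `pending` and `sealed`
// would have to be kept in checkpointed operator state; here they are plain fields.
final class StableInputBuffer<T> {
    private final List<T> pending = new ArrayList<>();
    private final NavigableMap<Long, List<T>> sealed = new TreeMap<>();

    // Buffer an element (a data record, or a watermark emitted as a data element).
    void add(T element) {
        pending.add(element);
    }

    // Checkpoint `checkpointId` is taken: everything buffered so far belongs to it.
    void onCheckpoint(long checkpointId) {
        sealed.put(checkpointId, new ArrayList<>(pending));
        pending.clear();
    }

    // Checkpoint `checkpointId` completed: release every batch sealed up to and
    // including it, oldest first, so downstream only ever sees checkpointed data.
    List<T> onCheckpointComplete(long checkpointId) {
        NavigableMap<Long, List<T>> completed = sealed.headMap(checkpointId, true);
        List<T> released = new ArrayList<>();
        for (List<T> batch : completed.values()) {
            released.addAll(batch);
        }
        completed.clear(); // the head-map view is backed by `sealed`, so this removes them
        return released;
    }
}
```

Both downstream consumers (the Parquet writer and the watermark-driven action) would receive the same released batches, which is what makes them eventually consistent with each other.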
Metrics or runtimeContext in global commit
Dear Flink team,

I would like to use metrics (which are then written to an InfluxDB) in the method org.apache.flink.api.connector.sink2.Committer::commit(Collection<CommitRequest<CommT>> committables) that I use for the global commit. I use the helper method StandardSinkTopologies.addGlobalCommitter(...) to define the post-commit topology.

The problem is: when I implement the interface Committer, I cannot get the runtimeContext that I need for the metrics, because a Committer is not an operator.

The only solution I found was to clone the Flink source code and amend it in the following way:

1. Declare an abstract class "CommitterWithRuntimeContext" that implements Committer and has:
   - an additional field for the runtimeContext
   - a setter and a getter for this field
   - an abstract method "void init()"
2. In the setup() method of GlobalCommitterOperator (which is an operator and thus has a runtimeContext), add the following lines at the end:

   if (committer instanceof CommitterWithRuntimeContext) {
       ((CommitterWithRuntimeContext) committer).setRuntimeContext(getRuntimeContext());
       ((CommitterWithRuntimeContext) committer).init();
   }

I can then implement CommitterWithRuntimeContext::init() in our code and call CommitterWithRuntimeContext::getRuntimeContext() whenever I need the runtimeContext.

Is there another way to get the runtimeContext in a global commit? If not, would it be justified to propose a feature request for a future release in which the global commit method can be implemented such that the user has access to the runtimeContext?

Best regards and thanks in advance,
Tobias Fröhlich
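The two-step patch described above can be illustrated in isolation with the sketch below. It is a self-contained model, not Flink code: `SimpleRuntimeContext`, `SimpleCommitter`, `GlobalCommitterOperatorModel`, `InMemoryRuntimeContext` and `CountingCommitter` are hypothetical stand-ins (the real committer would call `getRuntimeContext().getMetricGroup()`, and the real `commit` takes `Collection<CommitRequest<CommT>>`). Only `CommitterWithRuntimeContext` and its `init()`/setter/getter follow the message.

```java
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Simplified stand-ins, NOT Flink's interfaces: just enough to show the injection pattern.
interface SimpleRuntimeContext {
    AtomicLong counter(String name); // hypothetical metric lookup
}

interface SimpleCommitter<CommT> {
    void commit(Collection<CommT> committables);
}

// Step 1: a committer that can receive the runtime context after construction.
abstract class CommitterWithRuntimeContext<CommT> implements SimpleCommitter<CommT> {
    private SimpleRuntimeContext runtimeContext;

    void setRuntimeContext(SimpleRuntimeContext runtimeContext) {
        this.runtimeContext = runtimeContext;
    }

    SimpleRuntimeContext getRuntimeContext() {
        return runtimeContext;
    }

    // Called once the context has been injected; register metrics here.
    abstract void init();
}

// Step 2: the operator hands its own context to the committer in setup().
class GlobalCommitterOperatorModel<CommT> {
    private final SimpleCommitter<CommT> committer;
    private final SimpleRuntimeContext runtimeContext;

    GlobalCommitterOperatorModel(SimpleCommitter<CommT> committer, SimpleRuntimeContext runtimeContext) {
        this.committer = committer;
        this.runtimeContext = runtimeContext;
    }

    void setup() {
        if (committer instanceof CommitterWithRuntimeContext) {
            // The wildcard avoids an unchecked cast: setRuntimeContext/init do not use CommT.
            CommitterWithRuntimeContext<?> withContext = (CommitterWithRuntimeContext<?>) committer;
            withContext.setRuntimeContext(runtimeContext);
            withContext.init();
        }
    }
}

// Hypothetical in-memory context used for the demo.
class InMemoryRuntimeContext implements SimpleRuntimeContext {
    private final Map<String, AtomicLong> counters = new ConcurrentHashMap<>();

    @Override
    public AtomicLong counter(String name) {
        return counters.computeIfAbsent(name, n -> new AtomicLong());
    }
}

// User code: counts how many committables the global committer has processed.
class CountingCommitter extends CommitterWithRuntimeContext<String> {
    private AtomicLong committedItems;

    @Override
    void init() {
        committedItems = getRuntimeContext().counter("committedItems");
    }

    @Override
    public void commit(Collection<String> committables) {
        // ... the actual global commit would happen here ...
        committedItems.addAndGet(committables.size());
    }
}
```

The design choice is that metric registration moves from the constructor into `init()`, because the context only exists once the operator's `setup()` has run.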
Watermark in global commit
Dear Flink team,

I am facing the following problem: I need to write events to Parquet files using the FileSink. Subsequently, I want to do something else in a global commit where I need the corresponding watermark. However, org.apache.flink.connector.file.sink.FileSink forces the type of the committables to be org.apache.flink.connector.file.sink.FileSinkCommittable, which cannot carry watermarks.

Details:

As far as I understand the idea of a two-phase commit with a global committer, the committables are used for passing information from the writer to the global committer. This is done by implementing two methods in the writer and the committer, respectively:

1. Collection<CommT> TwoPhaseCommittingSink.PrecommittingSinkWriter::prepareCommit(), which returns a collection of committables of some type CommT, and
2. void Committer::commit(Collection<CommitRequest<CommT>>), which uses this collection.

In general, the type CommT can be chosen arbitrarily. So, if the watermark is needed in the global commit, it is possible to use a custom object that contains a field for the watermark. However, if the class org.apache.flink.connector.file.sink.FileSink is used, the type of the committables is always org.apache.flink.connector.file.sink.FileSinkCommittable, which does not have a field that can be used for the watermark.

The only solution I found was to fork the Flink source code and augment it in the following way:

1. adding a field to FileSinkCommittable ("private long watermark;" with getter and setter)
2. changing the FileSinkCommittableSerializer accordingly (this makes it necessary to define a new version)
3. in FileWriter::prepareCommit(), adding a loop over all committables to set the watermark

Am I missing something? Is there an easier way to get the watermarks from the writer to the global committer? If not, would it be justified to propose a feature request?

Best regards and thanks in advance,
Tobias Fröhlich
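The three changes in the fork (a watermark field on the committable, a new serializer version, and tagging in prepareCommit) can be sketched without Flink as follows. All class names here are hypothetical; `VersionedSerializer` only mirrors the shape of Flink's `SimpleVersionedSerializer` (`getVersion`, `serialize`, `deserialize(version, bytes)`) and is declared locally so the sketch compiles on its own. Taking the minimum watermark across committables in the global committer is an assumption (one plausible choice when several writer subtasks contribute), not something stated in the message.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical committable: a finished file plus the writer's watermark at prepareCommit().
final class WatermarkedCommittable {
    final String path;    // stand-in for the FileSinkCommittable payload
    final long watermark; // Long.MIN_VALUE if no watermark has been seen yet

    WatermarkedCommittable(String path, long watermark) {
        this.path = path;
        this.watermark = watermark;
    }
}

// Same shape as Flink's SimpleVersionedSerializer, declared locally.
interface VersionedSerializer<E> {
    int getVersion();
    byte[] serialize(E obj) throws IOException;
    E deserialize(int version, byte[] serialized) throws IOException;
}

// Version 1 = path only (the old format), version 2 = path + watermark.
final class WatermarkedCommittableSerializer implements VersionedSerializer<WatermarkedCommittable> {
    @Override
    public int getVersion() {
        return 2;
    }

    @Override
    public byte[] serialize(WatermarkedCommittable committable) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(committable.path);
            out.writeLong(committable.watermark);
        }
        return bytes.toByteArray();
    }

    @Override
    public WatermarkedCommittable deserialize(int version, byte[] serialized) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized))) {
            String path = in.readUTF();
            switch (version) {
                case 1: // state written before the watermark field existed
                    return new WatermarkedCommittable(path, Long.MIN_VALUE);
                case 2:
                    return new WatermarkedCommittable(path, in.readLong());
                default:
                    throw new IOException("Unknown serializer version: " + version);
            }
        }
    }
}

// Writer side: remember the highest watermark seen and tag every committable with it.
final class WatermarkTaggingWriter {
    private final List<String> finishedFiles = new ArrayList<>();
    private long currentWatermark = Long.MIN_VALUE;

    void fileFinished(String path) {
        finishedFiles.add(path);
    }

    void writeWatermark(long watermark) {
        currentWatermark = Math.max(currentWatermark, watermark);
    }

    Collection<WatermarkedCommittable> prepareCommit() {
        List<WatermarkedCommittable> committables = new ArrayList<>();
        for (String path : finishedFiles) {
            committables.add(new WatermarkedCommittable(path, currentWatermark));
        }
        finishedFiles.clear();
        return committables;
    }

    // Global committer side (assumption): the minimum over all subtasks is the safe watermark.
    static long minWatermark(Collection<WatermarkedCommittable> committables) {
        long min = Long.MAX_VALUE;
        for (WatermarkedCommittable c : committables) {
            min = Math.min(min, c.watermark);
        }
        return committables.isEmpty() ? Long.MIN_VALUE : min;
    }
}
```

Keeping the version-1 branch in `deserialize` is what allows state written by the unpatched serializer to be restored after the upgrade.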