Metrics or runtimeContext in global commit
Dear Flink team,

I would like to use metrics (which are then written to an InfluxDB) in the method org.apache.flink.api.connector.sink2.Committer::commit(Collection<CommitRequest<CommT>> committables) that I use for global commit. I use the helper method StandardSinkTopologies.addGlobalCommitter(...) to define the post-commit topology.

The problem is: when I implement the interface Committer, I cannot get the runtimeContext that I need for the metrics, because it is not an Operator.

The only solution I found was cloning the Flink source code and amending it in the following way:

1. Declaring an abstract class "CommitterWithRuntimeContext" that implements Committer and has:
   - an additional field for the runtimeContext
   - a setter and a getter for this field
   - an abstract method "void init()"

2. In the setup() method of GlobalCommitterOperator (which is an operator and thus has a runtimeContext), adding the following lines at the end:

    if (committer instanceof CommitterWithRuntimeContext) {
        ((CommitterWithRuntimeContext) committer).setRuntimeContext(getRuntimeContext());
        ((CommitterWithRuntimeContext) committer).init();
    }

I can then implement the method CommitterWithRuntimeContext::init() in our code and call the method CommitterWithRuntimeContext::getRuntimeContext() when I need the runtimeContext.

Is there another way to get the runtimeContext in a global commit? If not, is it justified to propose a feature request for a future release, where the global commit method can be implemented in a way that the user has access to the runtimeContext?

Best regards and thanks in advance,
Tobias Fröhlich
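For readers following the thread, the workaround described in this message can be sketched roughly as below. This is only an illustration under stated assumptions, not the actual patch: the Committer, RuntimeContext and Counter types here are simplified stand-ins written for this sketch (the real Flink Committer::commit takes CommitRequest-wrapped committables, and real counters are obtained through the metric group), and GlobalCommitterOperatorSketch, CountingCommitter, MapRuntimeContext and the metric name "committedFiles" are hypothetical names.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for Flink's Committer (assumption: no CommitRequest wrapper).
interface Committer<CommT> {
    void commit(Collection<CommT> committables);
}

// Simplified stand-in for a metric counter.
final class Counter {
    private long count;
    void inc(long n) { count += n; }
    long getCount() { return count; }
}

// Simplified stand-in for the part of RuntimeContext used here (assumption).
interface RuntimeContext {
    Counter counter(String name);
}

// Step 1 of the message: a committer that can receive a runtime context.
abstract class CommitterWithRuntimeContext<CommT> implements Committer<CommT> {
    private RuntimeContext runtimeContext;

    public void setRuntimeContext(RuntimeContext runtimeContext) {
        this.runtimeContext = runtimeContext;
    }

    public RuntimeContext getRuntimeContext() {
        return runtimeContext;
    }

    // Called once after the context has been injected, e.g. to register metrics.
    public abstract void init();
}

// Step 2 of the message: the operator injects its context at the end of setup().
// Hypothetical stand-in for the patched GlobalCommitterOperator.
final class GlobalCommitterOperatorSketch<CommT> {
    private final Committer<CommT> committer;
    private final RuntimeContext runtimeContext;

    GlobalCommitterOperatorSketch(Committer<CommT> committer, RuntimeContext runtimeContext) {
        this.committer = committer;
        this.runtimeContext = runtimeContext;
    }

    void setup() {
        if (committer instanceof CommitterWithRuntimeContext) {
            CommitterWithRuntimeContext<?> c = (CommitterWithRuntimeContext<?>) committer;
            c.setRuntimeContext(runtimeContext);
            c.init();
        }
    }

    void commit(Collection<CommT> committables) {
        committer.commit(committables);
    }
}

// User code: registers a counter in init() and updates it on every global commit.
final class CountingCommitter extends CommitterWithRuntimeContext<String> {
    private Counter committed;

    @Override
    public void init() {
        committed = getRuntimeContext().counter("committedFiles");
    }

    @Override
    public void commit(Collection<String> committables) {
        committed.inc(committables.size());
    }
}

// Toy context that hands out one counter per name, so the sketch runs without Flink.
final class MapRuntimeContext implements RuntimeContext {
    private final Map<String, Counter> counters = new HashMap<>();

    @Override
    public Counter counter(String name) {
        return counters.computeIfAbsent(name, n -> new Counter());
    }
}
```

A committer that does not extend CommitterWithRuntimeContext passes through setup() untouched, so the instanceof check keeps existing committers working unchanged.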
Re: Metrics or runtimeContext in global commit
It seems there is no other way to get the runtimeContext in a global commit. I think it's reasonable to propose the feature. I have added the flink-devs channel for more attention and discussion among the Flink devs.

Best regards,
Yuxia

----- Original Message -----
From: "Tobias Fröhlich"
To: "User"
Sent: Tuesday, 14 February 2023, 9:26:34 PM
Subject: Metrics or runtimeContext in global commit
Re: Metrics or runtimeContext in global commit
Dear Yuxia,

thank you for your answer! This is also our conclusion, and my colleague has already proposed this feature.

Best regards,
Tobias

----- Original Message -----
From: "yuxia"
To: "Dr. Tobias Fröhlich"
CC: "User", "dev"
Sent: Monday, 20 February 2023, 03:31:22
Subject: Re: Metrics or runtimeContext in global commit