Re: Access ExecutionConfig from new Source and Sink API
Hi Christopher,

I think the ExecutionConfig for the new Sink API is already being discussed in FLIP-287[1]. What we actually need is a read-only ExecutionConfig for both the Source API and the Sink API. Maybe we could continue the discussion of this topic under FLIP-287.

Best,
Hang

[1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=240880853

On Mon, Apr 3, 2023 at 14:06, Yufan Sheng wrote:
> [...]
Re: Access ExecutionConfig from new Source and Sink API
I agree with you. It would be quite useful to access the ExecutionConfig in the Source API. When I was developing flink-connector-pulsar, the only configuration I couldn't access was the checkpoint configuration, which is defined in ExecutionConfig. Without it, I can't switch the behavior automatically based on whether checkpointing is enabled, so I have to add more custom configuration options to the Pulsar Source.

On Mon, Apr 3, 2023 at 1:47 PM Christopher Lee wrote:
> [...]
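To make Yufan's point about switching behavior on the checkpoint setting concrete, here is a minimal Java sketch. It assumes the connector can somehow obtain the job's checkpoint interval as an `Optional<Duration>` (empty when checkpointing is disabled), which is exactly what this thread says the new Source API does not expose. `AckMode` and `choose` are hypothetical names, not part of Flink or the Pulsar connector.

```java
import java.time.Duration;
import java.util.Optional;

/** Hypothetical acknowledgement strategies for a message-queue source connector. */
enum AckMode {
    /** Ack only once the checkpoint covering the message completes; requires checkpointing. */
    ON_CHECKPOINT_COMPLETE,
    /** Ack as soon as the message is emitted; messages in flight may be lost on failure. */
    ON_RECEIPT;

    /**
     * Picks a mode from the job's checkpoint interval, assuming the connector can read it.
     * An empty Optional means checkpointing is disabled.
     */
    static AckMode choose(Optional<Duration> checkpointInterval) {
        return checkpointInterval.isPresent() ? ON_CHECKPOINT_COMPLETE : ON_RECEIPT;
    }
}
```

Acknowledging on receipt avoids endless redelivery when checkpointing is off, at the cost of possibly losing in-flight messages on failure; a connector might instead refuse to start rather than silently downgrade its guarantees.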
Access ExecutionConfig from new Source and Sink API
Hello,

I'm trying to develop Flink connectors for NATS using the new FLIP-27 and FLIP-143 APIs. The scaffolding is more complicated than the old SourceFunction and SinkFunction, but not terrible. However, I can't figure out how to access the ExecutionConfig under these new APIs. In the old APIs this was possible through the RuntimeContext of AbstractRichFunction (which RichSourceFunction and RichSinkFunction extend).

The reason I would like this is that some interactions with external systems may be invalid under certain Flink job execution parameters. Consider a system like NATS, which supports acknowledging received messages. Ideally, the source connector would acknowledge all received messages during checkpointing. If I fail to acknowledge the delivered messages within a pre-configured amount of time, NATS resends them (which, in my case, is good for fault tolerance).

However, if a Flink job using these connectors has disabled checkpointing or set the interval too large, the connector will never acknowledge delivered messages in time, and NATS may redeliver them, causing duplicate data. I could avoid this if I could access the ExecutionConfig to check these parameters and throw an exception early.

I know that the SourceReaderContext gives me access to the Configuration, but that doesn't handle the case where the execution environment is set programmatically in the job definition rather than through configuration. Any ideas?

Thanks,
Chris
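A minimal Java sketch of the acknowledge-on-checkpoint pattern and the fail-early check described above, under stated assumptions: the class `CheckpointAckBuffer`, its methods, and the `ackWait` parameter (the redelivery timeout, called ack wait in NATS JetStream) are hypothetical, and the checkpoint interval is again assumed to be available as an `Optional<Duration>`. In a FLIP-27 `SourceReader`, `onSnapshot` would be called from `snapshotState(long checkpointId)` and `onCheckpointComplete` from `notifyCheckpointComplete(long checkpointId)`; the `ack` callback stands in for the NATS client's acknowledgement call.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.TreeMap;
import java.util.function.Consumer;

/**
 * Hypothetical helper for a NATS source reader: buffers delivered message IDs
 * per checkpoint and acknowledges them only after that checkpoint completes.
 */
final class CheckpointAckBuffer {
    // IDs of messages emitted since the last snapshot.
    private final List<String> sinceLastSnapshot = new ArrayList<>();
    // IDs waiting for their checkpoint to complete, ordered by checkpoint ID.
    private final TreeMap<Long, List<String>> pendingByCheckpoint = new TreeMap<>();
    // Stand-in for the NATS client's acknowledgement call (assumption).
    private final Consumer<String> ack;

    CheckpointAckBuffer(Consumer<String> ack) {
        this.ack = ack;
    }

    /**
     * Throws early if checkpointing is disabled or its interval is not shorter than
     * the redelivery timeout. A heuristic only: checkpoint duration also counts.
     */
    static void validate(Optional<Duration> checkpointInterval, Duration ackWait) {
        if (!checkpointInterval.isPresent()) {
            throw new IllegalStateException(
                    "Checkpointing is disabled: delivered messages would never be acknowledged");
        }
        Duration interval = checkpointInterval.get();
        if (interval.compareTo(ackWait) >= 0) {
            throw new IllegalStateException(
                    "Checkpoint interval " + interval + " is not shorter than ack wait " + ackWait);
        }
    }

    /** Records a message that has been handed to Flink. */
    void onMessage(String messageId) {
        sinceLastSnapshot.add(messageId);
    }

    /** Assigns everything received since the previous snapshot to this checkpoint. */
    void onSnapshot(long checkpointId) {
        pendingByCheckpoint.put(checkpointId, new ArrayList<>(sinceLastSnapshot));
        sinceLastSnapshot.clear();
    }

    /** Acks this checkpoint's messages and those of any earlier checkpoint still pending. */
    void onCheckpointComplete(long checkpointId) {
        NavigableMap<Long, List<String>> done = pendingByCheckpoint.headMap(checkpointId, true);
        for (List<String> ids : done.values()) {
            ids.forEach(ack);
        }
        done.clear(); // the view is backed by pendingByCheckpoint, so this removes the entries
    }
}
```

The interval check is deliberately conservative rather than exact: a message is acknowledged only after the checkpoint that follows it completes, so checkpoint duration also eats into the ack wait.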