Hi Christopher,

I think there is already a discussion about exposing the ExecutionConfig to the new Sink API in FLIP-287 [1]. What we actually need is a read-only ExecutionConfig for both the Source API and the Sink API. Maybe we could continue discussing this topic under FLIP-287.

Best,
Hang

[1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=240880853

Yufan Sheng <syh...@gmail.com> wrote on Mon, Apr 3, 2023 at 14:06:

> I agree with you. It's quite useful to access the ExecutionConfig in the
> Source API. While developing flink-connector-pulsar, the only
> configuration I can't access is the checkpoint configuration, which is
> defined in ExecutionConfig. I can't switch the behavior automatically
> based on the checkpoint switch, so I have to add more custom
> configuration options for the Pulsar Source.
>
> On Mon, Apr 3, 2023 at 1:47 PM Christopher Lee <chrisssle...@gmail.com>
> wrote:
> >
> > Hello,
> >
> > I'm trying to develop Flink connectors to NATS using the new FLIP-27
> > and FLIP-143 APIs. The scaffolding is more complicated than the old
> > SourceFunction and SinkFunction, but not terrible. However, I can't
> > figure out how to access the ExecutionConfig under these new APIs.
> > This was possible in the old APIs by way of the RuntimeContext of the
> > AbstractRichFunction (which is extended by RichSourceFunction and
> > RichSinkFunction).
> >
> > The reason I would like this is: some interactions with external
> > systems may be invalid under certain Flink job execution parameters.
> > Consider a system like NATS, which allows for acknowledgements of
> > messages received. I would ideally acknowledge all messages received
> > by the source connector during checkpointing. If I fail to acknowledge
> > the delivered messages, after a pre-configured amount of time, NATS
> > would resend the message (which is good in my case for fault
> > tolerance).
> >
> > However, if a Flink job using these connectors has disabled
> > checkpointing or made the interval too large, the connector will never
> > acknowledge delivered messages, and the NATS system may send the
> > message again and cause duplicate data. I would be able to avoid this
> > if I could access the ExecutionConfig to check these parameters and
> > throw early.
> >
> > I know that the SourceReaderContext gives me access to the
> > Configuration, but that doesn't handle the case where the execution
> > environment is set programmatically in a job definition rather than
> > through configuration. Any ideas?
> >
> > Thanks,
> > Chris
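
For reference, the "check these parameters and throw early" idea from Chris's mail could look roughly like the sketch below. It is only an illustration under stated assumptions: the class `AckCheckpointGuard`, its `validate` method, and the `ackWait` parameter are hypothetical names, and a plain `Map` stands in for whatever settings the connector can actually read. The key `execution.checkpointing.interval` is Flink's configuration key for the checkpoint interval. The sketch does not solve the problem raised in the thread: an interval set programmatically on the execution environment would still be missing from such a map.

```java
import java.time.Duration;
import java.util.Map;

// Hypothetical helper, not part of Flink or any connector.
final class AckCheckpointGuard {

    // Flink's configuration key for the checkpoint interval.
    static final String CHECKPOINT_INTERVAL_KEY = "execution.checkpointing.interval";

    private AckCheckpointGuard() {}

    /**
     * Fails fast when acknowledgements tied to checkpoints could not arrive
     * before the external system redelivers.
     *
     * @param settings stand-in for the job settings visible to the connector
     * @param ackWait  assumed redelivery timeout of the external system
     */
    static void validate(Map<String, Duration> settings, Duration ackWait) {
        Duration interval = settings.get(CHECKPOINT_INTERVAL_KEY);

        // A missing or non-positive interval is treated as "checkpointing disabled".
        if (interval == null || interval.isZero() || interval.isNegative()) {
            throw new IllegalStateException(
                    "Checkpointing is disabled, so messages would never be acknowledged.");
        }

        // An interval at or above the redelivery timeout risks duplicate deliveries.
        if (interval.compareTo(ackWait) >= 0) {
            throw new IllegalStateException(
                    "Checkpoint interval " + interval
                            + " is not shorter than the acknowledgement timeout " + ackWait + ".");
        }
    }
}
```

A connector would call something like `AckCheckpointGuard.validate(settings, Duration.ofSeconds(30))` once during source creation, so that a misconfigured job fails at startup rather than producing duplicates later.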