Hi Christopher,

I think there is already a discussion about the ExecutionConfig for the new
Sink API in FLIP-287 [1]. What we actually need is a read-only ExecutionConfig
for both the Source API and the Sink API.
Maybe we could continue discussing this topic under FLIP-287.

Best,
Hang

[1]
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=240880853

Yufan Sheng <syh...@gmail.com> wrote on Mon, Apr 3, 2023 at 14:06:

> I agree with you. It would be quite useful to access the ExecutionConfig in
> the Source API. When I developed flink-connector-pulsar, the only
> configuration I couldn't access was the checkpoint configuration, which is
> defined in ExecutionConfig. With it, I could switch the behavior
> automatically based on whether checkpointing is enabled. Without it, I have
> to add more custom configuration options for the Pulsar Source.
>
> On Mon, Apr 3, 2023 at 1:47 PM Christopher Lee <chrisssle...@gmail.com>
> wrote:
> >
> > Hello,
> >
> > I'm trying to develop Flink connectors to NATS using the new FLIP-27 and
> > FLIP-143 APIs. The scaffolding is more complicated than the old
> > SourceFunction and SinkFunction, but not terrible. However, I can't figure
> > out how to access the ExecutionConfig under these new APIs. This was
> > possible in the old APIs by way of the RuntimeContext of the
> > AbstractRichFunction (which is extended by RichSourceFunction and
> > RichSinkFunction).
> >
> > The reason I would like this is: some interactions with external
> > systems may be invalid under certain Flink job execution parameters.
> > Consider a system like NATS which allows for acknowledgements of messages
> > received. I would ideally acknowledge all received messages by the source
> > connector during checkpointing. If I fail to acknowledge the delivered
> > messages, after a pre-configured amount of time, NATS would resend the
> > message (which is good in my case for fault tolerance).
> >
> > However, if a Flink job using these connectors has disabled
> > checkpointing or made the interval too large, the connector will never
> > acknowledge delivered messages and the NATS system may send the message
> > again and cause duplicate data. I would be able to avoid this if I could
> > access the ExecutionConfig to check these parameters and throw early.
> >
> > I know that the SourceReaderContext gives me access to the
> > Configuration, but that doesn't handle the case where the
> > execution-environment is set programmatically in a job definition rather
> > than through configuration. Any ideas?
> >
> > Thanks,
> > Chris
>
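
For illustration, the fail-fast check Chris describes could look roughly like
the sketch below. It is plain Java with no Flink dependency. The class name
AckCheckpointValidator, the method validate, and the ackWait parameter are
hypothetical names chosen for this example. It also assumes the checkpoint
interval has already been obtained somehow, which is exactly the part that
the thread says the new Source API does not expose.

```java
import java.time.Duration;
import java.util.Optional;

/**
 * Hypothetical helper, not part of Flink or any NATS client.
 * Rejects job settings under which un-acknowledged messages would be
 * redelivered before a checkpoint has had a chance to acknowledge them.
 */
final class AckCheckpointValidator {

    private AckCheckpointValidator() {}

    /**
     * @param checkpointInterval empty if checkpointing is disabled (assumed
     *                           to be supplied by the caller)
     * @param ackWait            how long the external system waits for an
     *                           acknowledgement before resending a message
     */
    static void validate(Optional<Duration> checkpointInterval, Duration ackWait) {
        if (!checkpointInterval.isPresent()) {
            // Without checkpoints the source would never acknowledge anything.
            throw new IllegalStateException(
                    "Checkpointing is disabled; messages would never be acknowledged");
        }
        Duration interval = checkpointInterval.get();
        if (interval.compareTo(ackWait) >= 0) {
            // The acknowledgement would arrive only after redelivery has started.
            throw new IllegalStateException(
                    "Checkpoint interval " + interval
                            + " is not shorter than the ack wait " + ackWait);
        }
    }
}
```

A connector could call such a check once while the source is being created,
so that a misconfigured job fails at startup instead of producing duplicates
later.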
