Hello,

I'm trying to develop Flink connectors to NATS using the new FLIP-27 and
FLIP-143 APIs. The scaffolding is more complicated than the old
SourceFunction and SinkFunction, but not terrible. However I can't figure
out how to access the ExecutionConfig under these new APIs. This was
possible in the old APIs by way of the RuntimeContext of the
AbstractRichFunction (which are extended by RichSourceFunction and
RichSinkFunction).

The reason I would like this is:  some interactions with external systems
may be invalid under certain Flink job execution parameters. Consider a
system like NATS which allows for acknowledgements of messages received. I
would ideally acknowledge all received messages by the source connector
during checkpointing. If I fail to acknowledge the delivered messages,
after a pre-configured amount of time, NATS would resend the message (which
is good in my case for fault tolerance).

However, if a Flink job using these connectors has disabled checkpointing
or made the interval too large, the connector will never acknowledge
delivered messages and the NATS system may send the message again and cause
duplicate data. I would be able to avoid this if I could access the
ExecutionConfig to check these parameters and throw early.

I know that the SourceReaderContext gives me access to the Configuration,
but that doesn't handle the case where the execution-environment is set
programatically in a job definition rather than through configuration. Any
ideas?

Thanks,
Chris

Reply via email to