Re: Access ExecutionConfig from new Source and Sink API

2023-04-03 Thread Hang Ruan
Hi Christopher,

I think there is already a discussion about exposing the ExecutionConfig to
the new Sink API in FLIP-287 [1]. What we actually need is a read-only
ExecutionConfig for both the Source API and the Sink API.
Maybe we could continue discussing this topic under FLIP-287.

Best,
Hang

[1]
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=240880853

On Mon, Apr 3, 2023 at 14:06, Yufan Sheng wrote:

> I agree with you. It would be quite useful to access the ExecutionConfig
> in the Source API. When I developed flink-connector-pulsar, the only
> configuration I couldn't access was the checkpoint configuration, which is
> defined in ExecutionConfig. With it, I could switch the behavior
> automatically depending on whether checkpointing is enabled. Without it, I
> have to add more custom configuration options for the Pulsar source.
>
> On Mon, Apr 3, 2023 at 1:47 PM Christopher Lee 
> wrote:
> >
> > Hello,
> >
> > I'm trying to develop Flink connectors to NATS using the new FLIP-27 and
> > FLIP-143 APIs. The scaffolding is more complicated than the old
> > SourceFunction and SinkFunction, but not terrible. However, I can't figure
> > out how to access the ExecutionConfig under these new APIs. This was
> > possible in the old APIs by way of the RuntimeContext of the
> > AbstractRichFunction (which is extended by RichSourceFunction and
> > RichSinkFunction).
> >
> > The reason I would like this is: some interactions with external
> > systems may be invalid under certain Flink job execution parameters.
> > Consider a system like NATS which allows for acknowledgements of messages
> > received. I would ideally acknowledge all received messages by the source
> > connector during checkpointing. If I fail to acknowledge the delivered
> > messages, after a pre-configured amount of time, NATS would resend the
> > message (which is good in my case for fault tolerance).
> >
> > However, if a Flink job using these connectors has disabled
> > checkpointing or made the interval too large, the connector will never
> > acknowledge delivered messages and the NATS system may send the message
> > again and cause duplicate data. I would be able to avoid this if I could
> > access the ExecutionConfig to check these parameters and throw early.
> >
> > I know that the SourceReaderContext gives me access to the
> > Configuration, but that doesn't handle the case where the
> > execution environment is configured programmatically in a job definition
> > rather than through configuration. Any ideas?
> >
> > Thanks,
> > Chris
>


Re: Access ExecutionConfig from new Source and Sink API

2023-04-03 Thread Yufan Sheng
I agree with you. It would be quite useful to access the ExecutionConfig
in the Source API. When I developed flink-connector-pulsar, the only
configuration I couldn't access was the checkpoint configuration, which is
defined in ExecutionConfig. With it, I could switch the behavior
automatically depending on whether checkpointing is enabled. Without it, I
have to add more custom configuration options for the Pulsar source.

On Mon, Apr 3, 2023 at 1:47 PM Christopher Lee  wrote:
>
> Hello,
>
> I'm trying to develop Flink connectors to NATS using the new FLIP-27 and
> FLIP-143 APIs. The scaffolding is more complicated than the old
> SourceFunction and SinkFunction, but not terrible. However, I can't figure
> out how to access the ExecutionConfig under these new APIs. This was
> possible in the old APIs by way of the RuntimeContext of the
> AbstractRichFunction (which is extended by RichSourceFunction and
> RichSinkFunction).
>
> The reason I would like this is: some interactions with external systems may
> be invalid under certain Flink job execution parameters. Consider a system 
> like NATS which allows for acknowledgements of messages received. I would 
> ideally acknowledge all received messages by the source connector during 
> checkpointing. If I fail to acknowledge the delivered messages, after a 
> pre-configured amount of time, NATS would resend the message (which is good 
> in my case for fault tolerance).
>
> However, if a Flink job using these connectors has disabled checkpointing or 
> made the interval too large, the connector will never acknowledge delivered 
> messages and the NATS system may send the message again and cause duplicate 
> data. I would be able to avoid this if I could access the ExecutionConfig to 
> check these parameters and throw early.
>
> I know that the SourceReaderContext gives me access to the Configuration,
> but that doesn't handle the case where the execution environment is
> configured programmatically in a job definition rather than through
> configuration. Any ideas?
>
> Thanks,
> Chris


Access ExecutionConfig from new Source and Sink API

2023-04-02 Thread Christopher Lee
Hello,

I'm trying to develop Flink connectors to NATS using the new FLIP-27 and
FLIP-143 APIs. The scaffolding is more complicated than the old
SourceFunction and SinkFunction, but not terrible. However, I can't figure
out how to access the ExecutionConfig under these new APIs. This was
possible in the old APIs by way of the RuntimeContext of the
AbstractRichFunction (which is extended by RichSourceFunction and
RichSinkFunction).

The reason I would like this is: some interactions with external systems
may be invalid under certain Flink job execution parameters. Consider a
system like NATS which allows for acknowledgements of messages received. I
would ideally acknowledge all received messages by the source connector
during checkpointing. If I fail to acknowledge the delivered messages,
after a pre-configured amount of time, NATS would resend the message (which
is good in my case for fault tolerance).

However, if a Flink job using these connectors has disabled checkpointing
or made the interval too large, the connector will never acknowledge
delivered messages and the NATS system may send the message again and cause
duplicate data. I would be able to avoid this if I could access the
ExecutionConfig to check these parameters and throw early.
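
For illustration, the "check these parameters and throw early" idea could
look roughly like the sketch below. It is plain Java with no Flink or NATS
dependency, and every name in it (AckCheckpointGuard, validate,
redeliveryTimeout) is hypothetical. How a FLIP-27 source would actually
obtain the checkpoint interval is exactly the open question in this thread,
so here it is simply passed in, with an empty Optional standing for
"checkpointing disabled".

```java
import java.time.Duration;
import java.util.Optional;

// Hypothetical helper, not part of Flink or any NATS client.
class AckCheckpointGuard {

    /**
     * Fails fast when messages could be redelivered before a checkpoint acknowledges them.
     *
     * @param checkpointInterval empty when checkpointing is disabled (assumed input)
     * @param redeliveryTimeout  how long the external system waits for an
     *                           acknowledgement before resending (assumed name)
     */
    static void validate(Optional<Duration> checkpointInterval, Duration redeliveryTimeout) {
        if (!checkpointInterval.isPresent()) {
            throw new IllegalStateException(
                    "Checkpointing is disabled, so delivered messages would never be acknowledged");
        }
        Duration interval = checkpointInterval.get();
        // An interval at or above the redelivery timeout means the ack arrives too late.
        if (interval.compareTo(redeliveryTimeout) >= 0) {
            throw new IllegalStateException(
                    "Checkpoint interval " + interval
                            + " is not shorter than the redelivery timeout " + redeliveryTimeout);
        }
    }

    public static void main(String[] args) {
        // A 10 s checkpoint interval with a 30 s redelivery timeout passes the check.
        validate(Optional.of(Duration.ofSeconds(10)), Duration.ofSeconds(30));
        System.out.println("configuration accepted");
    }
}
```

A connector would presumably run such a check once when the source reader
is created, so that a misconfigured job fails at startup instead of
silently producing duplicates.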

I know that the SourceReaderContext gives me access to the Configuration,
but that doesn't handle the case where the execution environment is
configured programmatically in a job definition rather than through
configuration. Any ideas?

Thanks,
Chris