Hi,
In a past project I was able to use Spring as a DI provider for Flink
jobs. It actually saved me a lot of hassle while writing/composing jobs and
process functions.
I was able to use all of Spring's bean annotations along with properties
files managed by Spring, as if it were a "normal" Spring app. The
dependencies that I was injecting via Spring were not
serialized/deserialized by Flink, which was actually something I wanted to
achieve. In some cases it is very hard, or maybe even impossible, to make
some 3rd party classes serializable.

Things to highlight here:
1. I did it only for the Stream API, though I think it could also work for
the Table API.
2. I was loading a Spring context from the ProcessFunction::open method.
Via job parameters I was able to customize which Spring configuration I
wanted to load.
After doing this, all fields annotated with @Autowired were injected (see
the sketch right after this list).
3. I was using standard @Configuration classes.
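
To make that more concrete, below is a minimal sketch of the pattern (not
our actual production code). The names EnrichingProcessFunction and
GreetingService, and the way the Spring config class name is passed into
the function, are just illustrative assumptions:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

public class EnrichingProcessFunction extends ProcessFunction<String, String> {

    // Only this String goes through Flink serialization; it can come from
    // job parameters at graph construction time.
    private final String springConfigClassName;

    // Hypothetical bean defined in the @Configuration class, e.g.:
    // public interface GreetingService { String greet(String name); }
    // transient: Flink never serializes it, Spring injects it in open().
    @Autowired
    private transient GreetingService greetingService;

    public EnrichingProcessFunction(String springConfigClassName) {
        this.springConfigClassName = springConfigClassName;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        AnnotationConfigApplicationContext context =
            new AnnotationConfigApplicationContext(Class.forName(springConfigClassName));
        // Inject all @Autowired fields of this already-constructed function instance.
        context.getAutowireCapableBeanFactory().autowireBean(this);
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) {
        out.collect(greetingService.greet(value));
    }
}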

Issues:
1. Since I was using the operator::open method to load the context, the
context will be loaded a few times, depending on the number of operators
deployed on a particular Task Manager. This however could be improved (see
the sketch below).
2. The important thing here was that all your classes have to be "deployed"
on every Task Manager/Job Manager in order to load them through DI.
We achieved this by using what is called a "Job session" cluster, where our
custom Flink Docker image was built so that it contains our job jar with
all needed dependencies.

Because of that, we were not able to use things like AWS EMR or Kinesis.
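
Regarding issue 1, one possible improvement (just an idea, not something we
had in place) would be a small per-JVM holder, so that all operators
deployed on the same Task Manager share a single context per configuration
class:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

// Hypothetical per-JVM cache: each configuration class gets one shared
// ApplicationContext instead of one per operator instance.
public final class SpringContextHolder {

    private static final Map<String, ApplicationContext> CONTEXTS =
        new ConcurrentHashMap<>();

    private SpringContextHolder() {
    }

    public static ApplicationContext getOrCreate(String configClassName) {
        return CONTEXTS.computeIfAbsent(configClassName, className -> {
            try {
                return new AnnotationConfigApplicationContext(Class.forName(className));
            } catch (ClassNotFoundException e) {
                throw new IllegalArgumentException(
                    "Spring config class not found: " + className, e);
            }
        });
    }
}

The open() method from the first sketch would then call
SpringContextHolder.getOrCreate(springConfigClassName) instead of creating
a new AnnotationConfigApplicationContext every time.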

Cheers,
Krzysztof Chmielewski

On Tue, 9 Nov 2021 at 06:46, Thomas Weise <t...@apache.org> wrote:

> Hi,
>
> I was looking into a problem that requires a configurable type
> serializer for communication with a schema registry. The service
> endpoint can change, so I would not want to make it part of the
> serializer snapshot but rather resolve it at graph construction time
> (similar to how a Kafka bootstrap URL or JDBC connection URL would not
> be embedded into a checkpoint).
>
> TypeSerializer is instantiated via either TypeInformation or
> TypeSerializerSnapshot. While TypeInformation provides access to
> ExecutionConfig and therefore ability to access parameters from
> GlobalJobParameters that could be provided through the entry point,
> restoreSerializer requires the serializer to be constructed from the
> snapshot state alone.
>
> Ideally there would be a dependency injection mechanism for user code.
> Discussion in [1] indicated there isn't a direct solution. Has anyone
> come across a similar use case and found a way to work around this
> limitation? It might be possible to work with a configuration
> singleton that initializes from a file in a well known location, but
> that depends on the deployment environment and doesn't play nice with
> testing.
>
> Thanks,
> Thomas
>
> [1] https://lists.apache.org/thread/6qbr4b391dcfwxhcvdl066rpv86gpm5o
>
