Hi,

In a past project I was able to use Spring as a DI provider for Flink jobs. It saved me a lot of hassle when writing and composing jobs and process functions. I could use all of Spring's bean annotations, along with properties files managed by Spring, just as in a "normal" Spring app. The dependencies injected via Spring were not serialized/deserialized by Flink, which was exactly what I wanted to achieve: in some cases it is very hard, or maybe even impossible, to make 3rd-party classes serializable.
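To illustrate the serialization point, here is a minimal sketch in plain Java, with no Flink or Spring on the classpath. All class and method names are hypothetical stand-ins, not the code from that project. It assumes the function object is shipped with Java serialization, so a dependency held in a transient field is skipped and has to be populated on the worker after deserialization.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Plain-Java sketch; every name here is hypothetical.
class TransientInjectionSketch {

    // Stand-in for a 3rd-party class that is NOT Serializable.
    static class ThirdPartyClient {
        final String endpoint;

        ThirdPartyClient(String endpoint) {
            this.endpoint = endpoint;
        }
    }

    // Stand-in for a user function whose instance is shipped in serialized form.
    static class EnrichFunction implements Serializable {
        private static final long serialVersionUID = 1L;

        private final String configName;           // small, serializable setting
        private transient ThirdPartyClient client; // skipped by Java serialization

        EnrichFunction(String configName) {
            this.configName = configName;
        }

        // Stand-in for an open() hook: the place where a DI container
        // such as Spring could populate the field on the worker.
        void open() {
            client = new ThirdPartyClient("endpoint-for-" + configName);
        }

        boolean isInjected() {
            return client != null;
        }

        String endpoint() {
            return client.endpoint;
        }
    }

    // Serializes and deserializes the function, imitating shipping it to a worker.
    static EnrichFunction roundTrip(EnrichFunction fn) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(fn);
        }
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (EnrichFunction) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        EnrichFunction shipped = roundTrip(new EnrichFunction("prod"));
        System.out.println(shipped.isInjected()); // dependency did not travel with the object
        shipped.open();
        System.out.println(shipped.endpoint());   // dependency created locally
    }
}
```

Only the small configuration value travels with the function; the non-serializable client is created locally, which is why it never needs to be serializable.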
Things to highlight here:
1. I did it only for the Stream API; I think it could also work for the Table API, though.
2. I was loading the Spring context from the ProcessFunction::open method. Job parameters let me choose which Spring configuration to load. After that, all fields annotated with @Autowired were injected.
3. I was using standard @Configuration classes.

Issues:
1. Since I was using the operator's open method to load the context, the context is loaded several times, depending on the number of operators deployed on a particular Task Manager. This could be improved, however.
2. The important thing is that all your classes have to be "deployed" on every Task Manager/Job Manager in order to load them through DI. We achieved this by using what is called a "Job session" cluster, where our custom Flink Docker image was built to contain our job jar with all the dependencies it needs. Because of that, we were not able to use things like AWS EMR or Kinesis.

Cheers,
Krzysztof Chmielewski

On Tue, 9 Nov 2021 at 06:46, Thomas Weise <t...@apache.org> wrote:

> Hi,
>
> I was looking into a problem that requires a configurable type
> serializer for communication with a schema registry. The service
> endpoint can change, so I would not want to make it part of the
> serializer snapshot but rather resolve it at graph construction time
> (similar to how a Kafka bootstrap URL or JDBC connection URL would not
> be embedded into a checkpoint).
>
> TypeSerializer is instantiated via either TypeInformation or
> TypeSerializerSnapshot. While TypeInformation provides access to
> ExecutionConfig and therefore ability to access parameters from
> GlobalJobParameters that could be provided through the entry point,
> restoreSerializer requires the serializer to be constructed from the
> snapshot state alone.
>
> Ideally there would be a dependency injection mechanism for user code.
> Discussion in [1] indicated there isn't a direct solution. Has anyone
> come across a similar use case and found a way to work around this
> limitation? It might be possible to work with a configuration
> singleton that initializes from a file in a well known location, but
> that depends on the deployment environment and doesn't play nice with
> testing.
>
> Thanks,
> Thomas
>
> [1] https://lists.apache.org/thread/6qbr4b391dcfwxhcvdl066rpv86gpm5o