如果需要在flink使用spring的话, 需要在open方法加载applicationContext对象. 你这里需要在sink的open方法初始化spring上下文对象.
override def open(conf: Configuration): Unit = { super.open(conf) if (Option(SpringContextHolder.getApplicationContext).isEmpty) { SpringContextHolder.startupApplicationContext(SelectStockJob.getClass, profiles) } repository = SpringContextHolder.getBean(classOf[xxxRepository]) } wch...@163.com 发件人: 676360...@qq.com.INVALID 发送时间: 2022-04-18 14:28 收件人: user-zh@flink.apache.org 主题: flink发布集群端运行,如何在sink或source中使用springboot的bean 您好: 首先很感谢您能在百忙之中看到我的邮件。在使用flink框架过程中我遇到了一些问题,希望能得到您的解答。 我通过网上已有的资料进行学习,在本地环境将springboot框架与flink进行结合,并可以成功运行。但是当我将项目通过maven打包成jar包后,发布到flink集群端时,在自定义sink和source类中无法获取到springboot的ApplicationContext,所以我想问下针对此情况是否有解决方案。 下面是我代码的具体实现思路: 1.通过实现Springboot的CommandLineRunner的run方法来起到等同于main方法的作用 @Component @Slf4j public class InitRunner implements CommandLineRunner { @Autowired private Constant constant; @Override public void run(String... args) throws Exception { //初始化启动 log.error("初始化启动"); start(); } private void start() throws Exception { String actives = constant.getActive(); if (StrUtil.isNotBlank(actives)){ String[] active = actives.split(","); for (String act : active) { Class cls = Class.forName(constant.getProperty("streaming.tasks." + act + ".package")); AbstractStreamingTask streamingTask = (AbstractStreamingTask) cls .getConstructor(new Class[]{String.class,Constant.class}) .newInstance(new Object[]{act, constant}); System.err.println("1---"); //此方法为flink调用 streamingTask.streaming(); } } } } 2.自定义类实现ApplicationContextAware接口,获得ApplicationContext的值,并用static修饰,准备在自定义sink和source类中通过该值获得相关的bean @Component @Slf4j public class SpringApplicationContext implements ApplicationContextAware, Serializable { private static ApplicationContext applicationContext; @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { log.error("context先执行"); if (this.applicationContext == null) { synchronized (SpringApplicationContext.class){ if (this.applicationContext == null){ this.applicationContext = applicationContext; } } } } public static Object getBean(String name) { log.error("获取bean:"+name); return applicationContext.getBean(name); } } 3.在自定义sink类中,通过ApplicationContext获取我需要的bean @Slf4j public class MysqlSink extends RichSinkFunction<Test> implements BaseSink<Test>, Serializable { private BaseService baseService; private Constant constant; private String active; public MysqlSink(String active) { this.active = active; } @Override public void open(Configuration parameters) throws Exception { log.info("------open mysqlSink"); super.open(parameters); init(); } @Override public void invoke(Test value, Context context) throws Exception { log.info("------invoke mysqlSink"); if (value != null){ baseService.operate(value); } } @Override public SinkFunction<Test>[] getSinkFunction() { return new SinkFunction[]{this}; } private void init(){ if (constant == null){ constant = (Constant) SpringApplicationContext.getBean("constant"); } if (baseService == null){ baseService = (BaseService) SpringApplicationContext.getBean(constant.getProperty("streaming.tasks."+active+".sink.mysql.service")); } } } 经过我的测试,在发布到flink集群端并启动jar包时,applicationContext是可以获得正常的值得,但是在sink类中,值变为null。针对此种情况希望能从您那里获得相应解决方案,十分感谢。 676360...@qq.com