您好:
    首先很感谢您能在百忙之中看到我的邮件。在使用flink框架过程中我遇到了一些问题,希望能得到您的解答。
    
我通过网上已有的资料进行学习,在本地环境将springboot框架与flink进行结合,并可以成功运行。但是当我将项目通过maven打包成jar包后,发布到flink集群端时,在自定义sink和source类中无法获取到springboot的ApplicationContext,所以我想问下针对此情况是否有解决方案。
    下面是我代码的具体实现思路:
    1.通过实现Springboot的CommandLineRunner的run方法来起到等同于main方法的作用
@Component
@Slf4j
public class InitRunner implements CommandLineRunner {

    @Autowired
    private Constant constant;

    @Override
    public void run(String... args) throws Exception {
        //初始化启动
        log.error("初始化启动");
        start();
    }

    private void start() throws Exception {
        String actives = constant.getActive();
        if (StrUtil.isNotBlank(actives)){
            String[] active = actives.split(",");
            for (String act : active) {
                Class cls = 
Class.forName(constant.getProperty("streaming.tasks." + act + ".package"));
                AbstractStreamingTask streamingTask = (AbstractStreamingTask) 
cls
                        .getConstructor(new 
Class[]{String.class,Constant.class})
                        .newInstance(new Object[]{act, constant});
                System.err.println("1---");
                //此方法为flink调用
                streamingTask.streaming();
            }
        }
    }
}
    
2.自定义类实现ApplicationContextAware接口,获得ApplicationContext的值,并用static修饰,准备在自定义sink和source类中通过该值获得相关的bean

@Component
@Slf4j
public class SpringApplicationContext implements ApplicationContextAware, 
Serializable {

    private static ApplicationContext applicationContext;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) 
throws BeansException {
        log.error("context先执行");
        if (this.applicationContext == null) {
            synchronized (SpringApplicationContext.class){
                if (this.applicationContext == null){
                    this.applicationContext = applicationContext;
                }
            }
        }
    }


    public static Object getBean(String name) {
        log.error("获取bean:"+name);
        return applicationContext.getBean(name);
    }
}
    3.在自定义sink类中,通过ApplicationContext获取我需要的bean

@Slf4j
public class MysqlSink extends RichSinkFunction<Test> implements 
BaseSink<Test>, Serializable {

    private BaseService baseService;

    private Constant constant;

    private String active;

    public MysqlSink(String active) {
        this.active = active;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        log.info("------open mysqlSink");
        super.open(parameters);
        init();
    }

    @Override
    public void invoke(Test value, Context context) throws Exception {
        log.info("------invoke mysqlSink");
        if (value != null){
            baseService.operate(value);
        }
    }

    @Override
    public SinkFunction<Test>[] getSinkFunction() {
        return new SinkFunction[]{this};
    }

    private void init(){
        if (constant == null){
            constant = (Constant) SpringApplicationContext.getBean("constant");
        }
        if (baseService == null){
            baseService = (BaseService) 
SpringApplicationContext.getBean(constant.getProperty("streaming.tasks."+active+".sink.mysql.service"));
        }
    }
}
    
经过我的测试,在发布到flink集群端并启动jar包时,applicationContext是可以获得正常的值得,但是在sink类中,值变为null。针对此种情况希望能从您那里获得相应解决方案,十分感谢。


676360...@qq.com

回复