回复: flink发布集群端运行,如何在sink或source中使用springboot的bean

2022-04-18 文章 wch...@163.com
如果需要在flink使用spring的话, 需要在open方法加载applicationContext对象. 
你这里需要在sink的open方法初始化spring上下文对象. 

  override def open(conf: Configuration): Unit = {
super.open(conf)
if (Option(SpringContextHolder.getApplicationContext).isEmpty) {
  SpringContextHolder.startupApplicationContext(SelectStockJob.getClass, 
profiles)
}
repository = SpringContextHolder.getBean(classOf[xxxRepository])
  }



wch...@163.com
 
发件人: 676360...@qq.com.INVALID
发送时间: 2022-04-18 14:28
收件人: user-zh@flink.apache.org
主题: flink发布集群端运行,如何在sink或source中使用springboot的bean
您好:
首先很感谢您能在百忙之中看到我的邮件。在使用flink框架过程中我遇到了一些问题,希望能得到您的解答。

我通过网上已有的资料进行学习,在本地环境将springboot框架与flink进行结合,并可以成功运行。但是当我将项目通过maven打包成jar包后,发布到flink集群端时,在自定义sink和source类中无法获取到springboot的ApplicationContext,所以我想问下针对此情况是否有解决方案。
下面是我代码的具体实现思路:
1.通过实现Springboot的CommandLineRunner的run方法来起到等同于main方法的作用
@Component
@Slf4j
public class InitRunner implements CommandLineRunner {
 
@Autowired
private Constant constant;
 
@Override
public void run(String... args) throws Exception {
//初始化启动
log.error("初始化启动");
start();
}
 
private void start() throws Exception {
String actives = constant.getActive();
if (StrUtil.isNotBlank(actives)){
String[] active = actives.split(",");
for (String act : active) {
Class cls = 
Class.forName(constant.getProperty("streaming.tasks." + act + ".package"));
AbstractStreamingTask streamingTask = (AbstractStreamingTask) 
cls
.getConstructor(new 
Class[]{String.class,Constant.class})
.newInstance(new Object[]{act, constant});
System.err.println("1---");
//此方法为flink调用
streamingTask.streaming();
}
}
}
}

2.自定义类实现ApplicationContextAware接口,获得ApplicationContext的值,并用static修饰,准备在自定义sink和source类中通过该值获得相关的bean
 
@Component
@Slf4j
public class SpringApplicationContext implements ApplicationContextAware, 
Serializable {
 
private static ApplicationContext applicationContext;
 
@Override
public void setApplicationContext(ApplicationContext applicationContext) 
throws BeansException {
log.error("context先执行");
if (this.applicationContext == null) {
synchronized (SpringApplicationContext.class){
if (this.applicationContext == null){
this.applicationContext = applicationContext;
}
}
}
}
 
 
public static Object getBean(String name) {
log.error("获取bean:"+name);
return applicationContext.getBean(name);
}
}
3.在自定义sink类中,通过ApplicationContext获取我需要的bean
 
@Slf4j
public class MysqlSink extends RichSinkFunction implements 
BaseSink, Serializable {
 
private BaseService baseService;
 
private Constant constant;
 
private String active;
 
public MysqlSink(String active) {
this.active = active;
}
 
@Override
public void open(Configuration parameters) throws Exception {
log.info("--open mysqlSink");
super.open(parameters);
init();
}
 
@Override
public void invoke(Test value, Context context) throws Exception {
log.info("--invoke mysqlSink");
if (value != null){
baseService.operate(value);
}
}
 
@Override
public SinkFunction[] getSinkFunction() {
return new SinkFunction[]{this};
}
 
private void init(){
if (constant == null){
constant = (Constant) SpringApplicationContext.getBean("constant");
}
if (baseService == null){
baseService = (BaseService) 
SpringApplicationContext.getBean(constant.getProperty("streaming.tasks."+active+".sink.mysql.service"));
}
}
}

经过我的测试,在发布到flink集群端并启动jar包时,applicationContext是可以获得正常的值得,但是在sink类中,值变为null。针对此种情况希望能从您那里获得相应解决方案,十分感谢。
 
 
676360...@qq.com


flink发布集群端运行,如何在sink或source中使用springboot的bean

2022-04-17 文章 676360...@qq.com.INVALID
您好:
首先很感谢您能在百忙之中看到我的邮件。在使用flink框架过程中我遇到了一些问题,希望能得到您的解答。

我通过网上已有的资料进行学习,在本地环境将springboot框架与flink进行结合,并可以成功运行。但是当我将项目通过maven打包成jar包后,发布到flink集群端时,在自定义sink和source类中无法获取到springboot的ApplicationContext,所以我想问下针对此情况是否有解决方案。
下面是我代码的具体实现思路:
1.通过实现Springboot的CommandLineRunner的run方法来起到等同于main方法的作用
@Component
@Slf4j
public class InitRunner implements CommandLineRunner {

@Autowired
private Constant constant;

@Override
public void run(String... args) throws Exception {
//初始化启动
log.error("初始化启动");
start();
}

private void start() throws Exception {
String actives = constant.getActive();
if (StrUtil.isNotBlank(actives)){
String[] active = actives.split(",");
for (String act : active) {
Class cls = 
Class.forName(constant.getProperty("streaming.tasks." + act + ".package"));
AbstractStreamingTask streamingTask = (AbstractStreamingTask) 
cls
.getConstructor(new 
Class[]{String.class,Constant.class})
.newInstance(new Object[]{act, constant});
System.err.println("1---");
//此方法为flink调用
streamingTask.streaming();
}
}
}
}

2.自定义类实现ApplicationContextAware接口,获得ApplicationContext的值,并用static修饰,准备在自定义sink和source类中通过该值获得相关的bean

@Component
@Slf4j
public class SpringApplicationContext implements ApplicationContextAware, 
Serializable {

private static ApplicationContext applicationContext;

@Override
public void setApplicationContext(ApplicationContext applicationContext) 
throws BeansException {
log.error("context先执行");
if (this.applicationContext == null) {
synchronized (SpringApplicationContext.class){
if (this.applicationContext == null){
this.applicationContext = applicationContext;
}
}
}
}


public static Object getBean(String name) {
log.error("获取bean:"+name);
return applicationContext.getBean(name);
}
}
3.在自定义sink类中,通过ApplicationContext获取我需要的bean

@Slf4j
public class MysqlSink extends RichSinkFunction implements 
BaseSink, Serializable {

private BaseService baseService;

private Constant constant;

private String active;

public MysqlSink(String active) {
this.active = active;
}

@Override
public void open(Configuration parameters) throws Exception {
log.info("--open mysqlSink");
super.open(parameters);
init();
}

@Override
public void invoke(Test value, Context context) throws Exception {
log.info("--invoke mysqlSink");
if (value != null){
baseService.operate(value);
}
}

@Override
public SinkFunction[] getSinkFunction() {
return new SinkFunction[]{this};
}

private void init(){
if (constant == null){
constant = (Constant) SpringApplicationContext.getBean("constant");
}
if (baseService == null){
baseService = (BaseService) 
SpringApplicationContext.getBean(constant.getProperty("streaming.tasks."+active+".sink.mysql.service"));
}
}
}

经过我的测试,在发布到flink集群端并启动jar包时,applicationContext是可以获得正常的值得,但是在sink类中,值变为null。针对此种情况希望能从您那里获得相应解决方案,十分感谢。


676360...@qq.com