感谢回复,问题已解决。
解决方式:
参照官网的一个例子将状态的获取放在 processElement 内部。
private val eFenceMapStateDesc = new MapStateDescriptor[String,
Boolean]("carEfenceState", classOf[String], classOf[Boolean])
private val DbIdMapStateDesc = new MapStateDescriptor[String,
Long]("eFenceCarDbIdState", classOf[String], clas
你虽然调整了实现方法的顺序,但是这个程序执行顺序还是先执行processElement(),后执行processBroadcastElement()
发件人: chaos
发送时间: 2021年3月10日 14:29
收件人: user-zh@flink.apache.org
主题: Re: 回复:MapState 无法更新问题
主要代码如下:
class getRule extends KeyedBroadcastProcessFunction[String,
KafkaStreamSource, List[Rule], KafkaStreamSource
人: chaos
发送时间: 2021年3月10日 14:29
收件人: user-zh@flink.apache.org
主题: Re: 回复:MapState 无法更新问题
主要代码如下:
class getRule extends KeyedBroadcastProcessFunction[String,
KafkaStreamSource, List[Rule], KafkaStreamSource] {
private var carEfenceState: MapState[String, Boolean
你好,
主要代码如下:
class getRule extends KeyedBroadcastProcessFunction[String,
KafkaStreamSource, List[Rule], KafkaStreamSource] {
private var carEfenceState: MapState[String, Boolean] = _
override def open(parameters: Configuration): Unit = {
carEfenceState = g
可以贴个完整的代码吗
-- 原始邮件 --
发件人: chaos http://apache-flink.147419.n8.nabble.com/