你好, 主要代码如下:
/**
 * Keyed broadcast process function that checks every incoming `KafkaStreamSource`
 * element against the currently broadcast list of `Rule`s and keeps one boolean flag
 * per `kafkaSource.vno + rule.id` combination in keyed map state.
 *
 * The broadcast side stores the most recent rule list under the single key `"rules"`
 * in the broadcast state described by `ruleStateDescriptor` (defined outside this
 * class). The keyed side flips the per-rule flag: when no flag is set it runs the
 * (elided) business logic and sets the flag; when the flag is set it runs the (elided)
 * business logic and removes the entry again.
 *
 * NOTE(review): the `...` placeholders below are elisions from the original post; the
 * real business logic (including any `collector.collect` calls) is not shown here.
 */
class getRule extends KeyedBroadcastProcessFunction[String, KafkaStreamSource, List[Rule], KafkaStreamSource] {

  /** Per-key map from `vno + rule.id` to the flag set by the first branch below. */
  private var carEfenceState: MapState[String, Boolean] = _

  /** Registers the keyed map state; called once before any element is processed. */
  override def open(parameters: Configuration): Unit = {
    carEfenceState = getRuntimeContext.getMapState(
      new MapStateDescriptor[String, Boolean]("carEfenceState", classOf[String], classOf[Boolean]))
  }

  /**
   * Replaces the broadcast rule list with the newly received one.
   *
   * @param in2       the complete rule list that was broadcast
   * @param context   read-write context giving access to the broadcast state
   * @param collector output collector (not used by this method)
   */
  override def processBroadcastElement(
      in2: List[Rule],
      context: KeyedBroadcastProcessFunction[String, KafkaStreamSource, List[Rule], KafkaStreamSource]#Context,
      collector: Collector[KafkaStreamSource]): Unit = {
    context.getBroadcastState(ruleStateDescriptor).put("rules", in2)
  }

  /**
   * Evaluates one keyed element against every rule currently held in broadcast state.
   *
   * @param kafkaSource     the keyed input element
   * @param readOnlyContext read-only context giving access to the broadcast state
   * @param collector       output collector (its use, if any, is in the elided logic)
   */
  override def processElement(
      kafkaSource: KafkaStreamSource,
      readOnlyContext: KeyedBroadcastProcessFunction[String, KafkaStreamSource, List[Rule], KafkaStreamSource]#ReadOnlyContext,
      collector: Collector[KafkaStreamSource]): Unit = {
    val ruleIterator = readOnlyContext.getBroadcastState(ruleStateDescriptor).immutableEntries().iterator()
    while (ruleIterator.hasNext) {
      val ruleMap: Map.Entry[String, List[Rule]] = ruleIterator.next()
      val ruleList: List[Rule] = ruleMap.getValue
      for (rule <- ruleList) {
        val mapKey = kafkaSource.vno + rule.id
        // A missing entry means "flag not set". Ask the state explicitly instead of
        // comparing a Scala Boolean with null: that comparison is always true, and the
        // original only worked because a null result is unboxed to false.
        val currentState = carEfenceState.contains(mapKey) && carEfenceState.get(mapKey)
        // Business logic
        if (!currentState) {
          ...
          carEfenceState.put(mapKey, true)
          ...
        } else {
          ...
          carEfenceState.remove(mapKey)
          ...
        }
      }
    }
  }
}
-- Sent from: http://apache-flink.147419.n8.nabble.com/