你好,

主要代码如下:

/**
 * Keyed broadcast process function that evaluates every incoming
 * `KafkaStreamSource` record against the rule lists held in broadcast state
 * and keeps one Boolean flag per (record `vno`, rule `id`) pair in keyed map state.
 *
 * Broadcast side: each received `List[Rule]` overwrites the entry stored under
 * the fixed key `"rules"` in the broadcast state of `ruleStateDescriptor`
 * (defined outside this class).
 *
 * Keyed side: for every rule in every broadcast-state entry, the flag for the
 * record/rule pair is read; when it is unset the flag is set to `true`,
 * otherwise the entry is removed. The surrounding business logic is elided
 * (`...`) in this snippet.
 */
class getRule
    extends KeyedBroadcastProcessFunction[String, KafkaStreamSource, List[Rule], KafkaStreamSource] {

  // Keyed map state: "<vno><rule id>" -> flag. Presumably tracks whether the
  // vehicle is currently inside the rule's fence -- TODO confirm with the author.
  private var carEfenceState: MapState[String, Boolean] = _

  /** Registers the keyed map state once the runtime context is available. */
  override def open(parameters: Configuration): Unit = {
    val descriptor =
      new MapStateDescriptor[String, Boolean]("carEfenceState", classOf[String], classOf[Boolean])
    carEfenceState = getRuntimeContext.getMapState(descriptor)
  }

  /**
   * Stores the latest broadcast rule list under the key `"rules"`, replacing
   * whatever list was stored there before.
   *
   * @param in2       the rule list received on the broadcast stream
   * @param context   gives write access to the broadcast state
   * @param collector output collector (unused here)
   */
  override def processBroadcastElement(
      in2: List[Rule],
      context: KeyedBroadcastProcessFunction[String, KafkaStreamSource, List[Rule], KafkaStreamSource]#Context,
      collector: Collector[KafkaStreamSource]): Unit = {
    context.getBroadcastState(ruleStateDescriptor).put("rules", in2)
  }

  /**
   * Checks one keyed record against every rule currently in broadcast state and
   * toggles the per-(record, rule) flag in `carEfenceState`.
   *
   * @param kafkaSource     the incoming keyed record
   * @param readOnlyContext gives read-only access to the broadcast state
   * @param collector       output collector (only used by the elided logic, if at all)
   */
  override def processElement(
      kafkaSource: KafkaStreamSource,
      readOnlyContext: KeyedBroadcastProcessFunction[String, KafkaStreamSource, List[Rule], KafkaStreamSource]#ReadOnlyContext,
      collector: Collector[KafkaStreamSource]): Unit = {

    val ruleIterator =
      readOnlyContext.getBroadcastState(ruleStateDescriptor).immutableEntries().iterator()

    while (ruleIterator.hasNext) {
      val ruleEntry: Map.Entry[String, List[Rule]] = ruleIterator.next()
      val ruleList: List[Rule] = ruleEntry.getValue

      for (rule <- ruleList) {
        // NOTE(review): plain concatenation without a separator could make two
        // different (vno, id) pairs share a key -- verify against the value formats.
        val mapKey = kafkaSource.vno + rule.id

        // MapState.get yields null for a missing entry; because the value type is
        // Scala's primitive Boolean, that null is unboxed to `false`, so a missing
        // entry reads as "flag not set" without any explicit null check.
        val currentState: Boolean = carEfenceState.get(mapKey)

        // Business logic
        if (!currentState) {
          ...
          carEfenceState.put(mapKey, true)
          ...
        } else {
          ...
          carEfenceState.remove(mapKey)
          ...
        }
      }
    }
  }
}



--
Sent from: http://apache-flink.147419.n8.nabble.com/

回复