?????? ??????????????????????????????????bloomfilter???????????? .keyBy(_._1).process(new KeyedProcessFunction[String,(String,String),String]() { var state:ValueState[BloomFilter[CharSequence]]= null override def open(parameters: Configuration): Unit = { val stateDesc = new ValueStateDescriptor("state",TypeInformation.of(new TypeHint[BloomFilter[CharSequence]](){})) state = getRuntimeContext.getState(stateDesc) } override def processElement(value: (String, String), ctx: KeyedProcessFunction[String, (String, String), String]#Context, out: Collector[String]) = {
var filter = state.value if(filter==null){ println("null filter") filter= BloomFilter.create[CharSequence](Funnels.unencodedCharsFunnel,10000,0.0001)} //val contains = filter.mightContain(value._2) if(!filter.mightContain(value._2)) { filter.put(value._2) state.update(filter) out.collect(value._2) } } }) ??????????????????????savepoint??????????????state??????bloomfilter????null??????????????