?????? 
??????????????????????????????????bloomfilter????????????
 .keyBy(_._1).process(new KeyedProcessFunction[String,(String,String),String]() 
{
  var state:ValueState[BloomFilter[CharSequence]]= null
  override def open(parameters: Configuration): Unit = {
    val stateDesc = new ValueStateDescriptor("state",TypeInformation.of(new 
TypeHint[BloomFilter[CharSequence]](){}))
    state = getRuntimeContext.getState(stateDesc)
  }
  override def processElement(value: (String, String), ctx: 
KeyedProcessFunction[String, (String, String), String]#Context, out: 
Collector[String]) = {

    var filter = state.value
    if(filter==null){
      println("null filter")
      filter=  
BloomFilter.create[CharSequence](Funnels.unencodedCharsFunnel,10000,0.0001)}
    //val contains = filter.mightContain(value._2)
    if(!filter.mightContain(value._2)) {
      filter.put(value._2)
      state.update(filter)
      out.collect(value._2)

    }

  }
})
??????????????????????savepoint??????????????state??????bloomfilter????null??????????????

Reply via email to