Hi guys,
I want to test a function like this:

private[flink] def filterStream(dataStream: DataStream[GenericRecord]): DataStream[GenericRecord] = {
  dataStream.filter(new FilterFunction[GenericRecord] {
    override def filter(value: GenericRecord): Boolean = {
      if (value == null || value.get(StipFields.requestMessageType) == null) {
        false
      } else {
        ExecutionEnv.messageTypeList.contains(value.get(StipFields.requestMessageType).toString) &&
          ExecutionEnv.pcrList.contains(value.get(StipFields.pcrList).toString) &&
          (value.get(StipFields.rejectCode).asInstanceOf[Int] == 0) &&
          !value.get(StipFields.processingCode).toString.equals("33")
      }
    }
  })
}

How can I do this?

Best,
Vishwas
