Hi! The last expression in your try block is
if(validationMessages.isEmpty) { (parsedJson.toString(), validationMessages.foreach((msg=>msg.getMessage.toString))) } else { (parsedJson.toString(), "Format is correct...") } The first one produces a (String, Unit) type while the second one produces a (String, String) type, so the whole if expression produces (String, Any) type. However your parseJson should return Either[String, String], thus causing this issue. Siddhesh Kalgaonkar <kalgaonkarsiddh...@gmail.com> 于2022年1月5日周三 19:04写道: > I have written a process function where I am parsing the JSON and if it is > not according to the expected format it passes as Failure to the process > function and I print the records which are working fine. Now, I was trying > to print the message and the record in case of Success and Failure. I > implemented the below code and it gave me the error. What exactly I am > missing? > > package KafkaAsSource > > import com.fasterxml.jackson.databind.ObjectMapper > import com.networknt.schema.{JsonSchemaFactory, SpecVersion} > import org.apache.flink.api.scala.createTypeInformation > import org.apache.flink.streaming.api.functions.ProcessFunction > import org.apache.flink.streaming.api.scala.OutputTag > import org.apache.flink.util.Collector > import scala.jdk.CollectionConverters._ > import scala.util.{Failure, Success, Try} > > class JSONParsingProcessFunction extends ProcessFunction[String,String] { > val outputTag = new OutputTag[String]("failed") > > def parseJson(json: String): Either[String, String] = { > val schemaJsonString = > """ > { > "$schema": "http://json-schema.org/draft-04/schema#", > "title": "Product", > "description": "A product from the catalog", > "type": "object", > "properties": { > "id": { > "description": "The unique identifier for a product", > "type": "integer" > }, > "premium": { > "description": "Annual Premium", > "type": "integer" > }, > "eventTime": { > "description": "Timestamp at which record has arrived at source / > generated", > "type": "string" > } > }, > "required": ["id", "premium","eventTime"] > } > """ > Try { > val schema = > JsonSchemaFactory.getInstance(SpecVersion.VersionFlag.V4).getSchema(schemaJsonString) > // You can read a JSON object from String, a file, URL, etc. > val parsedJson = new ObjectMapper().readTree(json) > val validationMessages = schema.validate(parsedJson).asScala > //validationMessages.foreach(msg => println(msg.getMessage)) > require(validationMessages.isEmpty) > //parsedJson.toString() > if(validationMessages.isEmpty) > { > > (parsedJson.toString(),validationMessages.foreach((msg=>msg.getMessage.toString))) > } > else > { > (parsedJson.toString(),"Format is correct...") > } > > } > match { > case Success(x) => { > println("Good: " + x) > Right(x) > } > case Failure(err) => { > println("Bad: " + json) > Left(json) > } > } > } > override def processElement(i: String, context: ProcessFunction[String, > String]#Context, collector: Collector[String]): Unit = { > parseJson(i) match { > case Right(data) => { > collector.collect(data) > println("Good Records: " + data) > } > case Left(json) => { > context.output(outputTag, json) > println("Bad Records: " + json) > } > } > } > } > > > Error: > > type mismatch; > found : (String, Any) > required: String > Right(x) > >