Hi!

The last expression in your Try block is:

if(validationMessages.isEmpty) {
  (parsedJson.toString(),
validationMessages.foreach((msg=>msg.getMessage.toString)))
} else {
  (parsedJson.toString(), "Format is correct...")
}

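The first branch produces a (String, Unit), because foreach returns Unit,
while the second branch produces a (String, String), so the whole if
expression is inferred as (String, Any). The Success(x) case then passes
that tuple to Right(x), but parseJson is declared to return
Either[String, String], so Right(x) needs a plain String. That is exactly
the type mismatch the compiler reports.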


Siddhesh Kalgaonkar <kalgaonkarsiddh...@gmail.com> 于2022年1月5日周三 19:04写道:

> I have written a process function where I am parsing the JSON and if it is
> not according to the expected format it passes as Failure to the process
> function and I print the records which are working fine. Now, I was trying
> to print the message and the record in case of Success and Failure. I
> implemented the below code and it gave me the error. What exactly I am
> missing?
>
> package KafkaAsSource
>
> import com.fasterxml.jackson.databind.ObjectMapper
> import com.networknt.schema.{JsonSchemaFactory, SpecVersion}
> import org.apache.flink.api.scala.createTypeInformation
> import org.apache.flink.streaming.api.functions.ProcessFunction
> import org.apache.flink.streaming.api.scala.OutputTag
> import org.apache.flink.util.Collector
> import scala.jdk.CollectionConverters._
> import scala.util.{Failure, Success, Try}
>
> class JSONParsingProcessFunction extends ProcessFunction[String,String] {
>   val outputTag = new OutputTag[String]("failed")
>
>   def parseJson(json: String): Either[String, String] = {
>     val schemaJsonString =
>       """
> {
>     "$schema": "http://json-schema.org/draft-04/schema#",
>     "title": "Product",
>     "description": "A product from the catalog",
>     "type": "object",
>     "properties": {
>         "id": {
>             "description": "The unique identifier for a product",
>             "type": "integer"
>         },
>         "premium": {
>             "description": "Annual Premium",
>             "type": "integer"
>         },
>         "eventTime": {
>             "description": "Timestamp at which record has arrived at source / generated",
>             "type": "string"
>         }
>     },
>     "required": ["id", "premium","eventTime"]
> }
> """
>     Try {
>       val schema = 
> JsonSchemaFactory.getInstance(SpecVersion.VersionFlag.V4).getSchema(schemaJsonString)
>       // You can read a JSON object from String, a file, URL, etc.
>       val parsedJson = new ObjectMapper().readTree(json)
>       val validationMessages = schema.validate(parsedJson).asScala
>       //validationMessages.foreach(msg => println(msg.getMessage))
>       require(validationMessages.isEmpty)
>       //parsedJson.toString()
>       if(validationMessages.isEmpty)
>         {
>           
> (parsedJson.toString(),validationMessages.foreach((msg=>msg.getMessage.toString)))
>         }
>       else
>         {
>           (parsedJson.toString(),"Format is correct...")
>         }
>
>     }
>     match {
>       case Success(x) => {
>         println("Good: " + x)
>         Right(x)
>       }
>       case Failure(err) => {
>         println("Bad:  " + json)
>         Left(json)
>       }
>     }
>   }
>   override def processElement(i: String, context: ProcessFunction[String, 
> String]#Context, collector: Collector[String]): Unit = {
>     parseJson(i) match {
>       case Right(data) => {
>         collector.collect(data)
>         println("Good Records: " + data)
>       }
>       case Left(json) => {
>         context.output(outputTag, json)
>         println("Bad Records: " + json)
>       }
>     }
>   }
> }
>
>
> Error:
>
> type mismatch;
>  found   : (String, Any)
>  required: String
>         Right(x)
>
>

Reply via email to