How can I use it with a Scala case class? If I understand correctly, for better performance the ObjectMapper is already initialized in each KafkaConsumer, which then returns ObjectNodes. So perhaps I should rephrase my question: how can I map these ObjectNodes to case classes without hand-coding the mapping? Both https://github.com/json4s/json4s and https://github.com/FasterXML/jackson-module-scala only seem to consume strings.
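A minimal sketch, assuming jackson-module-scala 2.x is on the classpath. Besides strings, Jackson's `ObjectMapper.readValue` also has overloads that take an `Array[Byte]` (or an `InputStream`), so a raw Kafka record value can be mapped to a case class directly, with no intermediate string and no hand-written field mapping. `TweetJson` and `TweetJsonExample` are hypothetical names, and the `Tweet` here is a trimmed-down copy of the class quoted below. Because of type erasure Jackson cannot see the `Long` inside `Option[Long]`, so that field carries a `@JsonDeserialize(contentAs = ...)` hint, as recommended in the jackson-module-scala FAQ.

```scala
import java.nio.charset.StandardCharsets

import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.databind.annotation.JsonDeserialize
import com.fasterxml.jackson.module.scala.DefaultScalaModule

// Hypothetical, trimmed-down subset of the Tweet case class from the question.
final case class Tweet(
    tweet_id: Option[String],
    text: Option[String],
    // Erasure hides the Long inside the Option, so tell Jackson the content type.
    @JsonDeserialize(contentAs = classOf[java.lang.Long]) user_id: Option[Long],
    hashtags: Option[Seq[String]]
)

// Hypothetical helper object holding a single, shared mapper.
object TweetJson {
  // Safe to share across threads once it has been configured.
  val mapper: ObjectMapper = new ObjectMapper()
    .registerModule(DefaultScalaModule) // adds case class, Option and Seq support
    .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) // skip extra JSON fields

  // Map raw bytes (e.g. a Kafka record value) straight to a Tweet.
  def parse(bytes: Array[Byte]): Tweet = mapper.readValue(bytes, classOf[Tweet])
}

object TweetJsonExample extends App {
  // The "lang" field has no matching parameter in this trimmed Tweet and is ignored.
  val json = """{"tweet_id":"1","text":"hello","user_id":42,"hashtags":["flink","scala"],"lang":"en"}"""
  println(TweetJson.parse(json.getBytes(StandardCharsets.UTF_8)))
}
```

The mapper is built once and reused, which keeps the per-record cost to the actual parsing, similar to what JSONKeyValueDeserializationSchema does internally with its own mapper.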
Best,
Georg

On Thu, 9 Jul 2020 at 19:17, Taher Koitawala <taher...@gmail.com> wrote:

> You can try the Jackson ObjectMapper library; it will get you from
> JSON to an object.
>
> Regards,
> Taher Koitawala
>
> On Thu, Jul 9, 2020, 9:54 PM Georg Heiler <georg.kf.hei...@gmail.com> wrote:
>
>> Hi,
>>
>> I want to map a stream of JSON documents from Kafka to a Scala
>> case class. How can this be accomplished using the
>> JSONKeyValueDeserializationSchema? Is a manual mapping of object nodes
>> required?
>>
>> I have a Spark background. There, such manual mappings are usually
>> discouraged. Instead, Spark offers a nice API (the Dataset API) to perform
>> this type of mapping, which:
>> 1) is concise
>> 2) operates on Spark's off-heap memory representation (Tungsten) to be
>> faster
>>
>> In Flink, by contrast, such off-heap optimizations do not seem to be
>> discussed much (sorry if I am missing something, I am a Flink newbie). Is
>> there a reason why these optimizations are not necessary in Flink?
>>
>> How could I get the following example:
>>
>> val deserializer = new JSONKeyValueDeserializationSchema(false)
>> val stream = senv.addSource(
>>   new FlinkKafkaConsumer(
>>     "tweets-raw-json",
>>     deserializer,
>>     properties
>>   ).setStartFromEarliest() // TODO experiment with different start values
>> )
>>
>> to map to this Tweet class concisely, i.e. without manually iterating
>> through all the attribute fields and parsing the keys from the object node
>> tree?
>>
>> final case class Tweet(
>>   tweet_id: Option[String],
>>   text: Option[String],
>>   source: Option[String],
>>   geo: Option[String],
>>   place: Option[String],
>>   lang: Option[String],
>>   created_at: Option[String],
>>   timestamp_ms: Option[String],
>>   coordinates: Option[String],
>>   user_id: Option[Long],
>>   user_name: Option[String],
>>   screen_name: Option[String],
>>   user_created_at: Option[String],
>>   followers_count: Option[Long],
>>   friends_count: Option[Long],
>>   user_lang: Option[String],
>>   user_location: Option[String],
>>   hashtags: Option[Seq[String]]
>> )
>>
>> Best,
>> Georg
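Applied to the Kafka example above, one option is a minimal sketch like the following, assuming the Flink 1.11 Scala DataStream API with the universal Kafka connector (`FlinkKafkaConsumer`) plus jackson-module-scala on the classpath. Instead of JSONKeyValueDeserializationSchema, a custom `DeserializationSchema` maps each record value straight to the case class, so no ObjectNode is built and no per-field code is written. `JsonCaseClassSchema` and `TweetStream` are hypothetical names; `senv` and `properties` stand for the environment and Kafka properties from the question. As before, the `Option[Long]` fields carry a `@JsonDeserialize(contentAs = ...)` hint because of type erasure.

```scala
import java.util.Properties

import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.databind.annotation.JsonDeserialize
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.flink.api.common.serialization.DeserializationSchema
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

final case class Tweet(
    tweet_id: Option[String],
    text: Option[String],
    source: Option[String],
    geo: Option[String],
    place: Option[String],
    lang: Option[String],
    created_at: Option[String],
    timestamp_ms: Option[String],
    coordinates: Option[String],
    @JsonDeserialize(contentAs = classOf[java.lang.Long]) user_id: Option[Long],
    user_name: Option[String],
    screen_name: Option[String],
    user_created_at: Option[String],
    @JsonDeserialize(contentAs = classOf[java.lang.Long]) followers_count: Option[Long],
    @JsonDeserialize(contentAs = classOf[java.lang.Long]) friends_count: Option[Long],
    user_lang: Option[String],
    user_location: Option[String],
    hashtags: Option[Seq[String]]
)

// Hypothetical generic schema: parses each record value (raw bytes) into T.
class JsonCaseClassSchema[T](clazz: Class[T], typeInfo: TypeInformation[T])
    extends DeserializationSchema[T] {

  // Built lazily on the task after the schema itself has been deserialized,
  // so the mapper is never serialized together with the job.
  @transient private lazy val mapper: ObjectMapper = new ObjectMapper()
    .registerModule(DefaultScalaModule)
    .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)

  override def deserialize(message: Array[Byte]): T = mapper.readValue(message, clazz)

  // The Kafka topic is unbounded, so never signal the end of the stream.
  override def isEndOfStream(nextElement: T): Boolean = false

  override def getProducedType: TypeInformation[T] = typeInfo
}

// Hypothetical helper mirroring the original example.
object TweetStream {
  def tweets(senv: StreamExecutionEnvironment, properties: Properties): DataStream[Tweet] = {
    val consumer = new FlinkKafkaConsumer[Tweet](
      "tweets-raw-json",
      new JsonCaseClassSchema(classOf[Tweet], createTypeInformation[Tweet]),
      properties
    )
    consumer.setStartFromEarliest() // same start position as in the question
    senv.addSource(consumer)
  }
}
```

Passing the `TypeInformation` explicitly keeps the schema generic while letting Flink's Scala macro (`createTypeInformation`) derive a case-class type for `Tweet` at the call site, rather than falling back to a generic, Kryo-serialized type.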