Hi Georg, you can try the circe library for this; it can automatically derive JSON decoders for Scala case classes.
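A minimal sketch of that approach (untested; assumes circe's circe-parser and circe-generic modules are on the classpath, and abbreviates the Tweet case class quoted below):

```scala
import io.circe.generic.auto._ // derives a Decoder[Tweet] at compile time
import io.circe.parser.decode

// Abbreviated version of the Tweet case class from the original question.
final case class Tweet(
  tweet_id: Option[String],
  text:     Option[String],
  user_id:  Option[Long],
  hashtags: Option[Seq[String]])

// decode returns Either[io.circe.Error, Tweet] -- no hand-written mapping.
val result: Either[io.circe.Error, Tweet] =
  decode[Tweet]("""{"tweet_id":"1","text":"hi","user_id":42,"hashtags":["flink"]}""")

// With a plain-string source (e.g. SimpleStringSchema) you could then do
// something like:
//   stream.flatMap(json => decode[Tweet](json).toOption)
```

Note that this decodes from strings, so you would use it with a string deserialization schema rather than the ObjectNode-producing JSONKeyValueDeserializationSchema.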
As was mentioned earlier, Flink does not come packaged with JSON-decoding generators for Scala the way Spark does.

On Thu, Jul 9, 2020 at 4:45 PM Georg Heiler <georg.kf.hei...@gmail.com> wrote:

> Great. Thanks.
> But would it be possible to automate this, i.e. to have this work
> automatically for the case class / product?
>
> On Thu, Jul 9, 2020 at 8:21 PM Taher Koitawala <taher...@gmail.com> wrote:
>
>> The performant way would be to apply a map function over the stream and
>> then use the Jackson ObjectMapper to convert to Scala objects. In Flink
>> there is no API like Spark's to automatically get all fields.
>>
>> On Thu, Jul 9, 2020, 11:38 PM Georg Heiler <georg.kf.hei...@gmail.com> wrote:
>>
>>> How can I use it with a Scala case class?
>>> If I understand it correctly, for better performance the ObjectMapper is
>>> already initialized in each KafkaConsumer and returns ObjectNodes. So
>>> probably I should rephrase my question: how can I then map these to case
>>> classes without hand-coding it? https://github.com/json4s/json4s and
>>> https://github.com/FasterXML/jackson-module-scala both only seem to
>>> consume strings.
>>>
>>> Best,
>>> Georg
>>>
>>> On Thu, Jul 9, 2020 at 7:17 PM Taher Koitawala <taher...@gmail.com> wrote:
>>>
>>>> You can try the Jackson ObjectMapper library, and that will get you
>>>> from JSON to object.
>>>>
>>>> Regards,
>>>> Taher Koitawala
>>>>
>>>> On Thu, Jul 9, 2020, 9:54 PM Georg Heiler <georg.kf.hei...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I want to map a stream of JSON documents from Kafka to a Scala case
>>>>> class. How can this be accomplished using the
>>>>> JSONKeyValueDeserializationSchema? Is a manual mapping of object nodes
>>>>> required?
>>>>>
>>>>> I have a Spark background. There, such manual mappings are usually
>>>>> discouraged. Instead, Spark offers a nice API (the Dataset API) to
>>>>> perform this type of assignment:
>>>>> 1) it is concise
>>>>> 2) it operates on Spark's off-heap memory representation (Tungsten)
>>>>> to be faster
>>>>>
>>>>> In Flink, by contrast, such off-heap optimizations do not seem to be
>>>>> talked about much (sorry if I am missing something, I am a Flink
>>>>> newbie). Is there a reason why these optimizations are not necessary
>>>>> in Flink?
>>>>>
>>>>> How could I get the following example:
>>>>>
>>>>> val serializer = new JSONKeyValueDeserializationSchema(false)
>>>>> val stream = senv.addSource(
>>>>>   new FlinkKafkaConsumer(
>>>>>     "tweets-raw-json",
>>>>>     serializer,
>>>>>     properties
>>>>>   ).setStartFromEarliest() // TODO experiment with different start values
>>>>> )
>>>>>
>>>>> to map to this Tweet class concisely, i.e. without manually iterating
>>>>> through all the attribute fields and parsing the keys from the object
>>>>> node tree?
>>>>>
>>>>> final case class Tweet(
>>>>>   tweet_id: Option[String],
>>>>>   text: Option[String],
>>>>>   source: Option[String],
>>>>>   geo: Option[String],
>>>>>   place: Option[String],
>>>>>   lang: Option[String],
>>>>>   created_at: Option[String],
>>>>>   timestamp_ms: Option[String],
>>>>>   coordinates: Option[String],
>>>>>   user_id: Option[Long],
>>>>>   user_name: Option[String],
>>>>>   screen_name: Option[String],
>>>>>   user_created_at: Option[String],
>>>>>   followers_count: Option[Long],
>>>>>   friends_count: Option[Long],
>>>>>   user_lang: Option[String],
>>>>>   user_location: Option[String],
>>>>>   hashtags: Option[Seq[String]])
>>>>>
>>>>> Best,
>>>>> Georg
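For reference, the Jackson route Taher described, applied to the ObjectNodes that JSONKeyValueDeserializationSchema already produces, could look roughly like this (untested sketch; assumes jackson-module-scala is on the classpath and reuses the Tweet case class from the thread above):

```scala
import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule

object TweetMapper extends Serializable {
  // Built lazily on each task, since ObjectMapper itself is not
  // serializable and must not be captured eagerly in the job closure.
  @transient private lazy val mapper: ObjectMapper =
    new ObjectMapper().registerModule(DefaultScalaModule)

  // Convert an already-parsed Jackson tree straight into the case class,
  // so no per-field hand-coding is needed.
  def fromNode(node: JsonNode): Tweet =
    mapper.treeToValue(node, classOf[Tweet])
}

// JSONKeyValueDeserializationSchema emits ObjectNodes with "key" and
// "value" children, so the tweet payload sits under "value":
//   stream.map(node => TweetMapper.fromNode(node.get("value")))
```

This keeps the existing Kafka source unchanged; only a single map operation is added downstream.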