Hi Georg, you can try the circe library for this. It can automatically
derive JSON decoders for Scala case classes.

As mentioned earlier, Flink does not come packaged with
JSON-decoding generators for Scala like Spark does.
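
A minimal sketch of what that could look like, assuming circe-core,
circe-generic and circe-parser are on the classpath. The Tweet class here is
a shortened, illustrative subset of the one in your mail, and parseTweet and
TweetDecoding are just names I made up for the example:

```scala
// Assumption: circe-core, circe-generic and circe-parser are dependencies.
import io.circe.generic.auto._ // derives a Decoder[Tweet] at compile time
import io.circe.parser.decode

// Illustrative subset of the Tweet case class from the original question.
final case class Tweet(
  tweet_id: Option[String],
  text: Option[String],
  user_id: Option[Long],
  hashtags: Option[Seq[String]]
)

object TweetDecoding {
  // Hypothetical helper: parses one JSON document into a Tweet.
  // Left carries the parsing or decoding failure; fields that are
  // absent from the JSON decode to None because they are Options.
  def parseTweet(json: String): Either[io.circe.Error, Tweet] =
    decode[Tweet](json)

  def main(args: Array[String]): Unit = {
    val json = """{"tweet_id":"1","text":"hello","user_id":42}"""
    println(parseTweet(json))
  }
}
```

In the Flink job you would then map over the stream and call something like
parseTweet on the string form of each node's "value" field (the
JSONKeyValueDeserializationSchema wraps the record under "key" and "value").
Using a plain string deserialization schema instead would probably avoid
parsing the JSON twice, but I have not measured that.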

On Thu, Jul 9, 2020 at 4:45 PM Georg Heiler <georg.kf.hei...@gmail.com>
wrote:

> Great. Thanks.
> But would it be possible to automate this, i.e. to have it work
> automatically for any case class / product?
>
> On Thu, Jul 9, 2020 at 8:21 PM Taher Koitawala <
> taher...@gmail.com> wrote:
>
>> The performant way would be to apply a map function over the stream and
>> then use the Jackson ObjectMapper to convert to Scala objects. Flink has
>> no API like Spark's to automatically map all fields.
>>
>> On Thu, Jul 9, 2020, 11:38 PM Georg Heiler <georg.kf.hei...@gmail.com>
>> wrote:
>>
>>> How can I use it with a Scala case class?
>>> If I understand it correctly, for better performance the ObjectMapper is
>>> already initialized in each KafkaConsumer and returns ObjectNodes. So
>>> probably I should rephrase to: how can I then map these to case classes
>>> without hand-coding it? https://github.com/json4s/json4s and
>>> https://github.com/FasterXML/jackson-module-scala both only seem to
>>> consume strings.
>>>
>>> Best,
>>> Georg
>>>
>>> On Thu, Jul 9, 2020 at 7:17 PM Taher Koitawala <
>>> taher...@gmail.com> wrote:
>>>
>>>> You can try the Jackson ObjectMapper library; that will get you from
>>>> JSON to object.
>>>>
>>>> Regards,
>>>> Taher Koitawala
>>>>
>>>> On Thu, Jul 9, 2020, 9:54 PM Georg Heiler <georg.kf.hei...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I want to map a stream of JSON documents from Kafka to a Scala
>>>>> case class. How can this be accomplished using the
>>>>> JSONKeyValueDeserializationSchema? Is a manual mapping of object nodes
>>>>> required?
>>>>>
>>>>> I have a Spark background. There, such manual mappings are usually
>>>>> discouraged. Instead, Spark offers a nice API (the Dataset API) to
>>>>> perform this type of assignment.
>>>>> 1) this is concise
>>>>> 2) it operates on Spark's off-heap memory representations (Tungsten) to
>>>>> be faster
>>>>>
>>>>> In Flink, by contrast, such off-heap optimizations do not seem to be
>>>>> discussed much (sorry if I am missing something, I am a Flink newbie).
>>>>> Is there a reason why these optimizations are not necessary in Flink?
>>>>>
>>>>>
>>>>> How could I get the following example:
>>>>> val serializer = new JSONKeyValueDeserializationSchema(false)
>>>>> val stream = senv.addSource(
>>>>>     new FlinkKafkaConsumer(
>>>>>       "tweets-raw-json",
>>>>>       serializer,
>>>>>       properties
>>>>>     ).setStartFromEarliest() // TODO experiment with different start values
>>>>>   )
>>>>>
>>>>> to map to this Tweet class concisely, i.e. without manually iterating
>>>>> through all the attribute fields and parsing the keys from the object node
>>>>> tree.
>>>>>
>>>>> final case class Tweet(tweet_id: Option[String], text: Option[String],
>>>>> source: Option[String], geo: Option[String], place: Option[String], lang:
>>>>> Option[String], created_at: Option[String], timestamp_ms: Option[String],
>>>>> coordinates: Option[String], user_id: Option[Long], user_name:
>>>>> Option[String], screen_name: Option[String], user_created_at:
>>>>> Option[String], followers_count: Option[Long], friends_count: Option[Long],
>>>>> user_lang: Option[String], user_location: Option[String], hashtags:
>>>>> Option[Seq[String]])
>>>>>
>>>>> Best,
>>>>> Georg
>>>>>
>>>>
