Great. Thanks.
But would it be possible to automate this, i.e. to have it work
automatically for the case class / product?

On Thu, 9 July 2020 at 20:21, Taher Koitawala <
taher...@gmail.com> wrote:

> The performant way would be to apply a map function over the stream and
> then use the Jackson ObjectMapper to convert to Scala objects. Flink has
> no API like Spark's that automatically gets all fields.
>
> On Thu, Jul 9, 2020, 11:38 PM Georg Heiler <georg.kf.hei...@gmail.com>
> wrote:
>
>> How can I use it with a Scala case class?
>> If I understand it correctly, for better performance the ObjectMapper is
>> already initialized in each KafkaConsumer and returns ObjectNodes. So
>> probably I should rephrase to: how can I then map these to case classes
>> without hand-coding it? https://github.com/json4s/json4s and
>> https://github.com/FasterXML/jackson-module-scala both only seem to
>> consume strings.
>>
>> Best,
>> Georg
>>
>> On Thu, 9 July 2020 at 19:17, Taher Koitawala <
>> taher...@gmail.com> wrote:
>>
>>> You can try the Jackson ObjectMapper library; that will get you from
>>> JSON to object.
>>>
>>> Regards,
>>> Taher Koitawala
>>>
>>> On Thu, Jul 9, 2020, 9:54 PM Georg Heiler <georg.kf.hei...@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> I want to map a stream of JSON documents from Kafka to a Scala
>>>> case class. How can this be accomplished using the
>>>> JSONKeyValueDeserializationSchema? Is a manual mapping of object nodes
>>>> required?
>>>>
>>>> I have a Spark background. There, such manual mappings are usually
>>>> discouraged. Instead, Spark offers a nice API (the Dataset API) to
>>>> perform this type of assignment.
>>>> 1) this is concise
>>>> 2) it operates on Spark's off-heap memory representations (Tungsten) to
>>>> be faster
>>>>
>>>> In Flink, by contrast, such off-heap optimizations do not seem to be
>>>> discussed much (sorry if I am missing something, I am a Flink newbie).
>>>> Is there a reason why these optimizations are not necessary in Flink?
>>>>
>>>>
>>>> How could I get the following example:
>>>> val serializer = new JSONKeyValueDeserializationSchema(false)
>>>> val stream = senv.addSource(
>>>>     new FlinkKafkaConsumer(
>>>>       "tweets-raw-json",
>>>>       serializer,
>>>>       properties
>>>>     ).setStartFromEarliest() // TODO experiment with different start values
>>>>   )
>>>>
>>>> to map to this Tweet class concisely, i.e. without manually iterating
>>>> through all the attribute fields and parsing the keys from the object node
>>>> tree?
>>>>
>>>> final case class Tweet(tweet_id: Option[String], text: Option[String],
>>>> source: Option[String], geo: Option[String], place: Option[String], lang:
>>>> Option[String], created_at: Option[String], timestamp_ms: Option[String],
>>>> coordinates: Option[String], user_id: Option[Long], user_name:
>>>> Option[String], screen_name: Option[String], user_created_at:
>>>> Option[String], followers_count: Option[Long], friends_count: Option[Long],
>>>> user_lang: Option[String], user_location: Option[String], hashtags:
>>>> Option[Seq[String]])
>>>>
>>>> Best,
>>>> Georg
>>>>
>>>
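
One way to avoid hand-written field mapping, sketched here under some
assumptions: skip JSONKeyValueDeserializationSchema and give the Kafka
consumer a small generic DeserializationSchema that lets Jackson plus
jackson-module-scala bind the raw record bytes straight to the case class.
The class name JsonCaseClassSchema is hypothetical, not part of Flink, and
the sketch assumes jackson-databind and jackson-module-scala are on the
classpath. It reads only the record value, so Kafka keys and metadata are
not available, and unknown JSON fields are ignored.

```scala
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.flink.api.common.serialization.DeserializationSchema
import org.apache.flink.api.common.typeinfo.TypeInformation

// Hypothetical helper: binds the JSON bytes of a Kafka record value to T.
// The TypeInformation context bound is resolved at the call site, e.g. by
// importing org.apache.flink.streaming.api.scala._
class JsonCaseClassSchema[T: TypeInformation](clazz: Class[T])
    extends DeserializationSchema[T] {

  // Built lazily on each task after the schema has been shipped to it,
  // so the mapper itself is never serialized with the job graph.
  @transient private lazy val mapper: ObjectMapper = {
    val m = new ObjectMapper()
    m.registerModule(DefaultScalaModule) // case class and Option support
    m.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
    m
  }

  // Jackson matches JSON keys to constructor parameter names.
  override def deserialize(message: Array[Byte]): T =
    mapper.readValue(message, clazz)

  // A Kafka topic is unbounded, so no element ends the stream.
  override def isEndOfStream(nextElement: T): Boolean = false

  override def getProducedType: TypeInformation[T] =
    implicitly[TypeInformation[T]]
}
```

With the Flink Scala API implicits in scope, the example from the original
question would then become something like
new FlinkKafkaConsumer[Tweet]("tweets-raw-json",
new JsonCaseClassSchema[Tweet](classOf[Tweet]), properties).
One caveat worth checking: because of type erasure, jackson-module-scala may
need an annotation such as @JsonDeserialize(contentAs = classOf[java.lang.Long])
on Option[Long] fields like user_id to deserialize them as real Longs.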
