How can I use it with a Scala case class?
If I understand correctly, for better performance the ObjectMapper is
already initialized in each KafkaConsumer, which returns ObjectNodes. So
perhaps I should rephrase: how can I then map these to case classes
without hand-coding it? Both https://github.com/json4s/json4s and
https://github.com/FasterXML/jackson-module-scala only seem to consume
strings.
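One way to skip the ObjectNode step entirely is a small custom `DeserializationSchema` that hands the raw Kafka record bytes straight to jackson-module-scala. Jackson's `ObjectMapper.readValue` accepts a `byte[]` as well as a string. This is a minimal sketch, assuming each record value is a flat JSON object whose keys match the case-class field names (raw Twitter JSON is nested, so real data may need a flattening step first). `TweetSchema` is a hypothetical name, and `Tweet` is trimmed to a few fields for brevity. The `@JsonDeserialize(contentAs = ...)` annotation works around jackson-module-scala's documented type-erasure pitfall with `Option[Long]`.

```scala
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.databind.annotation.JsonDeserialize
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.flink.api.common.serialization.DeserializationSchema
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala._

// Trimmed version of the Tweet class from the question (illustration only).
final case class Tweet(
  tweet_id: Option[String],
  text: Option[String],
  // Without contentAs, erasure lets Jackson store a java.lang.Integer inside the Option.
  @JsonDeserialize(contentAs = classOf[java.lang.Long]) user_id: Option[Long],
  hashtags: Option[Seq[String]])

// Hypothetical schema: Kafka value bytes -> Tweet, with no ObjectNode in between.
class TweetSchema extends DeserializationSchema[Tweet] {
  // Built lazily where it is first used; @transient keeps it out of the serialized schema.
  @transient private lazy val mapper: ObjectMapper = new ObjectMapper()
    .registerModule(DefaultScalaModule)
    .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) // ignore extra JSON keys

  override def deserialize(message: Array[Byte]): Tweet =
    mapper.readValue(message, classOf[Tweet])

  override def isEndOfStream(nextElement: Tweet): Boolean = false

  // Flink's Scala type-info macro (from the scala._ import) analyses the case class.
  override def getProducedType: TypeInformation[Tweet] = createTypeInformation[Tweet]
}
```

It could then replace the JSON schema in the Kafka source, e.g. `new FlinkKafkaConsumer[Tweet]("tweets-raw-json", new TweetSchema, properties)`, which would yield a `DataStream[Tweet]` directly.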

Best,
Georg

On Thu, Jul 9, 2020 at 7:17 PM Taher Koitawala <
taher...@gmail.com> wrote:

> You can try the Jackson ObjectMapper library; it will get you from
> JSON to an object.
>
> Regards,
> Taher Koitawala
>
> On Thu, Jul 9, 2020, 9:54 PM Georg Heiler <georg.kf.hei...@gmail.com>
> wrote:
>
>> Hi,
>>
>> I want to map a stream of JSON documents from Kafka to a Scala
>> case class. How can this be accomplished using the
>> JSONKeyValueDeserializationSchema? Is a manual mapping of object nodes
>> required?
>>
>> I have a Spark background. There, such manual mappings are usually
>> discouraged. Instead, Spark offers a nice API (the Dataset API) to perform
>> this type of assignment:
>> 1) it is concise
>> 2) it operates on Spark's off-heap memory representation (Tungsten) to be
>> faster
>>
>> In Flink, by contrast, such off-heap optimizations seem to be rarely
>> discussed (sorry if I am missing something, I am a Flink newbie). Is there
>> a reason why these optimizations are not necessary in Flink?
>>
>>
>> How could I get the following example:
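>> import org.apache.flink.streaming.api.scala._
>> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
>> import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema
>>
>> // senv: a StreamExecutionEnvironment (predefined in the Flink Scala shell)
>> val deserializer = new JSONKeyValueDeserializationSchema(false)
>> val stream = senv.addSource(
>>   new FlinkKafkaConsumer(
>>     "tweets-raw-json",
>>     deserializer,
>>     properties
>>   ).setStartFromEarliest() // TODO: experiment with different start values
>> )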
>>
>> to map to this Tweet class concisely, i.e. without manually iterating
>> through all the attribute fields and parsing the keys from the object node
>> tree.
>>
>> final case class Tweet(
>>   tweet_id: Option[String], text: Option[String], source: Option[String],
>>   geo: Option[String], place: Option[String], lang: Option[String],
>>   created_at: Option[String], timestamp_ms: Option[String],
>>   coordinates: Option[String],
>>   user_id: Option[Long], user_name: Option[String], screen_name: Option[String],
>>   user_created_at: Option[String], followers_count: Option[Long],
>>   friends_count: Option[Long], user_lang: Option[String],
>>   user_location: Option[String], hashtags: Option[Seq[String]])
>>
>> Best,
>> Georg
>>
>
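If you prefer to keep `JSONKeyValueDeserializationSchema` as in the quoted example, a map step can convert each record instead. That schema emits an `ObjectNode` holding the record value under the `"value"` field (plus `"key"`, and `"metadata"` when enabled). This is a sketch under two assumptions: the node is Flink's shaded Jackson class (so an unshaded jackson-module-scala mapper cannot consume it directly, hence the `toString` round trip), and the value is flat JSON matching the field names. `ObjectNodeToTweet` is a hypothetical name and `Tweet` is trimmed for brevity.

```scala
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.databind.annotation.JsonDeserialize
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode

// Trimmed Tweet class (illustration only); contentAs avoids the Option[Long] erasure pitfall.
final case class Tweet(
  tweet_id: Option[String],
  text: Option[String],
  @JsonDeserialize(contentAs = classOf[java.lang.Long]) user_id: Option[Long])

// Hypothetical map function: shaded ObjectNode from JSONKeyValueDeserializationSchema -> Tweet.
class ObjectNodeToTweet extends RichMapFunction[ObjectNode, Tweet] {
  @transient private var mapper: ObjectMapper = _

  // One mapper per parallel task instance, created after the function is shipped.
  override def open(parameters: Configuration): Unit = {
    mapper = new ObjectMapper()
      .registerModule(DefaultScalaModule)
      .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
  }

  // Serialize the "value" subtree back to JSON text and bind it to the case class.
  override def map(node: ObjectNode): Tweet =
    mapper.readValue(node.get("value").toString, classOf[Tweet])
}
```

With the Scala API import in scope for the implicit `TypeInformation`, usage would be `stream.map(new ObjectNodeToTweet)`. Because each record is parsed into a tree, printed, and parsed again, the custom-schema approach is probably the cheaper of the two.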
