Hi,
Many thanks. So do I understand correctly that:

1) similarly to Spark, the Table API works on an optimized binary representation
2) this is only available via the SQL style of interaction, i.e. there is no programmatic API

This then leads me to some questions:

q1) I have read somewhere (I think in some Flink Forward presentations) that the SQL API is not necessarily stable with regard to state, even for small changes to the DAG (due to optimization). Does this also/still apply to the Table API? (I assume yes.)

q2) When I use the DataSet/DataStream (classical Scala/Java) API, it looks like I must create a custom serializer if I want to handle one or all of:
- side-outputting failing records instead of simply crashing the job
- as asked before, automatic deserialization into a Scala (case) class

q3) So, as asked before:

>>> But I also read that creating the ObjectMapper (i.e. in Jackson terms) inside the map function is not recommended. From Spark I know that there is a mapPartitions function, i.e. something where a database connection can be created once and then reused for the individual elements. Is a similar construct available in Flink as well?
>>> Also, I have read a lot of articles and it looks like a lot of people use the String serializer and then parse the JSON manually, which also seems inefficient.

Where would I find an example of such a serializer with side outputs for failed records as well as efficient initialization using a construct similar to mapPartitions?

Best,
Georg
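For reference, a rough sketch of the pattern q2/q3 ask about, assuming the DataStream Scala API, Jackson with jackson-module-scala, and purely illustrative names (ParseTweets, "failed-records"): the ObjectMapper is created once per parallel instance in open(), which is roughly Flink's counterpart to mapPartitions-style setup, and records that fail to parse go to a side output instead of failing the job.

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Trimmed version of the Tweet case class from the thread below.
final case class Tweet(tweet_id: Option[String], text: Option[String], lang: Option[String])

object ParseTweets {
  // Raw strings that could not be parsed are routed to this side output.
  val failedRecords: OutputTag[String] = new OutputTag[String]("failed-records")
}

class ParseTweets extends ProcessFunction[String, Tweet] {
  // One ObjectMapper per parallel instance, created in open() rather than per element.
  @transient private var mapper: ObjectMapper = _

  override def open(parameters: Configuration): Unit = {
    mapper = new ObjectMapper()
    mapper.registerModule(DefaultScalaModule) // maps JSON directly into Scala case classes / Option
  }

  override def processElement(
      value: String,
      ctx: ProcessFunction[String, Tweet]#Context,
      out: Collector[Tweet]): Unit =
    try out.collect(mapper.readValue(value, classOf[Tweet]))
    catch {
      // do not crash the job; emit the bad record to the side output instead
      case _: Exception => ctx.output(ParseTweets.failedRecords, value)
    }
}

// Usage, given a DataStream[String] (e.g. from a FlinkKafkaConsumer with SimpleStringSchema):
// val parsed = rawStream.process(new ParseTweets)
// val failed = parsed.getSideOutput(ParseTweets.failedRecords)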
On Fri, Jul 10, 2020 at 4:22 PM Aljoscha Krettek <aljos...@apache.org> wrote:

> Hi Georg,
>
> I'm afraid the other suggestions are missing the point a bit. From your
> other emails it seems you want to use Kafka with JSON records together
> with the Table API/SQL. For that, take a look at [1], which describes how
> to define data sources for the Table API. Especially the Kafka and JSON
> sections should be relevant.
>
> That first link I mentioned is for the legacy connector API. There is a
> newer API with slightly different properties which will allow us to do
> the kinds of optimization like working on binary data throughout the
> stack: [2]. Unfortunately, there is no programmatic API yet; you would
> have to use `TableEnvironment.executeSql()` to execute SQL DDL that
> defines your sources. There is a FLIP for adding the programmatic API: [3]
>
> Best,
> Aljoscha
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connect.html
> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/
> [3] https://cwiki.apache.org/confluence/display/FLINK/FLIP-129%3A+Refactor+Descriptor+API+to+register+connectors+in+Table+API
>
> On 10.07.20 05:01, Aaron Levin wrote:
> > Hi Georg, you can try using the circe library for this, which has a way
> > to automatically generate JSON decoders for Scala case classes.
> >
> > As was mentioned earlier, Flink does not come packaged with JSON-decoding
> > generators for Scala like Spark does.
> >
> > On Thu, Jul 9, 2020 at 4:45 PM Georg Heiler <georg.kf.hei...@gmail.com> wrote:
> >
> >> Great, thanks. But would it be possible to automate this, i.e. to have
> >> it work automatically for the case class / product?
> >>
> >> On Thu, Jul 9, 2020 at 8:21 PM Taher Koitawala <taher...@gmail.com> wrote:
> >>
> >>> The performant way would be to apply a map function over the stream and
> >>> then use the Jackson ObjectMapper to convert to Scala objects. In Flink
> >>> there is no API like Spark's to automatically get all fields.
> >>>
> >>> On Thu, Jul 9, 2020, 11:38 PM Georg Heiler <georg.kf.hei...@gmail.com> wrote:
> >>>
> >>>> How can I use it with a Scala case class?
> >>>> If I understand it correctly, for better performance the ObjectMapper
> >>>> is already initialized in each KafkaConsumer and returns ObjectNodes.
> >>>> So I should probably rephrase my question: how can I then map these to
> >>>> case classes without hand-coding it? https://github.com/json4s/json4s
> >>>> and https://github.com/FasterXML/jackson-module-scala both only seem
> >>>> to consume strings.
> >>>>
> >>>> Best,
> >>>> Georg
> >>>>
> >>>> On Thu, Jul 9, 2020 at 7:17 PM Taher Koitawala <taher...@gmail.com> wrote:
> >>>>
> >>>>> You can try the Jackson ObjectMapper library, and that will get you
> >>>>> from JSON to object.
> >>>>>
> >>>>> Regards,
> >>>>> Taher Koitawala
> >>>>>
> >>>>> On Thu, Jul 9, 2020, 9:54 PM Georg Heiler <georg.kf.hei...@gmail.com> wrote:
> >>>>>
> >>>>>> Hi,
> >>>>>>
> >>>>>> I want to map a stream of JSON documents from Kafka to a Scala case
> >>>>>> class. How can this be accomplished using the
> >>>>>> JSONKeyValueDeserializationSchema? Is a manual mapping of object
> >>>>>> nodes required?
> >>>>>>
> >>>>>> I have a Spark background. There, such manual mappings are usually
> >>>>>> discouraged. Instead, Spark offers a nice API (the Dataset API) to
> >>>>>> perform this type of assignment.
> >>>>>> 1) it is concise
> >>>>>> 2) it operates on Spark's off-heap memory representation (Tungsten)
> >>>>>> to be faster
> >>>>>>
> >>>>>> In Flink, instead, such off-heap optimizations do not seem to be
> >>>>>> talked about much (sorry if I am missing something, I am a Flink
> >>>>>> newbie). Is there a reason why these optimizations are not necessary
> >>>>>> in Flink?
> >>>>>>
> >>>>>> How could I get the following example:
> >>>>>>
> >>>>>> val serializer = new JSONKeyValueDeserializationSchema(false)
> >>>>>> val stream = senv.addSource(
> >>>>>>   new FlinkKafkaConsumer(
> >>>>>>     "tweets-raw-json",
> >>>>>>     serializer,
> >>>>>>     properties
> >>>>>>   ).setStartFromEarliest() // TODO experiment with different start values
> >>>>>> )
> >>>>>>
> >>>>>> to map to this Tweet class concisely, i.e. without manually iterating
> >>>>>> through all the attribute fields and parsing the keys from the object
> >>>>>> node tree?
> >>>>>>
> >>>>>> final case class Tweet(tweet_id: Option[String], text: Option[String],
> >>>>>>   source: Option[String], geo: Option[String], place: Option[String],
> >>>>>>   lang: Option[String], created_at: Option[String],
> >>>>>>   timestamp_ms: Option[String], coordinates: Option[String],
> >>>>>>   user_id: Option[Long], user_name: Option[String],
> >>>>>>   screen_name: Option[String], user_created_at: Option[String],
> >>>>>>   followers_count: Option[Long], friends_count: Option[Long],
> >>>>>>   user_lang: Option[String], user_location: Option[String],
> >>>>>>   hashtags: Option[Seq[String]])
> >>>>>>
> >>>>>> Best,
> >>>>>> Georg
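To make the `TableEnvironment.executeSql()` route described above concrete, a minimal sketch, assuming Flink 1.11 with the new Kafka connector and JSON format from [2]; the column list and bootstrap servers are illustrative, and the topic name is the one from the original question.

import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
val tableEnv = TableEnvironment.create(settings)

// Define the Kafka/JSON source via SQL DDL; only the fields you actually need
// have to be declared in the schema.
tableEnv.executeSql(
  """CREATE TABLE tweets_raw (
    |  tweet_id STRING,
    |  lang STRING,
    |  created_at STRING
    |) WITH (
    |  'connector' = 'kafka',
    |  'topic' = 'tweets-raw-json',
    |  'properties.bootstrap.servers' = 'localhost:9092',
    |  'scan.startup.mode' = 'earliest-offset',
    |  'format' = 'json'
    |)""".stripMargin)

// The registered table can then be queried with regular SQL.
val result = tableEnv.sqlQuery("SELECT tweet_id, lang FROM tweets_raw")
result.execute().print()

If malformed records should be skipped rather than fail the job, the JSON format documented under [2] also has an 'json.ignore-parse-errors' option.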