Hi,

Many thanks.
So do I understand correctly that:

1) similarly to Spark, the Table API works on an optimized binary
representation
2) this is only available through the SQL way of interaction; there is no
programmatic API yet
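
To make point 2 concrete, here is a minimal sketch of the SQL way, assuming
Flink 1.11 with the Blink planner and the new Kafka and JSON connector options.
The table name, the column list, the broker address and the group id are
placeholders, not values from this thread; only the topic name is taken from
the earlier example.

```scala
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

object KafkaJsonTableSketch {
  // Hypothetical DDL: table name, columns, broker address and group id are placeholders.
  val createTweets: String =
    """CREATE TABLE tweets (
      |  tweet_id STRING,
      |  user_name STRING,
      |  followers_count BIGINT
      |) WITH (
      |  'connector' = 'kafka',
      |  'topic' = 'tweets-raw-json',
      |  'properties.bootstrap.servers' = 'localhost:9092',
      |  'properties.group.id' = 'tweets-sketch',
      |  'scan.startup.mode' = 'earliest-offset',
      |  'format' = 'json'
      |)""".stripMargin

  def main(args: Array[String]): Unit = {
    val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val tableEnv = TableEnvironment.create(settings)

    // The source is defined through SQL DDL rather than a programmatic descriptor.
    tableEnv.executeSql(createTweets)

    // Once registered, the table can be used from SQL or from the Table API.
    tableEnv.from("tweets").printSchema()
  }
}
```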

This leads me then to some questions:

q1) I have read somewhere (I think in some Flink Forward presentations)
that the SQL API is not necessarily stable with regard to state, even
with small changes to the DAG (due to optimization). Does this also/still
apply to the Table API? (I assume yes.)
q2) When I use the DataSet/DataStream (classical Scala/Java) API, it looks
like I must create a custom serializer if I want one or both of:

  - side-outputting failing records instead of simply crashing the job
  - automatic deserialization into a Scala (case) class, as asked before

q3) As asked before:
>>> But I also read that creating the ObjectMapper (i.e. in Jackson terms)
inside the map function is not recommended. From Spark I know that there is
a map-partitions function, i.e. something where a database connection can
be created once and then reused for the individual elements. Is a similar
construct available in Flink as well?
>>> Also, I have read a lot of articles, and it looks like a lot of people
are using the String serializer and then parsing the JSON manually, which
also seems inefficient.
Where would I find an example of a serializer with side outputs for
failed records, as well as efficient initialization using some construct
similar to map-partitions?
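
A minimal sketch of what such a construct might look like, assuming Flink 1.11
(Scala API) and jackson-module-scala on the classpath. The lifecycle method
open() of a rich function or ProcessFunction runs once per parallel instance,
which seems to be the closest analogue to map-partitions for one-time
initialization. TweetLite, ParseTweet and the tag name "failed-records" are
hypothetical names for illustration only; the input is assumed to be the raw
Kafka payload as bytes.

```scala
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

import scala.util.{Failure, Success, Try}

// Trimmed-down, hypothetical stand-in for the Tweet class quoted further down.
final case class TweetLite(tweet_id: Option[String], text: Option[String], lang: Option[String])

object ParseTweet {
  // Side output for raw payloads that could not be parsed; the tag name is a placeholder.
  val failed: OutputTag[String] = OutputTag[String]("failed-records")
}

class ParseTweet extends ProcessFunction[Array[Byte], TweetLite] {
  // Created in open(), i.e. once per parallel instance, not once per record.
  @transient private var mapper: ObjectMapper = _

  override def open(parameters: Configuration): Unit = {
    mapper = new ObjectMapper()
    mapper.registerModule(DefaultScalaModule)
    // TweetLite only declares a few fields, so unknown JSON fields are ignored.
    mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
  }

  override def processElement(
      raw: Array[Byte],
      ctx: ProcessFunction[Array[Byte], TweetLite]#Context,
      out: Collector[TweetLite]): Unit =
    Try(mapper.readValue(raw, classOf[TweetLite])) match {
      case Success(tweet) => out.collect(tweet)
      // Route the unparsable payload to the side output instead of failing the job.
      case Failure(_) => ctx.output(ParseTweet.failed, new String(raw, "UTF-8"))
    }
}
```

With a hypothetical rawStream of type DataStream[Array[Byte]], this would be
wired up as `val parsed = rawStream.process(new ParseTweet)`, and the failed
records would be read back with `parsed.getSideOutput(ParseTweet.failed)`.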

Best,
Georg

On Fri, 10 July 2020 at 16:22, Aljoscha Krettek <
aljos...@apache.org> wrote:

> Hi Georg,
>
> I'm afraid the other suggestions are missing the point a bit. From your
> other emails it seems you want to use Kafka with JSON records together
> with the Table API/SQL. For that, take a look at [1] which describes how
> to define data sources for the Table API. Especially the Kafka and JSON
> sections should be relevant.
>
> That first link I mentioned is for the legacy connector API. There is a
> newer API with slightly different properties which will allow us to do
> the kinds of optimization like working on binary data throughout the
> stack: [2]. Unfortunately, there is no programmatic API yet, you would
> have to use `TableEnvironment.executeSql()` to execute SQL DDL that
> defines your sources. There is a FLIP for adding the programmatic API: [3]
>
> Best,
> Aljoscha
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connect.html
>
> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/
>
> [3] https://cwiki.apache.org/confluence/display/FLINK/FLIP-129%3A+Refactor+Descriptor+API+to+register+connectors+in+Table+API
>
> On 10.07.20 05:01, Aaron Levin wrote:
> > Hi Georg, you can try using the circe library for this, which has a way to
> > automatically generate JSON decoders for Scala case classes.
> >
> > As was mentioned earlier, Flink does not come packaged with
> > JSON-decoding generators for Scala like Spark does.
> >
> > On Thu, Jul 9, 2020 at 4:45 PM Georg Heiler <georg.kf.hei...@gmail.com>
> > wrote:
> >
> >> Great. Thanks.
> >> But would it be possible to automate this i.e. to have this work
> >> automatically for the case class / product?
> >>
> >> On Thu, 9 July 2020 at 20:21, Taher Koitawala <
> >> taher...@gmail.com> wrote:
> >>
> >>> The performant way would be to apply a map function over the stream and
> >>> then use the Jackson ObjectMapper to convert to Scala objects. In Flink
> >>> there is no API like Spark's to automatically get all fields.
> >>>
> >>> On Thu, Jul 9, 2020, 11:38 PM Georg Heiler <georg.kf.hei...@gmail.com>
> >>> wrote:
> >>>
> >>>> How can I use it with a Scala case class?
> >>>> If I understand it correctly, for better performance the ObjectMapper is
> >>>> already initialized in each KafkaConsumer and returns ObjectNodes. So
> >>>> I should probably rephrase: how can I then map these to case classes
> >>>> without hand-coding it? https://github.com/json4s/json4s and
> >>>> https://github.com/FasterXML/jackson-module-scala both seem to
> >>>> consume only strings.
> >>>>
> >>>> Best,
> >>>> Georg
> >>>>
> >>>> On Thu, 9 July 2020 at 19:17, Taher Koitawala <
> >>>> taher...@gmail.com> wrote:
> >>>>
> >>>>> You can try the Jackson ObjectMapper library, and that will get you from
> >>>>> JSON to object.
> >>>>>
> >>>>> Regards,
> >>>>> Taher Koitawala
> >>>>>
> >>>>> On Thu, Jul 9, 2020, 9:54 PM Georg Heiler <georg.kf.hei...@gmail.com>
> >>>>> wrote:
> >>>>>
> >>>>>> Hi,
> >>>>>>
> >>>>>> I want to map a stream of JSON documents from Kafka to a Scala
> >>>>>> case class. How can this be accomplished using the
> >>>>>> JSONKeyValueDeserializationSchema? Is a manual mapping of object nodes
> >>>>>> required?
> >>>>>>
> >>>>>> I have a Spark background. There, such manual mappings are usually
> >>>>>> discouraged. Instead, Spark offers a nice API (the Dataset API) to
> >>>>>> perform this type of assignment:
> >>>>>> 1) it is concise
> >>>>>> 2) it operates on Spark's off-heap memory representation (Tungsten),
> >>>>>> which makes it faster
> >>>>>>
> >>>>>> In Flink, by contrast, such off-heap optimizations do not seem to be
> >>>>>> talked about much (sorry if I am missing something, I am a Flink
> >>>>>> newbie). Is there a reason why these optimizations are not necessary
> >>>>>> in Flink?
> >>>>>>
> >>>>>>
> >>>>>> How could I get the following example:
> >>>>>> val serializer = new JSONKeyValueDeserializationSchema(false)
> >>>>>> val stream = senv.addSource(
> >>>>>>      new FlinkKafkaConsumer(
> >>>>>>        "tweets-raw-json",
> >>>>>>        serializer,
> >>>>>>        properties
> >>>>>>      ).setStartFromEarliest() // TODO experiment with different start values
> >>>>>>    )
> >>>>>>
> >>>>>> to map to this Tweet class concisely, i.e. without manually iterating
> >>>>>> through all the attribute fields and parsing the keys from the object
> >>>>>> node tree.
> >>>>>>
> >>>>>> final case class Tweet(
> >>>>>>   tweet_id: Option[String],
> >>>>>>   text: Option[String],
> >>>>>>   source: Option[String],
> >>>>>>   geo: Option[String],
> >>>>>>   place: Option[String],
> >>>>>>   lang: Option[String],
> >>>>>>   created_at: Option[String],
> >>>>>>   timestamp_ms: Option[String],
> >>>>>>   coordinates: Option[String],
> >>>>>>   user_id: Option[Long],
> >>>>>>   user_name: Option[String],
> >>>>>>   screen_name: Option[String],
> >>>>>>   user_created_at: Option[String],
> >>>>>>   followers_count: Option[Long],
> >>>>>>   friends_count: Option[Long],
> >>>>>>   user_lang: Option[String],
> >>>>>>   user_location: Option[String],
> >>>>>>   hashtags: Option[Seq[String]])
> >>>>>>
> >>>>>> Best,
> >>>>>> Georg
> >>>>>>
> >>>>>
> >
>
>
