At the Wikimedia Foundation, we use JSON and JSONSchemas for our events in
Kafka.  There are hundreds of these schemas and topics in Kafka.  I'd like
to provide library-level integration between our 'Event Platform' JSON data
and Flink.  My main goal:

*No case classes or POJOs.*  The JSONSchemas should be enough.

I can actually do this pretty easily with the Table API, by converting
from JSONSchema to a DataType and then creating a table with that DataType
and format('json').
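
For example, something like this sketch (jsonSchemaToDataType() stands in
for our converter, and the connector options are placeholders):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.Schema;
    import org.apache.flink.table.api.TableDescriptor;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.types.DataType;

    // Build a Kafka-backed table from a JSONSchema-derived DataType.
    DataType dataType = jsonSchemaToDataType(jsonSchemaString);

    TableEnvironment tableEnv =
        TableEnvironment.create(EnvironmentSettings.inStreamingMode());

    tableEnv.createTemporaryTable(
        "events",
        TableDescriptor.forConnector("kafka")
            .schema(Schema.newBuilder().fromRowDataType(dataType).build())
            .option("topic", "my.event.topic")
            .option("properties.bootstrap.servers", "localhost:9092")
            .format("json")
            .build());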

I'd like to be able to do the same for the DataStream API.  From what I can
tell, to do this I should be using a Row
<https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/types/Row.html>
as the record type.  I can also convert from JSONSchema to
TypeInformation<Row> pretty easily, using the Types factory
<https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/common/typeinfo/Types.html>.
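
For instance, for a (made-up) event schema with a meta object and a
message field, the conversion produces something like:

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.types.Row;

    // Illustrative output of our JSONSchema -> TypeInformation<Row>
    // conversion for a schema with meta.dt, meta.id and message fields.
    TypeInformation<Row> typeInfo = Types.ROW_NAMED(
        new String[] {"meta", "message"},
        Types.ROW_NAMED(new String[] {"dt", "id"}, Types.STRING, Types.STRING),
        Types.STRING);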

While I can convert between
<https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/data_stream_api/>
the Table API and DataStream<Row>, directly using a DataStream<Row> of our
JSON could be pretty useful, and would make it possible to use Flink
without instantiating a StreamTableEnvironment or requiring a 'table
planner'.  Also, to convert from a DataStream<Row> back up to the Table
API, I need the explicit TypeInformation<Row>, which I have to construct
manually.
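
That is, something like this (reusing the typeInfo and 'events' table
from above):

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;

    StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

    // Table -> DataStream<Row> is easy:
    DataStream<Row> stream = tableEnv.toDataStream(tableEnv.from("events"));

    // ...but after a transformation the Row's field types are no longer
    // known, so going back up to a Table needs explicit TypeInformation:
    DataStream<Row> transformed = stream
        .map(row -> row)      // some Row -> Row transformation
        .returns(typeInfo);   // without this, fromDataStream sees RAW types

    Table table = tableEnv.fromDataStream(transformed);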

Ah, but JsonRowDeserializationSchema
<https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.html>
is deprecated.  Okay, fine: I can copy it into my code and modify it for
my purposes, using it something like this:
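
    import org.apache.flink.api.common.serialization.DeserializationSchema;
    import org.apache.flink.formats.json.JsonRowDeserializationSchema;
    import org.apache.flink.types.Row;

    // Sketch using the (deprecated) flink-json class with the
    // typeInfo derived from our JSONSchema above:
    DeserializationSchema<Row> deser =
        new JsonRowDeserializationSchema.Builder(typeInfo)
            .failOnMissingField()
            .build();

But even if I do, I'm confused about something else: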

DeserializationSchema is not Table API specific (e.g. it can be used as
the value deserializer in KafkaSource).  Row is also not Table API
specific (although I know its main purpose is to bridge the Table and
DataStream APIs).  However, it seems that constructing a Source from a
DeserializationSchema is not that common: KafkaSource uses it, but
FileSource and env.fromElements don't.  I'm trying to write integration
tests for this that use the DataStream API.
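
For Kafka itself the wiring is clear enough, e.g. (sketch; broker and
topic are placeholders, deser is the DeserializationSchema<Row> above):

    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.types.Row;

    // KafkaSource accepts a plain value-only DeserializationSchema<Row>:
    KafkaSource<Row> source = KafkaSource.<Row>builder()
        .setBootstrapServers("localhost:9092")
        .setTopics("my.event.topic")
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(deser)
        .build();

    DataStream<Row> rows = env.fromSource(
        source, WatermarkStrategy.noWatermarks(), "events");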

*tl;dr questions:*

*1. Is there some easy way to use deserialized JSON in DataStream without
case classes or POJOs?*

*2. How can I use a DeserializationSchema<Row> to get a DataStream<Row> or
even DataStreamSource<Row> in a unit test from either a file or
String[]/byte[] of serialized JSON?*
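
For context, the closest I've gotten for (2) is to drive the schema by
hand, something like this (hypothetical sketch; serializedJsonMessages is
a test fixture, and I'm not sure this is idiomatic):

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.types.Row;

    // Deserialize test input eagerly (exception handling elided), then
    // use fromCollection with the explicit TypeInformation<Row>
    // (deser, typeInfo and env as in the earlier sketches):
    List<Row> rows = new ArrayList<>();
    for (byte[] message : serializedJsonMessages) {
        rows.add(deser.deserialize(message));
    }
    DataStream<Row> testStream = env.fromCollection(rows, typeInfo);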

Thank you!
