Yeah, what I'm working on will help with IO. Basically if you register a
function with SchemaRegistry that converts back and forth between a type
(say JsonObject) and a Beam Row, then it is applied by the framework behind
the scenes as part of DoFn invocation. Concrete example: let's say I have
an IO that reads json objects
  class MyJsonIORead extends PTransform<PBegin, JsonObject> {...}

If you register a schema for this type (or you can also just set the schema
directly on the output PCollection), then Beam knows how to convert back
and forth between JsonObject and Row. So the next ParDo can look like

p.apply(new MyJsonIORead())
.apply(ParDo.of(new DoFn<JsonObject, T>....
    @ProcessElement void process(@Element Row row) {
   })

And Beam will automatically convert JsonObject to a Row for processing (you
aren't forced to do this of course - you can always ask for it as a
JsonObject).

The same is true for output. If you have a sink that takes in JsonObject
but the transform before it produces Row objects (for instance - because
the transform before it is Beam SQL), Beam can automatically convert Row
back to JsonObject for you.

All of this was detailed in the Schema doc I shared a few months ago. There
was a lot of discussion on that document from various parties, and some of
this API is a result of that discussion. This is also working in the branch
JB and I were working on, though not yet integrated back to master.

I would like to actually go further and make Row an interface and provide a
way to automatically put a Row interface on top of any other object (e.g.
JsonObject, Pojo, etc.) This won't change the way the user writes code, but
instead of Beam having to copy and convert at each stage (e.g. from
JsonObject to Row) it simply will create a Row object that uses the the
JsonObject as its underlying storage.

Reuven

On Tue, May 22, 2018 at 11:37 AM Romain Manni-Bucau <rmannibu...@gmail.com>
wrote:

> Well, beam can implement a new mapper but it doesnt help for io. Most of
> modern backends will take json directly, even javax one and it must stay
> generic.
>
> Then since json to pojo mapping is already done a dozen of times, not sure
> it is worth it for now.
>
> Le mar. 22 mai 2018 20:27, Reuven Lax <re...@google.com> a écrit :
>
>> We can do even better btw. Building a SchemaRegistry where automatic
>> conversions can be registered between schema and Java data types. With this
>> the user won't even need a DoFn to do the conversion.
>>
>> On Tue, May 22, 2018, 10:13 AM Romain Manni-Bucau <rmannibu...@gmail.com>
>> wrote:
>>
>>> Hi guys,
>>>
>>> Checked out what has been done on schema model and think it is
>>> acceptable - regarding the json debate -  if
>>> https://issues.apache.org/jira/browse/BEAM-4381 can be fixed.
>>>
>>> High level, it is about providing a mainstream and not too impacting
>>> model OOTB and JSON seems the most valid option for now, at least for IO
>>> and some user transforms.
>>>
>>> Wdyt?
>>>
>>> Le ven. 27 avr. 2018 18:36, Romain Manni-Bucau <rmannibu...@gmail.com>
>>> a écrit :
>>>
>>>>  Can give it a try end of may, sure. (holidays and work constraints
>>>> will make it hard before).
>>>>
>>>> Le 27 avr. 2018 18:26, "Anton Kedin" <ke...@google.com> a écrit :
>>>>
>>>>> Romain,
>>>>>
>>>>> I don't believe that JSON approach was investigated very thoroughIy. I
>>>>> mentioned few reasons which will make it not the best choice my opinion,
>>>>> but I may be wrong. Can you put together a design doc or a prototype?
>>>>>
>>>>> Thank you,
>>>>> Anton
>>>>>
>>>>>
>>>>> On Thu, Apr 26, 2018 at 10:17 PM Romain Manni-Bucau <
>>>>> rmannibu...@gmail.com> wrote:
>>>>>
>>>>>>
>>>>>>
>>>>>> Le 26 avr. 2018 23:13, "Anton Kedin" <ke...@google.com> a écrit :
>>>>>>
>>>>>> BeamRecord (Row) has very little in common with JsonObject (I assume
>>>>>> you're talking about javax.json), except maybe some similarities of the
>>>>>> API. Few reasons why JsonObject doesn't work:
>>>>>>
>>>>>>    - it is a Java EE API:
>>>>>>       - Beam SDK is not limited to Java. There are probably similar
>>>>>>       APIs for other languages but they might not necessarily carry the 
>>>>>> same
>>>>>>       semantics / APIs;
>>>>>>
>>>>>>
>>>>>> Not a big deal I think. At least not a technical blocker.
>>>>>>
>>>>>>
>>>>>>    - It can change between Java versions;
>>>>>>
>>>>>> No, this is javaee ;).
>>>>>>
>>>>>>
>>>>>>
>>>>>>    - Current Beam java implementation is an experimental feature to
>>>>>>       identify what's needed from such API, in the end we might end up 
>>>>>> with
>>>>>>       something similar to JsonObject API, but likely not
>>>>>>
>>>>>>
>>>>>> I dont get that point as a blocker
>>>>>>
>>>>>>
>>>>>>    - ;
>>>>>>       - represents JSON, which is not an API but an object notation:
>>>>>>       - it is defined as unicode string in a certain format. If you
>>>>>>       choose to adhere to ECMA-404, then it doesn't sound like 
>>>>>> JsonObject can
>>>>>>       represent an Avro object, if I'm reading it right;
>>>>>>
>>>>>>
>>>>>> It is in the generator impl, you can impl an avrogenerator.
>>>>>>
>>>>>>
>>>>>>    - doesn't define a type system (JSON does, but it's lacking):
>>>>>>       - for example, JSON doesn't define semantics for numbers;
>>>>>>       - doesn't define date/time types;
>>>>>>       - doesn't allow extending JSON type system at all;
>>>>>>
>>>>>>
>>>>>> That is why you need a metada object, or simpler, a schema with that
>>>>>> data. Json or beam record doesnt help here and you end up on the same
>>>>>> outcome if you think about it.
>>>>>>
>>>>>>
>>>>>>    - lacks schemas;
>>>>>>
>>>>>> Jsonschema are standard, widely spread and tooled compared to
>>>>>> alternative.
>>>>>>
>>>>>> You can definitely try loosen the requirements and define everything
>>>>>> in JSON in userland, but the point of Row/Schema is to avoid it and 
>>>>>> define
>>>>>> everything in Beam model, which can be extended, mapped to JSON, Avro,
>>>>>> BigQuery Schemas, custom binary format etc., with same semantics across
>>>>>> beam SDKs.
>>>>>>
>>>>>>
>>>>>> This is what jsonp would allow with the benefit of a natural pojo
>>>>>> support through jsonb.
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Thu, Apr 26, 2018 at 12:28 PM Romain Manni-Bucau <
>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>
>>>>>>> Just to let it be clear and let me understand: how is BeamRecord
>>>>>>> different from a JsonObject which is an API without implementation (not
>>>>>>> event a json one OOTB)? Advantage of json *api* are indeed natural 
>>>>>>> mapping
>>>>>>> (jsonb is based on jsonp so no new binding to reinvent) and simple
>>>>>>> serialization (json+gzip for ex, or avro if you want to be geeky).
>>>>>>>
>>>>>>> I fail to see the point to rebuild an ecosystem ATM.
>>>>>>>
>>>>>>> Le 26 avr. 2018 19:12, "Reuven Lax" <re...@google.com> a écrit :
>>>>>>>
>>>>>>>> Exactly what JB said. We will write a generic conversion from Avro
>>>>>>>> (or json) to Beam schemas, which will make them work transparently with
>>>>>>>> SQL. The plan is also to migrate Anton's work so that POJOs works
>>>>>>>> generically for any schema.
>>>>>>>>
>>>>>>>> Reuven
>>>>>>>>
>>>>>>>> On Thu, Apr 26, 2018 at 1:17 AM Jean-Baptiste Onofré <
>>>>>>>> j...@nanthrax.net> wrote:
>>>>>>>>
>>>>>>>>> For now we have a generic schema interface. Json-b can be an impl,
>>>>>>>>> avro could be another one.
>>>>>>>>>
>>>>>>>>> Regards
>>>>>>>>> JB
>>>>>>>>> Le 26 avr. 2018, à 12:08, Romain Manni-Bucau <
>>>>>>>>> rmannibu...@gmail.com> a écrit:
>>>>>>>>>>
>>>>>>>>>> Hmm,
>>>>>>>>>>
>>>>>>>>>> avro has still the pitfalls to have an uncontrolled stack which
>>>>>>>>>> brings way too much dependencies to be part of any API,
>>>>>>>>>> this is why I proposed a JSON-P based API (JsonObject) with a
>>>>>>>>>> custom beam entry for some metadata (headers "à la Camel").
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Romain Manni-Bucau
>>>>>>>>>> @rmannibucau <https://twitter.com/rmannibucau> |   Blog
>>>>>>>>>> <https://rmannibucau.metawerx.net/> | Old Blog
>>>>>>>>>> <http://rmannibucau.wordpress.com> |  Github
>>>>>>>>>> <https://github.com/rmannibucau> | LinkedIn
>>>>>>>>>> <https://www.linkedin.com/in/rmannibucau> | Book
>>>>>>>>>> <https://www.packtpub.com/application-development/java-ee-8-high-performance>
>>>>>>>>>>
>>>>>>>>>> 2018-04-26 9:59 GMT+02:00 Jean-Baptiste Onofré <j...@nanthrax.net>:
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>> Hi Ismael
>>>>>>>>>>>
>>>>>>>>>>> You mean directly in Beam SQL ?
>>>>>>>>>>>
>>>>>>>>>>> That will be part of schema support: generic record could be one
>>>>>>>>>>> of the payload with across schema.
>>>>>>>>>>>
>>>>>>>>>>> Regards
>>>>>>>>>>> JB
>>>>>>>>>>> Le 26 avr. 2018, à 11:39, "Ismaël Mejía" < ieme...@gmail.com> a
>>>>>>>>>>> écrit:
>>>>>>>>>>>>
>>>>>>>>>>>> Hello Anton,
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks for the descriptive email and the really useful work. Any 
>>>>>>>>>>>> plans
>>>>>>>>>>>> to tackle PCollections of GenericRecord/IndexedRecords? it seems 
>>>>>>>>>>>> Avro
>>>>>>>>>>>> is a natural fit for this approach too.
>>>>>>>>>>>>
>>>>>>>>>>>> Regards,
>>>>>>>>>>>> Ismaël
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Apr 25, 2018 at 9:04 PM, Anton Kedin <ke...@google.com> 
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>>            Hi,
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>  I want to highlight a couple of improvements to Beam SQL we have 
>>>>>>>>>>>>> been
>>>>>>>>>>>>>
>>>>>>>>>>>>>  working on recently which are targeted to make Beam SQL API 
>>>>>>>>>>>>> easier to use.
>>>>>>>>>>>>>
>>>>>>>>>>>>>  Specifically these features simplify conversion of Java Beans 
>>>>>>>>>>>>> and JSON
>>>>>>>>>>>>>
>>>>>>>>>>>>>  strings to Rows.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>  Feel free to try this and send any bugs/comments/PRs my way.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>  **Caveat: this is still work in progress, and has known bugs and 
>>>>>>>>>>>>> incomplete
>>>>>>>>>>>>>
>>>>>>>>>>>>>  features, see below for details.**
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>  Background
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>  Beam SQL queries can only be applied to PCollection<Row>. This 
>>>>>>>>>>>>> means that
>>>>>>>>>>>>>
>>>>>>>>>>>>>  users need to convert whatever PCollection elements they have to 
>>>>>>>>>>>>> Rows before
>>>>>>>>>>>>>
>>>>>>>>>>>>>  querying them with SQL. This usually requires manually creating 
>>>>>>>>>>>>> a Schema and
>>>>>>>>>>>>>
>>>>>>>>>>>>>  implementing a custom conversion PTransform<PCollection<
>>>>>>>>>>>>>           Element>,
>>>>>>>>>>>>>
>>>>>>>>>>>>>  PCollection<Row>> (see Beam SQL Guide).
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>  The improvements described here are an attempt to reduce this 
>>>>>>>>>>>>> overhead for
>>>>>>>>>>>>>
>>>>>>>>>>>>>  few common cases, as a start.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>  Status
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>  Introduced a InferredRowCoder to automatically generate rows 
>>>>>>>>>>>>> from beans.
>>>>>>>>>>>>>
>>>>>>>>>>>>>  Removes the need to manually define a Schema and Row conversion 
>>>>>>>>>>>>> logic;
>>>>>>>>>>>>>
>>>>>>>>>>>>>  Introduced JsonToRow transform to automatically parse JSON 
>>>>>>>>>>>>> objects to Rows.
>>>>>>>>>>>>>
>>>>>>>>>>>>>  Removes the need to manually implement a conversion logic;
>>>>>>>>>>>>>
>>>>>>>>>>>>>  This is still experimental work in progress, APIs will likely 
>>>>>>>>>>>>> change;
>>>>>>>>>>>>>
>>>>>>>>>>>>>  There are known bugs/unsolved problems;
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>  Java Beans
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>  Introduced a coder which facilitates Rows generation from Java 
>>>>>>>>>>>>> Beans.
>>>>>>>>>>>>>
>>>>>>>>>>>>>  Reduces the overhead to:
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>>             /** Some user-defined Java Bean */
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>  class JavaBeanObject implements Serializable {
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>  String getName() { ... }
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>  }
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>  // Obtain the objects:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>  PCollection<JavaBeanObject> javaBeans = ...;
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>  // Convert to Rows and apply a SQL query:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>  PCollection<Row> queryResult =
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>  javaBeans
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>  .setCoder(InferredRowCoder.
>>>>>>>>>>>>>>            ofSerializable(JavaBeanObject.
>>>>>>>>>>>>>>            class))
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>  .apply(BeamSql.query("SELECT name FROM PCOLLECTION"));
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>  Notice, there is no more manual Schema definition or custom 
>>>>>>>>>>>>> conversion
>>>>>>>>>>>>>
>>>>>>>>>>>>>  logic.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>  Links
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>   example;
>>>>>>>>>>>>>
>>>>>>>>>>>>>   InferredRowCoder;
>>>>>>>>>>>>>
>>>>>>>>>>>>>   test;
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>  JSON
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>  Introduced JsonToRow transform. It is possible to query a
>>>>>>>>>>>>>
>>>>>>>>>>>>>  PCollection<String> that contains JSON objects like this:
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>>             // Assuming JSON objects look like this:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>  // { "type" : "foo", "size" : 333 }
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>  // Define a Schema:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>  Schema jsonSchema =
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>  Schema
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>  .builder()
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>  .addStringField("type")
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>  .addInt32Field("size")
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>  .build();
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>  // Obtain PCollection of the objects in JSON format:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>  PCollection<String> jsonObjects = ...
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>  // Convert to Rows and apply a SQL query:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>  PCollection<Row> queryResults =
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>  jsonObjects
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>  .apply(JsonToRow.withSchema(
>>>>>>>>>>>>>>            jsonSchema))
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>  .apply(BeamSql.query("SELECT type, AVG(size) FROM PCOLLECTION 
>>>>>>>>>>>>>> GROUP BY
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>  type"));
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>  Notice, JSON to Row conversion is done by JsonToRow transform. 
>>>>>>>>>>>>> It is
>>>>>>>>>>>>>
>>>>>>>>>>>>>  currently required to supply a Schema.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>  Links
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>   JsonToRow;
>>>>>>>>>>>>>
>>>>>>>>>>>>>   test/example;
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>  Going Forward
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>  fix bugs (BEAM-4163, BEAM-4161 ...)
>>>>>>>>>>>>>
>>>>>>>>>>>>>  implement more features (BEAM-4167, more types of objects);
>>>>>>>>>>>>>
>>>>>>>>>>>>>  wire this up with sources/sinks to further simplify SQL API;
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>  Thank you,
>>>>>>>>>>>>>
>>>>>>>>>>>>>  Anton
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>
>>>>>>

Reply via email to