Hmm,

Avro still has the drawback of pulling in an uncontrolled stack with far too
many dependencies to be part of any API.
This is why I proposed a JSON-P based API (JsonObject) with a custom Beam
entry for some metadata (headers "à la Camel").


Romain Manni-Bucau
@rmannibucau <https://twitter.com/rmannibucau> |  Blog
<https://rmannibucau.metawerx.net/> | Old Blog
<http://rmannibucau.wordpress.com> | Github <https://github.com/rmannibucau> |
LinkedIn <https://www.linkedin.com/in/rmannibucau> | Book
<https://www.packtpub.com/application-development/java-ee-8-high-performance>

2018-04-26 9:59 GMT+02:00 Jean-Baptiste Onofré <j...@nanthrax.net>:

> Hi Ismael
>
> You mean directly in Beam SQL?
>
> That will be part of schema support: a generic record could be one of the
> payloads handled across schemas.
>
> Regards
> JB
> On 26 Apr 2018, at 11:39, "Ismaël Mejía" <ieme...@gmail.com> wrote:
>>
>> Hello Anton,
>>
>> Thanks for the descriptive email and the really useful work. Any plans
>> to tackle PCollections of GenericRecord/IndexedRecords? It seems Avro
>> is a natural fit for this approach too.
>>
>> Regards,
>> Ismaël
>>
>> On Wed, Apr 25, 2018 at 9:04 PM, Anton Kedin <ke...@google.com> wrote:
>>
>>>  Hi,
>>>
>>>  I want to highlight a couple of improvements to Beam SQL we have been
>>>  working on recently that aim to make the Beam SQL API easier to use.
>>>  Specifically, these features simplify the conversion of Java Beans and
>>>  JSON strings to Rows.
>>>
>>>  Feel free to try this and send any bugs/comments/PRs my way.
>>>
>>>  **Caveat: this is still a work in progress and has known bugs and
>>>  incomplete features; see below for details.**
>>>
>>>  Background
>>>
>>>  Beam SQL queries can only be applied to PCollection<Row>. This means that
>>>  users need to convert whatever PCollection elements they have to Rows
>>>  before querying them with SQL. This usually requires manually creating a
>>>  Schema and implementing a custom conversion
>>>  PTransform<PCollection<Element>, PCollection<Row>> (see Beam SQL Guide).
>>>
>>>  The improvements described here are an attempt to reduce this overhead for
>>>  a few common cases, as a start.
>>>
>>>  Status
>>>
>>>  - Introduced an InferredRowCoder to automatically generate Rows from
>>>    beans. This removes the need to manually define a Schema and Row
>>>    conversion logic.
>>>  - Introduced a JsonToRow transform to automatically parse JSON objects
>>>    into Rows. This removes the need to manually implement conversion logic.
>>>  - This is still experimental work in progress; APIs will likely change.
>>>  - There are known bugs and unsolved problems.
>>>
>>>
>>>  Java Beans
>>>
>>>  Introduced a coder that facilitates generating Rows from Java Beans.
>>>  It reduces the overhead to:
>>>
>>>  /** Some user-defined Java Bean */
>>>  class JavaBeanObject implements Serializable {
>>>      String getName() { ... }
>>>  }
>>>
>>>  // Obtain the objects:
>>>  PCollection<JavaBeanObject> javaBeans = ...;
>>>
>>>  // Convert to Rows and apply a SQL query:
>>>  PCollection<Row> queryResult =
>>>    javaBeans
>>>       .setCoder(InferredRowCoder.ofSerializable(JavaBeanObject.class))
>>>       .apply(BeamSql.query("SELECT name FROM PCOLLECTION"));
>>>
>>>
>>>  Note that there is no longer any manual Schema definition or custom
>>>  conversion logic.
>>>
>>>  Links
>>>
>>>   example;
>>>   InferredRowCoder;
>>>   test;
>>>
>>>
>>>  JSON
>>>
>>>  Introduced the JsonToRow transform. It is now possible to query a
>>>  PCollection<String> that contains JSON objects like this:
>>>
>>>  // Assuming JSON objects look like this:
>>>  // { "type" : "foo", "size" : 333 }
>>>
>>>  // Define a Schema:
>>>  Schema jsonSchema =
>>>    Schema
>>>      .builder()
>>>      .addStringField("type")
>>>      .addInt32Field("size")
>>>      .build();
>>>
>>>  // Obtain PCollection of the objects in JSON format:
>>>  PCollection<String> jsonObjects = ...;
>>>
>>>  // Convert to Rows and apply a SQL query:
>>>  PCollection<Row> queryResults =
>>>    jsonObjects
>>>      .apply(JsonToRow.withSchema(jsonSchema))
>>>      .apply(BeamSql.query(
>>>          "SELECT type, AVG(size) FROM PCOLLECTION GROUP BY type"));
>>>
>>>
>>>  Note that the JSON-to-Row conversion is done by the JsonToRow transform.
>>>  Supplying a Schema is currently required.
>>>
>>>  Links
>>>
>>>   JsonToRow;
>>>   test/example;
>>>
>>>
>>>  Going Forward
>>>
>>>  - fix bugs (BEAM-4163, BEAM-4161 ...);
>>>  - implement more features (BEAM-4167, more types of objects);
>>>  - wire this up with sources/sinks to further simplify the SQL API.
>>>
>>>
>>>  Thank you,
>>>  Anton
>>>
>>
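
The Java Beans part of Anton's message says that fields are inferred from a
bean's getters instead of being declared by hand. The sketch below illustrates
that general idea with plain JDK reflection only. It is not Beam's
InferredRowCoder and makes no claim about how Beam implements it: the class
BeanRowSketch, its methods inferSchema and toRow, the extra getSize getter, and
the use of a Map as a stand-in for a Row are all hypothetical. It also assumes
a public bean class with public getters, which the bean in the message does
not declare.

```java
import java.beans.Introspector;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper (not a Beam API): derives field names, types and values from getters. */
final class BeanRowSketch {

    /** Example bean; getSize() is an assumption added for illustration. */
    public static class JavaBeanObject implements java.io.Serializable {
        private final String name;
        private final int size;

        public JavaBeanObject(String name, int size) {
            this.name = name;
            this.size = size;
        }

        public String getName() { return name; }
        public int getSize() { return size; }
    }

    /** A getter here is a public, non-static, zero-argument getXxx() not declared on Object. */
    private static boolean isGetter(Method m) {
        return m.getName().startsWith("get")
            && m.getName().length() > 3
            && m.getParameterCount() == 0
            && m.getReturnType() != void.class
            && !Modifier.isStatic(m.getModifiers())
            && m.getDeclaringClass() != Object.class;
    }

    /** Maps each getter to a field name ("getName" becomes "name") and its return type. */
    static Map<String, Class<?>> inferSchema(Class<?> beanClass) {
        Map<String, Class<?>> schema = new TreeMap<>();
        for (Method m : beanClass.getMethods()) {
            if (isGetter(m)) {
                schema.put(Introspector.decapitalize(m.getName().substring(3)), m.getReturnType());
            }
        }
        return schema;
    }

    /** Reads every getter of the bean into a name-to-value map standing in for a Row. */
    static Map<String, Object> toRow(Object bean) throws ReflectiveOperationException {
        Map<String, Object> row = new TreeMap<>();
        for (Method m : bean.getClass().getMethods()) {
            if (isGetter(m)) {
                row.put(Introspector.decapitalize(m.getName().substring(3)), m.invoke(bean));
            }
        }
        return row;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(inferSchema(JavaBeanObject.class));
        System.out.println(toRow(new JavaBeanObject("foo", 333)));
    }
}
```

A TreeMap is used only so that the field order is deterministic; a real
schema would need an explicit, stable field order and proper type mapping.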
