A PR is up here [1].

Gleb: If I understand what you're saying, I think it's already implemented the way you're describing - PubsubIOJsonTable [2] is just a thin wrapper that connects PubsubIO with Beam SQL tables.

Alex/Kenn: I agree with everything you've said :) The hard-coded event_timestamp is troublesome and should be configurable just like mapping to attributes. I like that proposal for the DDL syntax.

[1] https://github.com/apache/beam/pull/10158
[2] https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubIOJsonTable.java

On Mon, Nov 18, 2019 at 10:35 AM Kenneth Knowles wrote:

> I like Alex's syntax suggestion. Very readable. In addition to tables
> defined via DDL, we also have a metastore abstraction that currently
> supports Hive Metastore and Google's Data Catalog. We should think about
> how something like what Alex describes could be served by these systems.
>
> Kenn
>
> On Sun, Nov 17, 2019 at 4:50 PM Reza Rokni wrote:
>
>> +1 to reduced boilerplate for basic things folks want to do with SQL.
>>
>> I like Alex's use of Option for more advanced use cases.
>>
>> On Sun, 17 Nov 2019 at 20:17, Gleb Kanterov wrote:
>>
>>> Expanding on what Kenn said regarding having fewer dependencies on SQL.
>>> Can the whole thing be seen as extending PubSubIO, that would implement
>>> most of the logic from the proposal, given column annotations, and then
>>> having a thin layer that connects it with Beam SQL tables?
>>>
>>> On Sun, Nov 17, 2019 at 12:38 PM Alex Van Boxel wrote:
>>>
>>>> I like it, but I'm worried about the magic event_timestamp injection.
>>>> Wouldn't explicit injection via an option be a better approach:
>>>>
>>>> CREATE TABLE people (
>>>>     my_timestamp TIMESTAMP *OPTION(ref="pubsub:event_timestamp")*,
>>>>     my_id VARCHAR *OPTION(ref="pubsub:attributes['id_name']")*,
>>>>     name VARCHAR,
>>>>     age INTEGER
>>>> )
>>>> TYPE 'pubsub'
>>>> LOCATION 'projects/my-project/topics/my-topic'
>>>>
>>>>
>>>> _/
>>>> _/ Alex Van Boxel
>>>>
>>>>
>>>> On Sat, Nov 16, 2019 at 7:58 PM Kenneth Knowles wrote:
>>>>
>>>>> Big +1 from me.
>>>>>
>>>>> Nice explanation. This makes a lot of sense. Much simpler to
>>>>> understand with fewer magic strings.
>>>>> It also makes the Beam SQL connector less dependent on newer SQL
>>>>> features that are simply less widespread. I'm not too surprised that
>>>>> Calcite's nested row support lags behind the rest of the library. It
>>>>> simply isn't as widespread and important as flat relational
>>>>> structures. And MAP is even less widespread.
>>>>>
>>>>> Kenn
>>>>>
>>>>> On Wed, Nov 13, 2019 at 12:32 PM Brian Hulette wrote:
>>>>>
>>>>>> I've been looking into adding support for writing (i.e. INSERT INTO
>>>>>> statements) for the pubsub DDL, which currently only supports reading.
>>>>>> This DDL requires the defined schema to have exactly three fields:
>>>>>> event_timestamp, attributes, and payload, corresponding to the fields in
>>>>>> PubsubMessage (event_timestamp can be configured to come from either
>>>>>> publish time or from the value in a particular attribute, and the payload
>>>>>> must be a ROW with a schema corresponding to the JSON written to the
>>>>>> pubsub topic).
>>>>>>
>>>>>> When writing, I think it's a bit onerous to require users to use
>>>>>> exactly these three top-level fields. For example imagine we have two
>>>>>> topics: people, and eligible_voters. people contains a stream of
>>>>>> {"name": "..", "age": XX} items, and we want eligible_voters to contain
>>>>>> a stream with {"name": ".."} items corresponding to people with
>>>>>> age >= 18. With the current approach this would look like:
>>>>>>
>>>>>> ```
>>>>>> CREATE TABLE people (
>>>>>>     event_timestamp TIMESTAMP,
>>>>>>     attributes MAP<VARCHAR, VARCHAR>,
>>>>>>     payload ROW<name VARCHAR, age INTEGER>
>>>>>> )
>>>>>> TYPE 'pubsub'
>>>>>> LOCATION 'projects/my-project/topics/my-topic'
>>>>>>
>>>>>> CREATE TABLE eligible_voters ....
>>>>>>
>>>>>> INSERT INTO eligible_voters (
>>>>>>     SELECT
>>>>>>         ROW(payload.name AS name) AS payload
>>>>>>     FROM people
>>>>>>     WHERE payload.age >= 18
>>>>>> )
>>>>>> ```
>>>>>>
>>>>>> This query has lots of renaming and boilerplate, and furthermore,
>>>>>> ROW(..) doesn't seem well supported in Calcite; I had to jump through
>>>>>> some hoops (like calling my fields $col1) to make something like this
>>>>>> work. I think it would be great if we could instead handle flattened,
>>>>>> payload-only schemas. We would still need to have a separate
>>>>>> event_timestamp field, but everything else would map to a field in the
>>>>>> payload. With this change the previous example would look like:
>>>>>>
>>>>>> ```
>>>>>> CREATE TABLE people (
>>>>>>     event_timestamp TIMESTAMP,
>>>>>>     name VARCHAR,
>>>>>>     age INTEGER
>>>>>> )
>>>>>> TYPE 'pubsub'
>>>>>> LOCATION 'projects/my-project/topics/my-topic'
>>>>>>
>>>>>> CREATE TABLE eligible_voters ...
>>>>>>
>>>>>> INSERT INTO eligible_voters (
>>>>>>     SELECT
>>>>>>         name
>>>>>>     FROM people
>>>>>>     WHERE age >= 18
>>>>>> )
>>>>>> ```
>>>>>>
>>>>>> This is much cleaner! But the overall approach has an obvious
>>>>>> downside - with the table definition written like this it's impossible to
>>>>>> read from or write to the message attributes (unless one is being used
>>>>>> for event_timestamp). I think we can mitigate this in two ways:
>>>>>> 1. In the future, this flattened schema definition could be
>>>>>> represented as something like a view on the expanded definition. We could
>>>>>> allow users to provide some metadata indicating that a column should
>>>>>> correspond to a particular attribute, rather than a field in the payload.
>>>>>> To me this feels similar to how you indicate a column should be indexed
>>>>>> in a database. It's data that's relevant to the storage system, and not
>>>>>> to the actual query, so it belongs in CREATE TABLE.
>>>>>> 2.
>>>>>> In the meantime, we can continue to support the current syntax. If
>>>>>> a pubsub table definition has *exactly* three fields with the expected
>>>>>> types: event_timestamp TIMESTAMP, payload ROW<...>, and attributes
>>>>>> MAP<VARCHAR, VARCHAR>, we can continue to use the current codepath.
>>>>>> Otherwise we will use the flattened schema.
>>>>>>
>>>>>> Please let me know if anyone has any objections to this approach,
>>>>>> otherwise I plan on moving forward with it - I should have a PR up
>>>>>> shortly.
>>>>>>
>>>>>> Brian
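
For reference, the fallback rule from item 2 of the original proposal (keep the existing codepath only when the table has exactly the three expected top-level fields, otherwise treat the columns as a flattened payload) might be sketched roughly as below. This is an illustrative Python sketch only, not the code in the PR: the function names, the "nested"/"flattened" labels, and the use of plain type strings are assumptions made here, and the real implementation would presumably inspect Beam schema field types in Java.

```python
from typing import Dict

# Hypothetical representation: column name -> SQL type written as a string.
LEGACY_FIELDS = {"event_timestamp", "attributes", "payload"}


def is_legacy_pubsub_schema(fields: Dict[str, str]) -> bool:
    """True if the table has exactly the three legacy fields with the expected types."""
    if set(fields) != LEGACY_FIELDS:
        return False
    return (
        fields["event_timestamp"] == "TIMESTAMP"
        and fields["attributes"] == "MAP<VARCHAR, VARCHAR>"
        and fields["payload"].startswith("ROW<")
    )


def choose_codepath(fields: Dict[str, str]) -> str:
    """Pick the nested (current) codepath or the flattened, payload-only one."""
    return "nested" if is_legacy_pubsub_schema(fields) else "flattened"


nested_people = {
    "event_timestamp": "TIMESTAMP",
    "attributes": "MAP<VARCHAR, VARCHAR>",
    "payload": "ROW<name VARCHAR, age INTEGER>",
}
flat_people = {
    "event_timestamp": "TIMESTAMP",
    "name": "VARCHAR",
    "age": "INTEGER",
}

print(choose_codepath(nested_people))
print(choose_codepath(flat_people))
```

With the two `people` definitions from the thread, the first dictionary takes the existing nested codepath and the second falls through to the flattened one.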
