I've been looking into adding support for writing (i.e. INSERT INTO
statements) for the pubsub DDL, which currently only supports reading. This
DDL requires the defined schema to have exactly three fields:
event_timestamp, attributes, and payload, corresponding to the fields in
PubsubMessage (event_timestamp can be configured to come from either
publish time or from the value in a particular attribute, and the payload
must be a ROW with a schema corresponding to the JSON written to the pubsub
topic).
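
To make the current mapping concrete, here is a rough Python sketch of how one
message lines up with the three fields. This is not the Beam implementation:
message_to_row and its parameters are hypothetical names, and I'm assuming the
timestamp attribute (when one is configured) holds milliseconds since the Unix
epoch.

```python
import json
from datetime import datetime, timezone


def message_to_row(data, attributes, publish_time, timestamp_attribute=None):
    """Map one Pub/Sub message onto the three-field row the DDL expects.

    data: raw message bytes (assumed to be UTF-8 encoded JSON).
    attributes: dict of str -> str message attributes.
    publish_time: timezone-aware datetime set by Pub/Sub.
    timestamp_attribute: optional attribute name to read event time from.
    """
    if timestamp_attribute is None:
        # Default: event_timestamp comes from the publish time.
        event_timestamp = publish_time
    else:
        # Assumption: the attribute value is epoch milliseconds as a string.
        millis = int(attributes[timestamp_attribute])
        event_timestamp = datetime.fromtimestamp(millis / 1000, tz=timezone.utc)
    return {
        "event_timestamp": event_timestamp,
        "attributes": dict(attributes),
        # payload is a nested row whose fields mirror the JSON object.
        "payload": json.loads(data.decode("utf-8")),
    }
```

With this shape, every query has to reach through payload (e.g. payload.name)
to get at the actual data.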

When writing, I think it's a bit onerous to require users to use exactly
these three top-level fields. For example, imagine we have two topics:
people and eligible_voters. people contains a stream of {"name": "..",
"age": XX} items, and we want eligible_voters to contain a stream with
{"name": ".."} items corresponding to people with age >= 18. With the
current approach this would look like:

```
CREATE TABLE people (
    event_timestamp TIMESTAMP,
    attributes MAP<VARCHAR, VARCHAR>,
    payload ROW<name VARCHAR, age INTEGER>
  )
  TYPE 'pubsub'
  LOCATION 'projects/my-project/topics/my-topic'

CREATE TABLE eligible_voters ....

INSERT INTO eligible_voters (
  SELECT
    ROW(payload.name AS name) AS payload
    FROM people
    WHERE payload.age >= 18
)
```

This query has lots of renaming and boilerplate. Furthermore, ROW(..)
doesn't seem well supported in Calcite: I had to jump through some hoops
(like calling my fields $col1) to make something like this work.

I think it would be great if we could instead handle flattened,
payload-only schemas. We would still need to have a separate
event_timestamp field, but everything else would map to a field in the
payload. With this change the previous example would look like:

```
CREATE TABLE people (
    event_timestamp TIMESTAMP,
    name VARCHAR,
    age INTEGER
  )
  TYPE 'pubsub'
  LOCATION 'projects/my-project/topics/my-topic'

CREATE TABLE eligible_voters ...

INSERT INTO eligible_voters (
  SELECT
    name
    FROM people
    WHERE age >= 18
)
```

This is much cleaner! But the overall approach has an obvious downside -
with the table definition written like this it's impossible to read from or
write to the message attributes (unless one is being used for
event_timestamp). I think we can mitigate this in two ways:
1. In the future, this flattened schema definition could be represented as
something like a view on the expanded definition. We could allow users to
provide some metadata indicating that a column should correspond to a
particular attribute, rather than a field in the payload. To me this feels
similar to how you indicate a column should be indexed in a database. It's
data that's relevant to the storage system, and not to the actual query, so
it belongs in CREATE TABLE.
2. In the meantime, we can continue to support the current syntax. If a
pubsub table definition has *exactly* three fields with the expected types:
event_timestamp TIMESTAMP, payload ROW<...>, and attributes MAP<VARCHAR,
VARCHAR>, we can continue to use the current codepath. Otherwise we will
use the flattened schema.
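
The rule in (2) could be sketched roughly like this in Python. It is only an
illustration of the idea, not the code I plan to put in the PR: the
schema-as-dict representation and the names uses_nested_schema and
row_to_payload are hypothetical, and type matching is simplified to string
comparison.

```python
import json


def uses_nested_schema(schema):
    """Return True if the schema has exactly the three expected fields.

    schema: dict of column name -> SQL type string (hypothetical form).
    """
    if set(schema) != {"event_timestamp", "attributes", "payload"}:
        return False
    return (
        schema["event_timestamp"] == "TIMESTAMP"
        and schema["attributes"] == "MAP<VARCHAR, VARCHAR>"
        and schema["payload"].startswith("ROW<")
    )


def row_to_payload(schema, row):
    """Serialize a row to the JSON bytes that would be published."""
    if uses_nested_schema(schema):
        # Current codepath: the payload field already holds the JSON fields.
        payload = row["payload"]
    else:
        # Flattened codepath: every column except event_timestamp
        # becomes a field in the payload.
        payload = {k: v for k, v in row.items() if k != "event_timestamp"}
    return json.dumps(payload).encode("utf-8")
```

Under this sketch, a table declared with just event_timestamp and name takes
the flattened path, while a table with exactly event_timestamp, attributes,
and payload keeps today's behavior.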

Please let me know if anyone has any objections to this approach. Otherwise
I plan on moving forward with it - I should have a PR up shortly.

Brian
