I believe we have this functionality already:
https://beam.apache.org/documentation/dsls/sql/extensions/create-external-table/

Existing GCP tables can also be loaded through the GCP Data Catalog
metastore. What are you proposing that is new?
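
For reference, the DDL on that page can already express the Pub/Sub and
BigQuery parts of your example along these lines (rough sketch based on the
docs, untested; the project, topic, dataset, table, and column names are
just placeholders taken from your scenario):

CREATE EXTERNAL TABLE transactions_pubsub_topic (
  event_timestamp TIMESTAMP,
  attributes MAP<VARCHAR, VARCHAR>,
  payload ROW<card_number BIGINT, amount DOUBLE>
)
TYPE pubsub
LOCATION 'projects/gcp-project/topics/card-transactions';

CREATE EXTERNAL TABLE transaction_history (
  card_number BIGINT,
  first_name VARCHAR,
  last_name VARCHAR,
  phone BIGINT,
  city VARCHAR,
  amount DOUBLE,
  eventtimestamp BIGINT
)
TYPE bigquery
LOCATION 'gcp-project:dataset1.table1';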

Andrew


On Thu, Mar 5, 2020, 12:29 AM Taher Koitawala <taher...@gmail.com> wrote:

> Hi All,
>          We have been using Apache Beam extensively to process huge
> amounts of data. While Beam is really powerful and can solve a huge number
> of use cases, a Beam job's development and testing time is significantly
> high.
>
>    This gap can be filled with Beam SQL: a complete SQL-based interface
> can reduce development and testing time to a matter of minutes. It also
> makes Apache Beam more user friendly, so that a wide variety of users with
> different analytical skill sets can interact with it.
>
> The current Beam SQL still needs to be used programmatically, so I
> propose the following additions/improvements.
>
> *Note: Whilst the examples given below are more GCP biased, they apply to
> other sources in a generic manner*
>
> For example: imagine a user who wants to write a stream processing job on
> Google Cloud Dataflow. The user wants to process credit card transaction
> streams from Google Cloud Pub/Sub (something like Kafka) and enrich each
> record of the stream with some data that is stored in Google Cloud Spanner;
> after enrichment the user wishes to write the enriched records to Google
> Cloud BigQuery.
>
> Given below are the queries which the user should be able to fire on Beam;
> the rest should be automatically handled by the framework.
>
> // Infer schema from the Spanner table upon table creation
>
> CREATE TABLE SPANNER_CARD_INFO
> OPTIONS (
>   ProjectId: "gcp-project",
>   InstanceId: "spanner-instance-id",
>   Database: "some-database",
>   Table: "card_info",
>   CloudResource: "SPANNER",
>   CreateTableIfNotExists: "FALSE"
> )
> // Apply schema to each record read from Pub/Sub, and then apply SQL.
>
> CREATE TABLE TRANSACTIONS_PUBSUB_TOPIC
> OPTIONS (
>   ProjectId: "gcp-project",
>   Topic: "card-transactions",
>   CloudResource: "PUBSUB",
>   SubscriptionId: "subscriptionId-1",
>   CreateTopicIfNotExists: "FALSE",
>   CreateSubscriptionIfNotExist: "TRUE",
>   RecordType: "JSON", // Possible values: Avro, JSON, TSV, etc.
>   JsonRecordSchema: '{
>     "CardNumber": "INT",
>     "Amount": "DOUBLE",
>     "eventTimeStamp": "EVENT_TIME"
>   }'
> )
>
> // Create the table in BigQuery if it does not exist, then insert
>
> CREATE TABLE TRANSACTION_HISTORY
> OPTIONS (
>   ProjectId: "gcp-project",
>   CloudResource: "BIGQUERY",
>   Dataset: "dataset1",
>   Table: "table1",
>   CreateTableIfNotExists: "TRUE",
>   TableSchema: '{
>     "card_number": "INT",
>     "first_name": "STRING",
>     "last_name": "STRING",
>     "phone": "INT",
>     "city": "STRING",
>     "amount": "FLOAT",
>     "eventtimestamp": "INT"
>   }'
> )
>
> // Actual query that should get translated into a Beam DAG
>
> INSERT INTO TRANSACTION_HISTORY
> SELECT
>   pubsub.card_number,
>   spanner.first_name,
>   spanner.last_name,
>   spanner.phone,
>   spanner.city,
>   pubsub.amount,
>   pubsub.eventTimeStamp
> FROM TRANSACTIONS_PUBSUB_TOPIC pubsub
> JOIN SPANNER_CARD_INFO spanner
>   ON (pubsub.card_number = spanner.card_number);
>
>
>
> Also consider that if any of the sources or sinks change, we only
> change the SQL and we are done.
>
> Please let me know your thoughts about this.
>
> Regards,
> Taher Koitawala
>
>
