I believe we have this functionality already: https://beam.apache.org/documentation/dsls/sql/extensions/create-external-table/
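
For example, a Pub/Sub topic can already be declared as a table with that extension, roughly like this (I'm sketching from memory of that page, so the table name, payload schema, and location below are only illustrative and the exact syntax should be checked against the docs):

CREATE EXTERNAL TABLE transactions_pubsub (
  event_timestamp TIMESTAMP,
  attributes MAP<VARCHAR, VARCHAR>,
  payload ROW<card_number BIGINT, amount DOUBLE>
)
TYPE pubsub
LOCATION 'projects/gcp-project/topics/card-transactions'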
Existing GCP tables can also be loaded through the GCP Data Catalog metastore. What are you proposing that is new?

Andrew

On Thu, Mar 5, 2020, 12:29 AM Taher Koitawala <taher...@gmail.com> wrote:

> Hi All,
> We have been using Apache Beam extensively to process huge amounts of
> data. While Beam is really powerful and can solve a huge number of use
> cases, a Beam job's development and testing time is significantly high.
>
> This gap can be filled with Beam SQL, where a complete SQL-based
> interface can reduce development and testing time to a matter of minutes.
> It also makes Apache Beam more user-friendly, so that a wide audience
> with different analytical skill sets can interact with it.
>
> The current Beam SQL still needs to be used programmatically, and so I
> propose the following additions/improvements.
>
> *Note: Whilst the examples given below are GCP-biased, they apply to
> other sources in a generic manner.*
>
> For example: imagine a user who wants to write a stream processing job
> on Google Cloud Dataflow. The user wants to process credit card
> transaction streams from Google Cloud PubSub (something like Kafka),
> enrich each record of the stream with data stored in Google Cloud
> Spanner, and after enrichment write the resulting data to Google Cloud
> BigQuery.
>
> Given below are the queries the user should be able to run on Beam; the
> rest should be handled automatically by the framework.
>
> //Infer schema from the Spanner table upon table creation
> CREATE TABLE SPANNER_CARD_INFO
> OPTIONS (
>   ProjectId: "gcp-project",
>   InstanceId: "spanner-instance-id",
>   Database: "some-database",
>   Table: "card_info",
>   CloudResource: "SPANNER",
>   CreateTableIfNotExists: "FALSE"
> )
>
> //Apply schema to each record read from PubSub, and then apply SQL
> CREATE TABLE TRANSACTIONS_PUBSUB_TOPIC
> OPTIONS (
>   ProjectId: "gcp-project",
>   Topic: "card-transactions",
>   CloudResource: "PUBSUB",
>   SubscriptionId: "subscriptionId-1",
>   CreateTopicIfNotExists: "FALSE",
>   CreateSubscriptionIfNotExists: "TRUE",
>   RecordType: "JSON", //Possible values: Avro, JSON, TSV, etc.
>   JsonRecordSchema: "{
>     "CardNumber": "INT",
>     "Amount": "DOUBLE",
>     "eventTimeStamp": "EVENT_TIME"
>   }")
>
> //Create table in BigQuery if it does not exist, and insert
> CREATE TABLE TRANSACTION_HISTORY
> OPTIONS (
>   ProjectId: "gcp-project",
>   CloudResource: "BIGQUERY",
>   Dataset: "dataset1",
>   Table: "table1",
>   CreateTableIfNotExists: "TRUE",
>   TableSchema: "{
>     "card_number": "INT",
>     "first_name": "STRING",
>     "last_name": "STRING",
>     "phone": "INT",
>     "city": "STRING",
>     "amount": "FLOAT",
>     "eventtimestamp": "INT"
>   }")
>
> //The actual query, which should be expanded into a Beam DAG
> INSERT INTO TRANSACTION_HISTORY
> SELECT
>   pubsub.card_number,
>   spanner.first_name,
>   spanner.last_name,
>   spanner.phone,
>   spanner.city,
>   pubsub.amount,
>   pubsub.eventTimeStamp
> FROM TRANSACTIONS_PUBSUB_TOPIC pubsub
> JOIN SPANNER_CARD_INFO spanner
>   ON (pubsub.card_number = spanner.card_number);
>
> Also consider that if any of the sources or sinks change, we only change
> the SQL and we are done.
>
> Please let me know your thoughts about this.
>
> Regards,
> Taher Koitawala
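
To make the comparison concrete: the BigQuery sink and the final insert from the proposal above can, as far as I can tell, already be expressed with the existing extension along these lines (again just a sketch; the location string and column types are illustrative, and I'm not sure there is a Spanner table provider yet, so the enrichment join is left out):

CREATE EXTERNAL TABLE transaction_history (
  card_number BIGINT,
  amount DOUBLE,
  event_timestamp TIMESTAMP
)
TYPE bigquery
LOCATION 'gcp-project:dataset1.table1';

INSERT INTO transaction_history
SELECT t.payload.card_number, t.payload.amount, t.event_timestamp
FROM transactions_pubsub t;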