The proposal is to add more sources, and further to add event-time or processing-time enhancements on top of them.
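For illustration only, a minimal sketch of how an event-time enhancement might look on such a table, assuming the Calcite-style TUMBLE windowing that Beam SQL already supports and assuming the eventTimeStamp field is surfaced as a TIMESTAMP column (table and column names are taken from the example further down the thread):

SELECT card_number, SUM(amount) AS total_amount
FROM TRANSACTIONS_PUBSUB_TOPIC
GROUP BY card_number, TUMBLE(eventTimeStamp, INTERVAL '1' MINUTE)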
On Thu, Mar 5, 2020 at 3:50 PM Andrew Pilloud <apill...@google.com> wrote:

> I believe we have this functionality already:
> https://beam.apache.org/documentation/dsls/sql/extensions/create-external-table/
>
> Existing GCP tables can also be loaded through the GCP datacatalog
> metastore. What are you proposing that is new?
>
> Andrew
>
>
> On Thu, Mar 5, 2020, 12:29 AM Taher Koitawala <taher...@gmail.com> wrote:
>
>> Hi All,
>>          We have been using Apache Beam extensively to process huge
>> amounts of data. While Beam is really powerful and can solve a huge
>> number of use cases, a Beam job's development and testing time is
>> significantly high.
>>
>> This gap can be filled with Beam SQL, where a complete SQL-based
>> interface can reduce development and testing time to a matter of
>> minutes. It also makes Apache Beam more user friendly, letting a wide
>> variety of users with different analytical skill sets interact with it.
>>
>> The current Beam SQL still needs to be used programmatically, so I
>> propose the following additions/improvements.
>>
>> *Note: Whilst the examples given below are GCP-biased, they apply to
>> other sources in a generic manner*
>>
>> For example: Imagine a user who wants to write a stream processing job
>> on Google Cloud Dataflow. The user wants to process credit card
>> transaction streams from Google Cloud PubSub (something like Kafka) and
>> enrich each record of the stream with some data that is stored in
>> Google Cloud Spanner; after enrichment the user wishes to write the
>> enriched data to Google Cloud BigQuery.
>>
>> Given below are the queries which the user should be able to fire on
>> Beam, with the rest handled automatically by the framework.
>>
>> //Infer schema from Spanner table upon table creation
>> CREATE TABLE SPANNER_CARD_INFO
>> OPTIONS (
>>   ProjectId: “gcp-project”,
>>   InstanceId: “spanner-instance-id”,
>>   Database: “some-database”,
>>   Table: “card_info”,
>>   CloudResource: “SPANNER”,
>>   CreateTableIfNotExists: “FALSE”
>> )
>>
>> //Apply schema to each record read from pubsub, and then apply SQL.
>> CREATE TABLE TRANSACTIONS_PUBSUB_TOPIC
>> OPTIONS (
>>   ProjectId: “gcp-project”,
>>   Topic: “card-transactions”,
>>   CloudResource: “PUBSUB”,
>>   SubscriptionId: “subscriptionId-1”,
>>   CreateTopicIfNotExists: “FALSE”,
>>   CreateSubscriptionIfNotExists: “TRUE”,
>>   RecordType: “JSON”, //Possible values: Avro, JSON, TSV, etc.
>>   JsonRecordSchema: “{
>>     “CardNumber”: “INT”,
>>     “Amount”: “DOUBLE”,
>>     “eventTimeStamp”: “EVENT_TIME”
>>   }”
>> )
>>
>> //Create table in BigQuery if not exists and insert
>> CREATE TABLE TRANSACTION_HISTORY
>> OPTIONS (
>>   ProjectId: “gcp-project”,
>>   CloudResource: “BIGQUERY”,
>>   dataset: “dataset1”,
>>   table: “table1”,
>>   CreateTableIfNotExists: “TRUE”,
>>   TableSchema: “{
>>     “card_number”: “INT”,
>>     “first_name”: “STRING”,
>>     “last_name”: “STRING”,
>>     “phone”: “INT”,
>>     “city”: “STRING”,
>>     “amount”: “FLOAT”,
>>     “eventtimestamp”: “INT”
>>   }”
>> )
>>
>> //Actual query that should get translated into a Beam DAG
>> INSERT INTO TRANSACTION_HISTORY
>> SELECT
>>   pubsub.card_number, spanner.first_name, spanner.last_name,
>>   spanner.phone, spanner.city, pubsub.amount, pubsub.eventTimeStamp
>> FROM TRANSACTIONS_PUBSUB_TOPIC pubsub
>> JOIN SPANNER_CARD_INFO spanner ON (pubsub.card_number = spanner.card_number);
>>
>> Also note that if any of the sources or sinks change, we only need to
>> change the SQL, and we are done!
>>
>> Please let me know your thoughts about this.
>>
>> Regards,
>> Taher Koitawala
>>
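For reference against the page Andrew linked above, a minimal sketch of what the existing CREATE EXTERNAL TABLE extension already covers for Pub/Sub and BigQuery. The project, topic, dataset and table names are placeholders, and the exact Pub/Sub schema layout and type quoting may differ by Beam version, so treat the linked documentation as authoritative:

CREATE EXTERNAL TABLE transactions (
  event_timestamp TIMESTAMP,
  attributes MAP<VARCHAR, VARCHAR>,
  payload ROW<card_number BIGINT, amount DOUBLE>
)
TYPE pubsub
LOCATION 'projects/gcp-project/topics/card-transactions'

CREATE EXTERNAL TABLE transaction_history (
  card_number BIGINT,
  amount DOUBLE,
  eventtimestamp BIGINT
)
TYPE bigquery
LOCATION 'gcp-project:dataset1.table1'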