Also, auto creation is not there.

On Thu, Mar 5, 2020 at 3:59 PM Taher Koitawala <taher...@gmail.com> wrote:
> The proposal is to add more sources, and also to build event-time and
> processing-time enhancements on top of them.
>
> On Thu, Mar 5, 2020 at 3:50 PM Andrew Pilloud <apill...@google.com> wrote:
>
>> I believe we already have this functionality:
>> https://beam.apache.org/documentation/dsls/sql/extensions/create-external-table/
>>
>> Existing GCP tables can also be loaded through the GCP Data Catalog
>> metastore. What are you proposing that is new?
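>>
>> For instance, that page describes declarations along these lines for a
>> Pub/Sub stream and a BigQuery table (a rough sketch adapted from those
>> docs; the project, topic, and column names here are only illustrative):
>>
>> -- Pub/Sub source: fixed event_timestamp/attributes/payload columns
>> CREATE EXTERNAL TABLE transactions (
>>   event_timestamp TIMESTAMP,
>>   attributes MAP<VARCHAR, VARCHAR>,
>>   payload ROW<card_number BIGINT, amount DOUBLE>
>> )
>> TYPE pubsub
>> LOCATION 'projects/gcp-project/topics/card-transactions'
>> TBLPROPERTIES '{"timestampAttributeKey": "eventTimeStamp"}';
>>
>> -- BigQuery table: LOCATION is project:dataset.table
>> CREATE EXTERNAL TABLE transaction_history (
>>   card_number BIGINT,
>>   amount DOUBLE,
>>   eventtimestamp TIMESTAMP
>> )
>> TYPE bigquery
>> LOCATION 'gcp-project:dataset1.table1';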
>>
>> Andrew
>>
>> On Thu, Mar 5, 2020, 12:29 AM Taher Koitawala <taher...@gmail.com> wrote:
>>
>>> Hi All,
>>>          We have been using Apache Beam extensively to process huge
>>> amounts of data. While Beam is really powerful and can solve a huge
>>> number of use cases, a Beam job's development and testing time is
>>> significantly high.
>>>
>>> This gap can be filled with Beam SQL: a complete SQL-based interface
>>> can reduce development and testing time to a matter of minutes. It
>>> also makes Apache Beam more user friendly, letting a wide audience
>>> with different analytical skill sets interact with it.
>>>
>>> The current Beam SQL still needs to be used programmatically, so I
>>> propose the following additions/improvements.
>>>
>>> *Note: While the examples below are GCP biased, they apply to other
>>> sources in a generic manner.*
>>>
>>> For example: imagine a user who wants to write a stream processing
>>> job on Google Cloud Dataflow. The user wants to process credit card
>>> transaction streams from Google Cloud Pub/Sub (something like Kafka),
>>> enrich each record of the stream with data stored in Google Cloud
>>> Spanner, and after enrichment write the resulting data to Google
>>> Cloud BigQuery.
>>>
>>> Given below are the queries the user should be able to fire on Beam;
>>> the rest should be handled automatically by the framework.
>>>
>>> // Infer schema from the Spanner table upon table creation
>>>
>>> CREATE TABLE SPANNER_CARD_INFO
>>> OPTIONS (
>>>   ProjectId: "gcp-project",
>>>   InstanceId: "spanner-instance-id",
>>>   Database: "some-database",
>>>   Table: "card_info",
>>>   CloudResource: "SPANNER",
>>>   CreateTableIfNotExists: "FALSE"
>>> )
>>>
>>> // Apply this schema to each record read from Pub/Sub, then apply SQL
>>>
>>> CREATE TABLE TRANSACTIONS_PUBSUB_TOPIC
>>> OPTIONS (
>>>   ProjectId: "gcp-project",
>>>   Topic: "card-transactions",
>>>   CloudResource: "PUBSUB",
>>>   SubscriptionId: "subscriptionId-1",
>>>   CreateTopicIfNotExists: "FALSE",
>>>   CreateSubscriptionIfNotExist: "TRUE",
>>>   RecordType: "JSON", // Possible values: Avro, JSON, TSV, etc.
>>>   JsonRecordSchema: "{
>>>     "CardNumber": "INT",
>>>     "Amount": "DOUBLE",
>>>     "eventTimeStamp": "EVENT_TIME"
>>>   }"
>>> )
>>>
>>> // Create the table in BigQuery if it does not exist, then insert
>>>
>>> CREATE TABLE TRANSACTION_HISTORY
>>> OPTIONS (
>>>   ProjectId: "gcp-project",
>>>   CloudResource: "BIGQUERY",
>>>   Dataset: "dataset1",
>>>   Table: "table1",
>>>   CreateTableIfNotExists: "TRUE",
>>>   TableSchema: "{
>>>     "card_number": "INT",
>>>     "first_name": "STRING",
>>>     "last_name": "STRING",
>>>     "phone": "INT",
>>>     "city": "STRING",
>>>     "amount": "FLOAT",
>>>     "eventtimestamp": "INT"
>>>   }"
>>> )
>>>
>>> // The actual query that should get translated into a Beam DAG
>>>
>>> INSERT INTO TRANSACTION_HISTORY
>>> SELECT
>>>   pubsub.card_number,
>>>   spanner.first_name,
>>>   spanner.last_name,
>>>   spanner.phone,
>>>   spanner.city,
>>>   pubsub.amount,
>>>   pubsub.eventTimeStamp
>>> FROM TRANSACTIONS_PUBSUB_TOPIC pubsub
>>> JOIN SPANNER_CARD_INFO spanner
>>>   ON (pubsub.card_number = spanner.card_number);
>>>
>>> Also consider that if any of the sources or sinks change, we only
>>> change the SQL and we are done!
>>>
>>> Please let me know your thoughts about this.
>>>
>>> Regards,
>>> Taher Koitawala
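To make the event-time piece concrete: once a record-level event-time
column exists (the "EVENT_TIME" field in the Pub/Sub schema above),
windowing should compose directly in the query. A rough, illustrative
sketch that reuses Beam SQL's existing TUMBLE windowing syntax against
the tables from the proposal (not a final design):

-- Illustrative only: per-card spend per minute, windowed on event time
SELECT
  card_number,
  SUM(amount) AS total_amount,
  TUMBLE_START(eventTimeStamp, INTERVAL '1' MINUTE) AS window_start
FROM TRANSACTIONS_PUBSUB_TOPIC
GROUP BY card_number, TUMBLE(eventTimeStamp, INTERVAL '1' MINUTE);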