The proposal is to add more sources and, further on, to build event-time
and processing-time enhancements on top of them.

On Thu, Mar 5, 2020 at 3:50 PM Andrew Pilloud <apill...@google.com> wrote:

> I believe we have this functionality already:
> https://beam.apache.org/documentation/dsls/sql/extensions/create-external-table/
>
> Existing GCP tables can also be loaded through the GCP datacatalog
> metastore. What are you proposing that is new?
>
> Andrew
>
>
> On Thu, Mar 5, 2020, 12:29 AM Taher Koitawala <taher...@gmail.com> wrote:
>
>> Hi All,
>>          We have been using Apache Beam extensively to process huge
>> amounts of data. While Beam is really powerful and can solve a huge
>> number of use cases, a Beam job's development and testing time is
>> significantly high.
>>
>>    This gap can be filled with Beam SQL, where a complete SQL-based
>> interface can reduce development and testing time to a matter of minutes.
>> It also makes Apache Beam more user-friendly, so that a wide audience
>> with different analytical skill sets can interact with it.
>>
>> The current Beam SQL still needs to be used programmatically, and so I
>> propose the following additions/improvements.
>>
>> *Note: Whilst the examples given below are GCP-biased, they apply to
>> other sources in a generic manner.*
>>
>> For example: imagine a user who wants to write a stream processing job on
>> Google Cloud Dataflow. The user wants to process credit card transaction
>> streams from Google Cloud PubSub (something like Kafka) and enrich each
>> record of the stream with some data that is stored in Google Cloud
>> Spanner. After enrichment, the user wishes to write the enriched data to
>> Google Cloud BigQuery.
>>
>> Given below are the queries that the user should be able to fire on Beam;
>> the rest should be handled automatically by the framework.
>>
>> // Infer schema from Spanner table upon table creation
>>
>> CREATE TABLE SPANNER_CARD_INFO
>> OPTIONS (
>>   ProjectId: "gcp-project",
>>   InstanceId: "spanner-instance-id",
>>   Database: "some-database",
>>   Table: "card_info",
>>   CloudResource: "SPANNER",
>>   CreateTableIfNotExists: "FALSE"
>> )
>>
>> // Apply schema to each record read from PubSub, and then apply SQL.
>>
>> CREATE TABLE TRANSACTIONS_PUBSUB_TOPIC
>> OPTIONS (
>>   ProjectId: "gcp-project",
>>   Topic: "card-transactions",
>>   CloudResource: "PUBSUB",
>>   SubscriptionId: "subscriptionId-1",
>>   CreateTopicIfNotExists: "FALSE",
>>   CreateSubscriptionIfNotExist: "TRUE",
>>   RecordType: "JSON", // Possible values: Avro, JSON, TSV, etc.
>>   JsonRecordSchema: '{
>>     "CardNumber": "INT",
>>     "Amount": "DOUBLE",
>>     "eventTimeStamp": "EVENT_TIME"
>>   }'
>> )
>>
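To make the "apply schema to each record" step concrete, here is a minimal Python sketch, independent of Beam, of what the framework would have to do for one JSON payload. The `SCHEMA` mapping, the `CASTS` table, the `apply_schema` helper, and the reading of `EVENT_TIME` as epoch milliseconds are all assumptions made for illustration; none of them is an existing Beam SQL API.

```python
import json
from datetime import datetime, timezone

# Hypothetical schema mirroring JsonRecordSchema: field name -> declared type.
SCHEMA = {"CardNumber": "INT", "Amount": "DOUBLE", "eventTimeStamp": "EVENT_TIME"}


def _to_event_time(value):
    # Assumption: the event time arrives as epoch milliseconds.
    return datetime.fromtimestamp(value / 1000, tz=timezone.utc)


# Hypothetical mapping from declared type names to Python conversions.
CASTS = {"INT": int, "DOUBLE": float, "EVENT_TIME": _to_event_time}


def apply_schema(payload, schema=SCHEMA):
    """Parse one JSON message and cast each declared field to its type."""
    raw = json.loads(payload)
    missing = [name for name in schema if name not in raw]
    if missing:
        raise ValueError(f"missing fields: {missing}")
    return {name: CASTS[kind](raw[name]) for name, kind in schema.items()}
```

Fields that are not declared in the schema are dropped, and a record that lacks a declared field is rejected rather than silently padded; a real implementation would need to decide where such records go (for example, a dead-letter output).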
>> // Create table in BigQuery if it does not exist, then insert
>>
>> CREATE TABLE TRANSACTION_HISTORY
>> OPTIONS (
>>   ProjectId: "gcp-project",
>>   CloudResource: "BIGQUERY",
>>   dataset: "dataset1",
>>   table: "table1",
>>   CreateTableIfNotExists: "TRUE",
>>   TableSchema: '{
>>     "card_number": "INT",
>>     "first_name": "STRING",
>>     "last_name": "STRING",
>>     "phone": "INT",
>>     "city": "STRING",
>>     "amount": "FLOAT",
>>     "eventtimestamp": "INT"
>>   }'
>> )
>>
>> // Actual query that should get translated into a Beam DAG
>>
>> INSERT INTO TRANSACTION_HISTORY
>> SELECT
>>   pubsub.card_number,
>>   spanner.first_name,
>>   spanner.last_name,
>>   spanner.phone,
>>   spanner.city,
>>   pubsub.amount,
>>   pubsub.eventTimeStamp
>> FROM TRANSACTIONS_PUBSUB_TOPIC pubsub
>> JOIN SPANNER_CARD_INFO spanner
>>   ON (pubsub.card_number = spanner.card_number);
>>
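As a rough illustration of what the INSERT ... SELECT query would have to do for each record, here is a small Python sketch of the enrichment join. Plain dictionaries and lists stand in for the PubSub stream, the Spanner table, and the BigQuery sink; the `enrich` function, `CARD_INFO`, and the sample rows are hypothetical, and a real Beam pipeline would use the corresponding IO connectors rather than an in-memory lookup.

```python
from typing import Dict, Iterable, Iterator

# Hypothetical stand-in for SPANNER_CARD_INFO, keyed by card_number.
CARD_INFO: Dict[int, dict] = {
    1111: {"first_name": "Ada", "last_name": "Lovelace",
           "phone": 5550100, "city": "London"},
}


def enrich(transactions: Iterable[dict],
           card_info: Dict[int, dict]) -> Iterator[dict]:
    """Inner-join each transaction with its card holder on card_number."""
    for txn in transactions:
        holder = card_info.get(txn["card_number"])
        if holder is None:
            continue  # inner join: drop transactions with no matching card
        yield {
            "card_number": txn["card_number"],
            "first_name": holder["first_name"],
            "last_name": holder["last_name"],
            "phone": holder["phone"],
            "city": holder["city"],
            "amount": txn["amount"],
            "eventtimestamp": txn["eventTimeStamp"],
        }


# Made-up sample transactions standing in for the PubSub stream.
transactions = [
    {"card_number": 1111, "amount": 25.5, "eventTimeStamp": 1583400000000},
    {"card_number": 2222, "amount": 9.99, "eventTimeStamp": 1583400001000},
]
history = list(enrich(transactions, CARD_INFO))
```

Because the query uses a plain JOIN, the sketch drops transactions whose card is unknown; whether the proposal intends that or a LEFT JOIN that keeps them is not stated.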
>>
>>
>> Also consider that if any of the sources or sinks change, we only need
>> to change the SQL, and we are done!
>>
>> Please let me know your thoughts about this.
>>
>> Regards,
>> Taher Koitawala
>>
>>
