Hi All,
We have been using Apache Beam extensively to process huge amounts of
data. While Beam is really powerful and can solve a huge number of use
cases, developing and testing a Beam job takes a significant amount of time.

This gap can be filled with Beam SQL, where a complete SQL-based
interface can reduce development and testing time to a matter of minutes.
It also makes Apache Beam more user-friendly, so that a wide audience
with different analytical skill sets can work with it.

The current Beam SQL still needs to be used programmatically, so I
propose the following additions/improvements.

*Note: Whilst the examples given below are GCP-biased, they apply to
other sources in a generic manner.*

For example: imagine a user who wants to write a stream processing job on
Google Cloud Dataflow. The user wants to process credit card transaction
streams from Google Cloud Pub/Sub (something like Kafka) and enrich each
record of the stream with data stored in Google Cloud Spanner. After
enrichment, the user wishes to write the resulting data to Google Cloud
BigQuery.

Given below are the queries that the user should be able to run on Beam;
the rest should be handled automatically by the framework.

// Infer schema from Spanner table upon table creation

CREATE TABLE SPANNER_CARD_INFO
OPTIONS (
  ProjectId: "gcp-project",
  InstanceId: "spanner-instance-id",
  Database: "some-database",
  Table: "card_info",
  CloudResource: "SPANNER",
  CreateTableIfNotExists: "FALSE"
)

// Apply schema to each record read from pubsub, and then apply SQL.

CREATE TABLE TRANSACTIONS_PUBSUB_TOPIC
OPTIONS (
  ProjectId: "gcp-project",
  Topic: "card-transactions",
  CloudResource: "PUBSUB",
  SubscriptionId: "subscriptionId-1",
  CreateTopicIfNotExists: "FALSE",
  CreateSubscriptionIfNotExist: "TRUE",
  RecordType: "JSON", // Possible values: Avro, JSON, TSV, etc.
  JsonRecordSchema: '{
    "card_number": "INT",
    "amount": "DOUBLE",
    "eventTimeStamp": "EVENT_TIME"
  }'
)

// Create the table in BigQuery if it does not exist, then insert

CREATE TABLE TRANSACTION_HISTORY
OPTIONS (
  ProjectId: "gcp-project",
  CloudResource: "BIGQUERY",
  dataset: "dataset1",
  table: "table1",
  CreateTableIfNotExists: "TRUE",
  TableSchema: '{
    "card_number": "INT",
    "first_name": "STRING",
    "last_name": "STRING",
    "phone": "INT",
    "city": "STRING",
    "amount": "FLOAT",
    "eventtimestamp": "INT"
  }'
)

// Actual query that should be translated into a Beam DAG

INSERT INTO TRANSACTION_HISTORY
SELECT
  pubsub.card_number,
  spanner.first_name,
  spanner.last_name,
  spanner.phone,
  spanner.city,
  pubsub.amount,
  pubsub.eventTimeStamp
FROM TRANSACTIONS_PUBSUB_TOPIC pubsub
JOIN SPANNER_CARD_INFO spanner
  ON (pubsub.card_number = spanner.card_number);
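
To make the intended result of this query concrete, here is a rough
plain-Python sketch of what the join computes for a single record. It is
not Beam code: the enrich function and the sample values are hypothetical,
and I am assuming inner-join semantics (a transaction with no matching
card is dropped).

```python
# Plain-Python illustration of the join's semantics; this is not Beam code.

def enrich(transaction, card_info_by_number):
    """Return the output row for one transaction, or None if no card matches."""
    card = card_info_by_number.get(transaction["card_number"])
    if card is None:
        # Assumed inner-join behaviour: unmatched transactions produce no row.
        return None
    return {
        "card_number": transaction["card_number"],
        "first_name": card["first_name"],
        "last_name": card["last_name"],
        "phone": card["phone"],
        "city": card["city"],
        "amount": transaction["amount"],
        "eventtimestamp": transaction["eventTimeStamp"],
    }


# Hypothetical sample data standing in for the Spanner table
# and for one decoded Pub/Sub message.
card_info_by_number = {
    1234: {"first_name": "Asha", "last_name": "Rao", "phone": 5550100, "city": "Pune"},
}
transaction = {"card_number": 1234, "amount": 42.5, "eventTimeStamp": 1546300800}

row = enrich(transaction, card_info_by_number)
```

In the proposal, the framework would derive this per-record logic from the
SQL itself, so the user never writes it by hand.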



Also consider that if any of the sources or sinks change, we only need to
change the SQL, and we are done!
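
One way the framework could support this is a lookup from the
CloudResource option to a connector. The sketch below is only an
illustration under that assumption: the registry, the connector functions
and the strings they return are all hypothetical, and real connectors
would build Beam reads and writes instead of returning descriptions.

```python
# Hypothetical registry keyed by the CloudResource option of a CREATE TABLE.
# Each connector here only returns a description string for illustration.

def spanner_connector(options):
    return "spanner:{InstanceId}/{Database}/{Table}".format(**options)


def pubsub_connector(options):
    return "pubsub:{ProjectId}/{Topic}".format(**options)


def bigquery_connector(options):
    return "bigquery:{ProjectId}.{dataset}.{table}".format(**options)


CONNECTORS = {
    "SPANNER": spanner_connector,
    "PUBSUB": pubsub_connector,
    "BIGQUERY": bigquery_connector,
}


def resolve_table(options):
    """Pick the connector named by CloudResource and apply it to the options."""
    resource = options["CloudResource"]
    if resource not in CONNECTORS:
        raise ValueError("Unsupported CloudResource: " + resource)
    return CONNECTORS[resource](options)


sink = resolve_table({
    "CloudResource": "BIGQUERY",
    "ProjectId": "gcp-project",
    "dataset": "dataset1",
    "table": "table1",
})
```

With something like this, swapping a source or sink means editing only the
OPTIONS of the corresponding CREATE TABLE statement.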

Please let me know your thoughts about this.

Regards,
Taher Koitawala
