This is an automated email from the ASF dual-hosted git repository. ningk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 713389c7b1f Add SQL in Notebooks blog post (#17481) 713389c7b1f is described below commit 713389c7b1f719a04422cbd0205b0e719924ac26 Author: Ning Kang <ningkang0...@gmail.com> AuthorDate: Fri Apr 29 15:54:00 2022 -0700 Add SQL in Notebooks blog post (#17481) Added a blog post about running Beam SQL in notebooks with examples. --- .../content/en/blog/beam-sql-with-notebooks.md | 793 +++++++++++++++++++++ website/www/site/data/authors.yml | 4 + .../images/blog/beam-sql-notebooks/image1.png | Bin 0 -> 122789 bytes .../images/blog/beam-sql-notebooks/image10.png | Bin 0 -> 16413 bytes .../images/blog/beam-sql-notebooks/image11.png | Bin 0 -> 62851 bytes .../images/blog/beam-sql-notebooks/image12.png | Bin 0 -> 63523 bytes .../images/blog/beam-sql-notebooks/image13.png | Bin 0 -> 32051 bytes .../images/blog/beam-sql-notebooks/image14.png | Bin 0 -> 189318 bytes .../images/blog/beam-sql-notebooks/image15.png | Bin 0 -> 52765 bytes .../images/blog/beam-sql-notebooks/image16.png | Bin 0 -> 38784 bytes .../images/blog/beam-sql-notebooks/image17.png | Bin 0 -> 41372 bytes .../images/blog/beam-sql-notebooks/image18.png | Bin 0 -> 85434 bytes .../images/blog/beam-sql-notebooks/image19.png | Bin 0 -> 257767 bytes .../images/blog/beam-sql-notebooks/image2.png | Bin 0 -> 69444 bytes .../images/blog/beam-sql-notebooks/image20.png | Bin 0 -> 75168 bytes .../images/blog/beam-sql-notebooks/image21.png | Bin 0 -> 107697 bytes .../images/blog/beam-sql-notebooks/image22.png | Bin 0 -> 161169 bytes .../images/blog/beam-sql-notebooks/image23.png | Bin 0 -> 85261 bytes .../images/blog/beam-sql-notebooks/image24.png | Bin 0 -> 735928 bytes .../images/blog/beam-sql-notebooks/image25.png | Bin 0 -> 91003 bytes .../images/blog/beam-sql-notebooks/image26.png | Bin 0 -> 192296 bytes .../images/blog/beam-sql-notebooks/image3.png | Bin 0 -> 157966 bytes .../images/blog/beam-sql-notebooks/image4.png | Bin 0 -> 35941 bytes 
.../images/blog/beam-sql-notebooks/image5.png | Bin 0 -> 33982 bytes .../images/blog/beam-sql-notebooks/image6.png | Bin 0 -> 35486 bytes .../images/blog/beam-sql-notebooks/image7.png | Bin 0 -> 38108 bytes .../images/blog/beam-sql-notebooks/image8.png | Bin 0 -> 38778 bytes .../images/blog/beam-sql-notebooks/image9.png | Bin 0 -> 10237 bytes 28 files changed, 797 insertions(+) diff --git a/website/www/site/content/en/blog/beam-sql-with-notebooks.md b/website/www/site/content/en/blog/beam-sql-with-notebooks.md new file mode 100644 index 00000000000..e6eca08fcf4 --- /dev/null +++ b/website/www/site/content/en/blog/beam-sql-with-notebooks.md @@ -0,0 +1,793 @@ +--- +title: "Running Beam SQL in notebooks" +date: 2022-04-28 00:00:01 -0800 +categories: + - blog +aliases: + - /blog/2022/04/28/beam-sql-with-notebooks.html +authors: + - ningk +--- +<!-- +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at +http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> + +## Intro + +[Beam SQL](https://beam.apache.org/documentation/dsls/sql/overview/) allows a +Beam user to query PCollections with SQL statements. +[Interactive Beam](https://github.com/apache/beam/tree/master/sdks/python/apache_beam/runners/interactive#interactive-beam) +provides an integration between Apache Beam and +[Jupyter Notebooks](https://docs.jupyter.org/en/latest/) (formerly known as +IPython Notebooks) to make pipeline prototyping and data exploration much faster +and easier. 
+You can set up your own notebook user interface (for example, +[JupyterLab](https://jupyterlab.readthedocs.io/en/stable/getting_started/installation.html) +or classic [Jupyter Notebooks](https://docs.jupyter.org/en/latest/install.html)) +on your own device by following their documentation. Alternatively, you can +choose a hosted solution that does everything for you. You are free to select +whichever notebook user interface you prefer. For simplicity, this +post does not go through the notebook environment setup and uses +[Apache Beam Notebooks](https://cloud.google.com/dataflow/docs/guides/interactive-pipeline-development), +which provides a cloud-hosted +[JupyterLab](https://jupyterlab.readthedocs.io/en/stable/) environment and lets +a Beam user iteratively develop pipelines, inspect pipeline graphs, and parse +individual PCollections in a read-eval-print-loop (REPL) workflow. + +In this post, you will see how to use `beam_sql`, a notebook +[magic](https://ipython.readthedocs.io/en/stable/interactive/magics.html), to +execute Beam SQL in notebooks and inspect the results. + +Toward the end, the post also demonstrates how to use the `beam_sql` magic +with a production environment, such as running it as a one-shot job on +Dataflow. That part is optional. To follow those steps, you should have a project in +Google Cloud Platform with +[necessary APIs enabled](https://cloud.google.com/dataflow/docs/guides/interactive-pipeline-development#before_you_begin) +, and you should have enough permissions to create a Google Cloud Storage bucket +(or to use an existing one), query a public Google Cloud BigQuery dataset, and +run Dataflow jobs. + +If you choose to use the cloud-hosted notebook solution, once you have your +Google Cloud project ready, you will need to create an Apache Beam Notebooks +instance and open the JupyterLab web interface. 
Please follow the instructions +given at: +https://cloud.google.com/dataflow/docs/guides/interactive-pipeline-development#launching_an_notebooks_instance + + +## Getting familiar with the environment + +### Landing page + +After starting your notebook user interface (for example, in Apache +Beam Notebooks, by clicking the `OPEN JUPYTERLAB` link), you land on +the default launcher page of the notebook environment. + +<img class="center-block" + src="/images/blog/beam-sql-notebooks/image1.png" + alt="Beam SQL in Notebooks: landing page"> + +On the left side, there is a file explorer to view examples, tutorials and +assets on the notebook instance. To easily navigate the files, you may +double-click the `00-Start_Here.md` (#1 in the screenshot) file to view detailed +information about the files. + +The right side displays the default launcher page of JupyterLab. To +create and open a completely new notebook file and code with a selected version +of Apache Beam, click one of the items (#2) with Apache Beam >=2.34.0 installed +(`beam_sql` was introduced in 2.34.0). + +### Create/open a notebook + +For example, if you clicked the image button with Apache Beam 2.36.0, you would +see an `Untitled.ipynb` file created and opened. + +<img class="center-block" + src="/images/blog/beam-sql-notebooks/image2.png" + alt="Beam SQL in Notebooks: create/open a notebook "> + +In the file explorer, your new notebook file has been created as +`Untitled.ipynb`. 
+ +On the right side, in the opened notebook, there are 4 buttons on top that you +may interact most frequently with: + + - #1: insert an empty code block after the selected / highlighted code block + - #2: execute the code in the block that is selected / highlighted + - #3: interrupt code execution if your code execution is stuck + - #4: “Restart the kernel”: clear all states from code executions and start + from fresh + +There is a button on the top-right (#5) for you to choose a different Apache +Beam version if needed, so it’s not set in stone. + +You can always double-click a file from the file explorer to open it without +creating a new one. + + +## Beam SQL + +### `beam_sql` magic + +`beam_sql` is an IPython +[custom magic](https://ipython.readthedocs.io/en/stable/config/custommagics.html). +If you're not familiar with magics, here are some +[built-in examples](https://ipython.readthedocs.io/en/stable/interactive/magics.html). +It's a convenient way to validate your queries locally against known/test data +sources when prototyping a Beam pipeline with SQL, before productionizing it on +remote cluster/services. + +The Apache Beam Notebooks environment has preloaded the `beam_sql` magic and +basic `apache-beam` modules so you can directly use them without additional +imports. You can also explicitly load the magic via +`%load_ext apache_beam.runners.interactive.sql.beam_sql_magics` and +`apache-beam` modules if you set up your own notebook elsewhere. + +You can type: + +``` +%beam_sql -h +``` + +and then execute the code to learn how to use the magic: + +<img class="center-block" + src="/images/blog/beam-sql-notebooks/image3.png" + alt="Beam SQL in Notebooks: beam_sql magic help message "> + +The selected/highlighted block is called a notebook cell. It mainly has 3 +components: + - #1: The execution count. `[1]` indicates this block is the first executed + code. 
It increases by 1 for each piece of code you execute even if you + re-execute the same piece of code. `[ ]` indicates this block is not + executed. + - #2: The cell input: the code gets executed. + - #3: The cell output: the output of the code execution. Here it contains the + help documentation of the `beam_sql` magic. + +### Create a PCollection + +There are 3 scenarios for Beam SQL when creating a PCollection: + +1. Use Beam SQL to create a PCollection from constant values + +``` +%%beam_sql -o pcoll +SELECT CAST(1 AS INT) AS id, CAST('foo' AS VARCHAR) AS str, CAST(3.14 AS DOUBLE) AS flt +``` + +<img class="center-block" + src="/images/blog/beam-sql-notebooks/image4.png" + alt="Beam SQL in Notebooks: beam_sql creates a PCollection from raw values."> + +The `beam_sql` magic creates and outputs a PCollection named `pcoll` with +element_type like `BeamSchema_...(id: int32, str: str, flt: float64)`. + +**Note** that you have **not** explicitly created a Beam pipeline. You get a +PCollection because the `beam_sql` magic always implicitly creates a pipeline to +execute your SQL query. To hold the elements with each field's type info, Beam +automatically creates a +[schema](https://beam.apache.org/documentation/programming-guide/#what-is-a-schema) +as the `element_type` for the created PCollection. You will learn more about +schema-aware PCollections later. + +2. Use Beam SQL to query a PCollection + +You can chain another SQL using the output from a previous SQL (or any +schema-aware PCollection produced by any normal Beam PTransforms) as the input +to produce a new PCollection. + +**Note**: if you name the output PCollection, make sure that it’s unique in your +notebook to avoid overwriting a different PCollection. + +``` +%%beam_sql -o id_pcoll +SELECT id FROM pcoll +``` + +<img class="center-block" + src="/images/blog/beam-sql-notebooks/image5.png" + alt="Beam SQL in Notebooks: beam_sql creates a PCollection from another."> + +3. 
Use Beam SQL to join multiple PCollections + +You can query multiple PCollections in a single query. + +``` +%%beam_sql -o str_with_same_id +SELECT id, str FROM pcoll JOIN id_pcoll USING (id) +``` + +<img class="center-block" + src="/images/blog/beam-sql-notebooks/image6.png" + alt="Beam SQL in Notebooks: beam_sql creates a PCollection from multiple PCollections."> + +Now you have learned how to use the `beam_sql` magic to create PCollections and +inspect their results. + +**Tip**: if you accidentally delete some of the notebook cell outputs, you can +always check the content of a PCollection by invoking `ib.show(pcoll_name)` or +`ib.collect(pcoll_name)` where `ib` stands for “Interactive Beam” +([learn more](https://cloud.google.com/dataflow/docs/guides/interactive-pipeline-development#reading_and_visualizing_the_data)). + +### Schema-aware PCollections +The `beam_sql` magic provides the flexibility to seamlessly mix SQL and non-SQL +Beam statements to build pipelines and even run them on Dataflow. However, each +PCollection queried by Beam SQL needs to have a +[schema](https://beam.apache.org/documentation/programming-guide/#what-is-a-schema). +For the `beam_sql` magic, it’s recommended to use `typing.NamedTuple` when a +schema is desired. You can go through the example below to learn more details +about schema-aware PCollections. + +#### Setup + +In the setup of this example, you will: + + - Install the PyPI package `names` using the built-in `%pip` magic: you will use + the module to generate some random English names as the raw data input. + - Define a schema with `NamedTuple` that has 2 attributes: `id` - a unique + numeric identifier of a person; `name` - a string name of a person. + - Define a pipeline with an `InteractiveRunner` to utilize notebook-related + features of Apache Beam. 
+ +```python +%pip install names + +import names +from typing import NamedTuple + +class Person(NamedTuple): + id: int + name: str + +p = beam.Pipeline(InteractiveRunner()) +``` + +There is no visible output for the code execution. + +#### Create schema-aware PCollections without using SQL + +```python +persons = (p + | beam.Create([Person(id=x, name=names.get_full_name()) for x in range(10)])) +ib.show(persons) +``` + +<img class="center-block" + src="/images/blog/beam-sql-notebooks/image7.png" + alt="Beam SQL in Notebooks: create a schema-aware PCollection without SQL."> + +```python +persons_2 = (p + | beam.Create([Person(id=x, name=names.get_full_name()) for x in range(5, 15)])) +ib.show(persons_2) +``` + +<img class="center-block" + src="/images/blog/beam-sql-notebooks/image8.png" + alt="Beam SQL in Notebooks: create another schema-aware PCollection without SQL."> + +Now you have 2 PCollections both with the same schema defined by the `Person` +class: + - `persons` contains 10 records for 10 persons with ids ranging from 0 to 9, + - `persons_2` contains another 10 records for 10 persons with ids ranging from + 5 to 14. + +#### Encode and Decode of schema-aware PCollections + +For this example, you still need one more piece of data from the first `pcoll` +that you have created with instructions in this post. + +You can use the original `pcoll`. Optionally, if you want to exercise using +coders explicitly with schema-aware PCollections, you can add a Text I/O into +the mix: write the content of `pcoll` into a text file retaining its schema +information, then read the file back into a new schema-aware PCollection called +`pcoll_in_file`, and use the new PCollection to join `persons` and `persons_2` +to find names with the common id in all three of them. 
+ +To encode `pcoll` into a file, execute: + +```python +coder=beam.coders.registry.get_coder(pcoll.element_type) +pcoll | beam.io.textio.WriteToText('/tmp/pcoll', coder=coder) +pcoll.pipeline.run().wait_until_finish() + +!cat /tmp/pcoll* +``` + +<img class="center-block" + src="/images/blog/beam-sql-notebooks/image9.png" + alt="Beam SQL in Notebooks: write a schema-aware PCollection into a text file."> + +The above code execution writes the PCollection `pcoll` (basically +`{id: 1, str: foo, flt: 3.14}`) into a text file using the coder assigned by +Beam. As you can see, the file content is recorded in a binary non +human-readable format, and that’s normal. + +To decode the file content into a new PCollection, execute: + +```python +pcoll_in_file = p | beam.io.ReadFromText( + '/tmp/pcoll*', coder=coder).with_output_types( + pcoll.element_type) + +ib.show(pcoll_in_file) +``` + +<img class="center-block" + src="/images/blog/beam-sql-notebooks/image10.png" + alt="Beam SQL in Notebooks: read a schema-aware PCollection from a text file."> + +**Note** you have to use the same coder during encoding and decoding, and +furthermore you may assign the schema explicitly to the new PCollection through +`with_output_types()`. + +Reading out the encoded binary content from the text file and decoding it with +the correct coder, the content of `pcoll` is recovered into `pcoll_in_file`. You +can use this technique to save and share your data through any Beam I/O (not +necessarily a text file) with collaborators who work on their own pipelines (not +just in your notebook session or pipelines). + +#### Schema in `beam_sql` magic + +The `beam_sql` magic automatically registers a `RowCoder` for your `NamedTuple` +schema so that you only need to focus on preparing your data for query without +worrying about coders. To see more verbose details of what the `beam_sql` magic +does behind the scenes, you can use the `-v` option. 
+For example, you can look for all elements with `id < 5` in `persons` with the +query below and assign the output to `persons_id_lt_5`. + +``` +%%beam_sql -o persons_id_lt_5 -v +SELECT * FROM persons WHERE id < 5 +``` + +<img class="center-block" + src="/images/blog/beam-sql-notebooks/image11.png" + alt="Beam SQL in Notebooks: beam_sql registers a schema for a PCollection."> + +Since this is the first time running this query, you might see a warning message +like: + +>Schema Person has not been registered to use a RowCoder. Automatically +registering it by running: +beam.coders.registry.register_coder(Person, beam.coders.RowCoder) + +The `beam_sql` magic registers a `RowCoder` for each schema you define +and use whenever it finds one. You can also explicitly run the same code to do +so. + +**Note** the output element type is `Person(id: int, name: str)` instead of +`BeamSchema_…` because you have selected all the fields from a single +PCollection of the known type `Person(id: int, name: str)`. + +As another example, you can query for all names from `persons` and `persons_2` with +the same ids and assign the output to `persons_with_common_id`: + +``` +%%beam_sql -o persons_with_common_id -v +SELECT * FROM persons JOIN persons_2 USING (id) +``` + +<img class="center-block" + src="/images/blog/beam-sql-notebooks/image12.png" + alt="Beam SQL in Notebooks: beam_sql creates a schema for a query."> + +Note the output element type is now some +`BeamSchema_...(id: int64, name: str, name0: str)`. Because you have selected +columns from both PCollections, there is no known schema to hold the result. +Beam automatically creates a schema and differentiates the conflicted field +`name` by suffixing 0 to one of them. + +Because `Person` has already been registered with a `RowCoder`, there is +no more warning about registering it even with the `-v` option. 
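To make the `persons JOIN persons_2 USING (id)` result more concrete, here is a plain-Python sketch of the same inner join. It does not use Beam at all. The names are placeholders (the post generates random ones with the `names` package), the helper `join_using_id` is hypothetical, and the choice to put the right-hand side's name into `name0` is an assumption: the post only says that one of the two conflicting fields gets the suffix.

```python
from typing import NamedTuple


class Person(NamedTuple):
    id: int
    name: str


# Placeholder names; the post fills these in with names.get_full_name().
persons = [Person(id=x, name=f"person_a_{x}") for x in range(10)]
persons_2 = [Person(id=x, name=f"person_b_{x}") for x in range(5, 15)]


def join_using_id(left, right):
    """Inner join on `id`, keeping both names.

    Assumption: the right-hand `name` is the one renamed to `name0`.
    """
    # Index the right side by id so each left row is matched in O(1).
    right_by_id = {}
    for row in right:
        right_by_id.setdefault(row.id, []).append(row)
    return [
        {"id": l.id, "name": l.name, "name0": r.name}
        for l in left
        for r in right_by_id.get(l.id, [])
    ]


persons_with_common_id = join_using_id(persons, persons_2)
for row in persons_with_common_id:
    print(row)
```

Only the ids present in both inputs survive, which is why the joined output in the notebook is smaller than either input.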
+ +Additionally, you can do a join with `pcoll_in_file`, `persons` and `persons_2`: + +``` +%%beam_sql -o entry_with_common_id + +SELECT pcoll_in_file.id, persons.name AS name_1, persons_2.name AS name_2 +FROM pcoll_in_file JOIN persons ON pcoll_in_file.id = persons.id +JOIN persons_2 ON pcoll_in_file.id = persons_2.id +``` + +<img class="center-block" + src="/images/blog/beam-sql-notebooks/image13.png" + alt="Beam SQL in Notebooks: rename fields in a query."> + +The schema generated reflects the column renaming you have done in the SQL. + + +## An Example + +You will go through an example to find out the US state with the most COVID +positive cases on a specific day with data provided by the +[covid tracking project](https://covidtracking.com/). + +### Get the data + +```python +import json +import requests + +# The covidtracking project has stopped collecting new data, current data ends on 2021-03-07 +json_current='https://covidtracking.com/api/v1/states/current.json' + +def get_json_data(url): + with requests.Session() as session: + data = json.loads(session.get(url).text) + return data + +current_data = get_json_data(json_current) + +current_data[0] +``` + +<img class="center-block" + src="/images/blog/beam-sql-notebooks/image14.png" + alt="Beam SQL in Notebooks: preview example data."> + +The data is dated as 2021-03-07. It contains many details about COVID cases for +different states in the US. `current_data[0]` is just one of the data points. + +You can get rid of most of the columns of the data. For example, just focus on +“date”, “state”, “positive” and “negative”, and then define a schema +`UsCovidData`: + +```python +from typing import Optional + +class UsCovidData(NamedTuple): + partition_date: str # Remember to str(e['date']). + state: str + positive: int + negative: Optional[int] +``` + +**Note**: + + - `date` is a keyword in (Calcite)SQL, use a different field name such as + `partition_date`; + - `date` from the data is an `int` type, not `str`. 
Make sure you convert the + data using `str()` or use `date: int`. + - `negative` has missing values and the default is `None`. So instead of + `negative: int`, it should be `negative: Optional[int]`. Or you can convert + `None` into 0 when using the schema. + +Then parse the json data into a PCollection with the schema: + +```python +p_sql = beam.Pipeline(runner=InteractiveRunner()) +covid_data = (p_sql + | 'Create PCollection from json' >> beam.Create(current_data) + | 'Parse' >> beam.Map( + lambda e: UsCovidData( + partition_date=str(e['date']), + state=e['state'], + positive=e['positive'], + negative=e['negative'])).with_output_types(UsCovidData)) +ib.show(covid_data) +``` + +<img class="center-block" + src="/images/blog/beam-sql-notebooks/image15.png" + alt="Beam SQL in Notebooks: parse example data with a schema."> + +### Query + +You can now find the biggest positive on the “current day” (2021-03-07). + +``` +%%beam_sql -o max_positive +SELECT partition_date, MAX(positive) AS positive +FROM covid_data +GROUP BY partition_date +``` + +<img class="center-block" + src="/images/blog/beam-sql-notebooks/image16.png" + alt="Beam SQL in Notebooks: find the biggest positive from the data."> + +However, this is just the positive number. You cannot observe the state that has +this maximum number nor the negative case number for the state. + +To enrich your result, you have to join this data back to the original data set +you have parsed. + +``` +%%beam_sql -o entry_with_max_positive +SELECT covid_data.partition_date, covid_data.state, covid_data.positive, {fn IFNULL(covid_data.negative, 0)} AS negative +FROM covid_data JOIN max_positive +USING (partition_date, positive) +``` + +<img class="center-block" + src="/images/blog/beam-sql-notebooks/image17.png" + alt="Beam SQL in Notebooks: enriched data with biggest positive."> + +Now you can see all columns of the data with the maximum positive case on +2021-03-07. 
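If it helps to see what the two queries compute without any SQL, the following plain-Python sketch mirrors them step by step: a `MAX(positive)` per `partition_date`, then a join back on `(partition_date, positive)` with null `negative` values replaced by 0. The rows and state codes are made up for illustration and are not real figures; only the `UsCovidData` fields come from the post.

```python
from typing import NamedTuple, Optional


class UsCovidData(NamedTuple):
    partition_date: str
    state: str
    positive: int
    negative: Optional[int]


# Made-up rows for illustration only; not real data.
covid_data = [
    UsCovidData("20210307", "AA", 100, 900),
    UsCovidData("20210307", "BB", 250, None),
    UsCovidData("20210307", "CC", 180, 700),
]

# Query 1: SELECT partition_date, MAX(positive) ... GROUP BY partition_date
max_positive = {}
for row in covid_data:
    current = max_positive.get(row.partition_date)
    if current is None or row.positive > current:
        max_positive[row.partition_date] = row.positive

# Query 2: JOIN ... USING (partition_date, positive),
# with {fn IFNULL(negative, 0)} applied to the selected column.
entry_with_max_positive = [
    row._replace(negative=row.negative if row.negative is not None else 0)
    for row in covid_data
    if max_positive[row.partition_date] == row.positive
]

print(entry_with_max_positive)
```

The join is what brings `state` and `negative` back: the aggregate alone only knows the date and the maximum count.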
+**Note**: to handle missing values of the negative column in the original data, +you can use `{fn IFNULL(covid_data.negative, 0)}` to set null values to 0. + +When you're ready to scale up, you can translate the SQLs into a pipeline with +`SqlTransform`s and run your pipeline on a distributed runner like Flink or +Spark. This post demonstrates it by launching a one-shot job on Dataflow from +the notebook with the help of `beam_sql` magic. + +### Run on Dataflow + +Now that you have a pipeline that parses US COVID data from json to find +positive/negative/state information for the state with the most positive cases +on each day, you can try applying it to all historical daily data and running it +on Dataflow. + +The new data source you will use is a public dataset from USAFacts US +Coronavirus Database that contains all historical daily summary of COVID cases +in the US. + +The schema of data is very similar to what the covid tracking project website +provides. The fields you will query are: `date`, `state`, `confirmed_cases`, and +`deaths`. + +<img class="center-block" + src="/images/blog/beam-sql-notebooks/image18.png" + alt="Beam SQL in Notebooks: schema of cloud data."> + +A preview of the data looks like below (you may skip the inspection in BigQuery +and just take a look at the screenshot): + +<img class="center-block" + src="/images/blog/beam-sql-notebooks/image19.png" + alt="Beam SQL in Notebooks: preview of cloud data."> + +The format of the data is **slightly different** from the json data you parsed +in the previous pipeline because the numbers are grouped by counties instead of +states, thus some additional aggregations need to be done in the SQLs. + +If you need a fresh execution, you may click the “Restart the kernel” button on +the top menu. 
+ +Full code is as below, on-top of the original pipeline and queries: + + - It changes the source from a single-day data to a more complete historical + data; + - It changes the I/O and schema to accommodate the new dataset; + - It changes the SQLs to include more aggregations to accommodate the new + format of the dataset. + +**Prepare the data with schema** + +```python +from typing import NamedTuple +from typing import Optional + +# Public BQ dataset. +table = 'bigquery-public-data:covid19_usafacts.summary' +# Replace with your project. +project = 'YOUR-PROJECT-NAME-HERE' +# Replace with your GCS bucket. +gcs_location = 'gs://YOUR_GCS_BUCKET_HERE' + +class UsCovidData(NamedTuple): + partition_date: str + state: str + confirmed_cases: Optional[int] + deaths: Optional[int] + + +p_on_dataflow = beam.Pipeline(runner=InteractiveRunner()) +covid_data = (p_on_dataflow + | 'Read dataset' >> beam.io.ReadFromBigQuery( + project=project, table=table, gcs_location=gcs_location) + | 'Parse' >> beam.Map( + lambda e: UsCovidData( + partition_date=str(e['date']), + state=e['state'], + confirmed_cases=int(e['confirmed_cases']), + deaths=int(e['deaths']))).with_output_types(UsCovidData)) +``` + +**Run on Dataflow** + +To run SQL on Dataflow is very simple, you just need to add the option +`-r DataflowRunner`. + +``` +%%beam_sql -o data_by_state -r DataflowRunner +SELECT partition_date, state, confirmed_cases, deaths +FROM covid_data +``` + +Different from previous `beam_sql` magic executions, you won’t see the result +immediately. Instead, a form like below is printed in the notebook cell output: + +<img class="center-block" + src="/images/blog/beam-sql-notebooks/image20.png" + alt="Beam SQL in Notebooks: empty run-on-dataflow form."> + +The `beam_sql` magic tries its best to guess your project id and preferred cloud +region. 
You still have to input additional information necessary to submit a +Dataflow job, such as a GCS bucket to stage the Dataflow job and any additional +Python dependencies the job needs. + +For now, ignore the form in the cell output, because you still need 2 more SQLs +to: 1) find the maximum confirmed cases on each day; 2) join the maximum case +data with the full data_by_state. The `beam_sql` magic allows you to chain SQLs, +so chain 2 more by executing: + +``` +%%beam_sql -o max_cases -r DataflowRunner +SELECT partition_date, MAX(confirmed_cases) as confirmed_cases +FROM data_by_state +GROUP BY partition_date +``` + +And + +``` +%%beam_sql -o data_with_max_cases -r DataflowRunner +SELECT data_by_state.partition_date, data_by_state.state, data_by_state.confirmed_cases, data_by_state.deaths +FROM data_by_state JOIN max_cases +USING (partition_date, confirmed_cases) +``` + +By default, when running `beam_sql` on Dataflow, the output PCollection will be +written to a text file on GCS. The “write” is automatically provided by +`beam_sql` and mainly for your inspection of the output data for this one-shot +Dataflow job. It’s lightweight and does not encode elements for further +development. To save the output and share it with others, you can add more Beam +I/Os into the mix. + +For example, you can appropriately encode elements into text files using the +technique described in the above schema-aware PCollections example. + +```python +from apache_beam.options.pipeline_options import GoogleCloudOptions + +coder = beam.coders.registry.get_coder(data_with_max_cases.element_type) +max_data_file = gcs_location + '/encoded_max_data' +data_with_max_cases | beam.io.textio.WriteToText(max_data_file, coder=coder) +``` + +Furthermore, you can create a new BQ dataset in your own project to store the +processed data. 
+ +<img class="center-block" + src="/images/blog/beam-sql-notebooks/image21.png" + alt="Beam SQL in Notebooks: create a new BQ dataset."> + +You have to select the same data location as the public BigQuery data you are +reading. In this case, “us (multiple regions in United States)”. + +Once you finish creating an empty dataset, you can execute below: + +```python +output_table=f'{project}:covid_data.max_analysis' +bq_schema = { + 'fields': [ + {'name': 'partition_date', 'type': 'STRING'}, + {'name': 'state', 'type': 'STRING'}, + {'name': 'confirmed_cases', 'type': 'INTEGER'}, + {'name': 'deaths', 'type': 'INTEGER'}]} +(data_with_max_cases + | 'To json-like' >> beam.Map(lambda x: { + 'partition_date': x.partition_date, + 'state': x.state, + 'confirmed_cases': x.confirmed_cases, + 'deaths': x.deaths}) + | beam.io.WriteToBigQuery( + table=output_table, + schema=bq_schema, + method='STREAMING_INSERTS', + custom_gcs_temp_location=gcs_location)) +``` + +Now back in the form of the last SQL cell output, you may fill in necessary +information to run the pipeline on Dataflow. An example input looks like below: + +<img class="center-block" + src="/images/blog/beam-sql-notebooks/image22.png" + alt="Beam SQL in Notebooks: fill in the run-on-Dataflow form."> + +Because this pipeline doesn’t use any additional Python dependency, “Additional +Packages” is left empty. In the previous example where you have installed a +package called `names`, to run that pipeline on Dataflow, you have to put +`names` in this field. + +Once you finish updating your inputs, you can click the `Show Options` button to +view what pipeline options have been configured based on your inputs. A variable +`options_[YOUR_OUTPUT_PCOLL_NAME]` is generated, and you can supply more +pipeline options to it if the form is not enough for your execution. + +Once you are ready to submit the Dataflow job, click the `Run on Dataflow` +button. 
It tells you where the default output will be written, and after a +while, a line with: + +>Click here for the details of your Dataflow job. + +is displayed. You can click on the hyperlink to go to your Dataflow job +page. (Optionally, you can ignore the form and continue development to extend +your pipeline. Once you are satisfied with the state of your pipeline, you can +come back to the form and submit the job to Dataflow.) + +<img class="center-block" + src="/images/blog/beam-sql-notebooks/image23.png" + alt="Beam SQL in Notebooks: a Dataflow job graph."> + +As you can see, each transform name of the generated Dataflow job is prefixed +with a string `[number]: `. This is to distinguish re-executed code in +notebooks because Beam requires each transform to have a distinct name. Under +the hood, the `beam_sql` magic also stages your schema information to Dataflow, +so you might see transforms named `schema_loaded_beam_sql_…`. This is because +the `NamedTuple` defined in the notebook is likely in the `__main__` scope and +Dataflow is not aware of it at all. To minimize user intervention and avoid +pickling the whole main session (and it’s infeasible to pickle the main session +when it contains unpickle-able attributes), the `beam_sql` magic optimizes the +staging process by serializing your schemas, staging them to Dataflow, and then +deserializing and loading them for job execution. + +Once the job succeeds, the result of the output PCollection is written to +the locations specified by your I/O transforms. **Note**: running `beam_sql` on +Dataflow generates a one-shot job and it’s not interactive. 
+A simple inspection of the data from the default output location: + +``` +!gsutil cat 'gs://ningk-so-test/bq/staging/data_with_max_cases*' +``` + +<img class="center-block" + src="/images/blog/beam-sql-notebooks/image24.png" + alt="Beam SQL in Notebooks: inspect the default output file."> + +The text file with encoded binary data written by your `WriteToText`: + +``` +!gsutil cat 'gs://ningk-so-test/bq/encoded_max_data*' +``` + +<img class="center-block" + src="/images/blog/beam-sql-notebooks/image25.png" + alt="Beam SQL in Notebooks: inspect the user-defined output file."> + +The table `YOUR-PROJECT:covid_data.max_analysis` created by your +`WriteToBigQuery`: + +<img class="center-block" + src="/images/blog/beam-sql-notebooks/image26.png" + alt="Beam SQL in Notebooks: inspect the output BQ dataset."> + +### Run on other OSS runners directly with the `beam_sql` magic + +As of the day this post is published, the `beam_sql` magic only supports DirectRunner +(interactive) and DataflowRunner (one-shot). It's a simple wrapper on top of +the `SqlTransform` with interactive input widgets implemented by +[ipywidgets](https://ipywidgets.readthedocs.io/en/stable/). You can implement +your own runner support or utilities by following the +[instructions](https://lists.apache.org/thread/psrx1xhbyjcqbhxx6trf5nvh66c6pk3y). + +Additionally, support for other OSS runners is WIP, for example, +[support using FlinkRunner with the `beam_sql` magic](https://issues.apache.org/jira/browse/BEAM-14373). + + +## Conclusions + +Together, the `beam_sql` magic and Apache Beam Notebooks are a convenient way for +you to learn Beam SQL and mix Beam SQL into prototyping and productionizing ( +e.g., to Dataflow) your Beam pipelines with minimal setup. + +For more details about the Beam SQL syntax, check out the Beam Calcite SQL +[compatibility](https://beam.apache.org/documentation/dsls/sql/calcite/overview/) +and the Apache Calcite SQL +[syntax](https://calcite.apache.org/docs/reference.html). 
+
diff --git a/website/www/site/data/authors.yml b/website/www/site/data/authors.yml
index 072e55b4f77..a74b017da16 100644
--- a/website/www/site/data/authors.yml
+++ b/website/www/site/data/authors.yml
@@ -161,6 +161,10 @@ mxm:
   name: Maximilian Michels
   email: m...@apache.org
   twitter: stadtlegende
+ningk:
+  name: Ning Kang
+  email: ningkang0...@gmail.com
+  twitter: ningkang0957
 pedro:
   name: Pedro Galvan
   email: pe...@sg.com.mx
diff --git a/website/www/site/static/images/blog/beam-sql-notebooks/image1.png b/website/www/site/static/images/blog/beam-sql-notebooks/image1.png
new file mode 100644
index 00000000000..1b708a8c8fe
Binary files /dev/null and b/website/www/site/static/images/blog/beam-sql-notebooks/image1.png differ
diff --git a/website/www/site/static/images/blog/beam-sql-notebooks/image10.png b/website/www/site/static/images/blog/beam-sql-notebooks/image10.png
new file mode 100644
index 00000000000..2ff85bbab74
Binary files /dev/null and b/website/www/site/static/images/blog/beam-sql-notebooks/image10.png differ
diff --git a/website/www/site/static/images/blog/beam-sql-notebooks/image11.png b/website/www/site/static/images/blog/beam-sql-notebooks/image11.png
new file mode 100644
index 00000000000..9fc93978ce1
Binary files /dev/null and b/website/www/site/static/images/blog/beam-sql-notebooks/image11.png differ
diff --git a/website/www/site/static/images/blog/beam-sql-notebooks/image12.png b/website/www/site/static/images/blog/beam-sql-notebooks/image12.png
new file mode 100644
index 00000000000..12a635a48f8
Binary files /dev/null and b/website/www/site/static/images/blog/beam-sql-notebooks/image12.png differ
diff --git a/website/www/site/static/images/blog/beam-sql-notebooks/image13.png b/website/www/site/static/images/blog/beam-sql-notebooks/image13.png
new file mode 100644
index 00000000000..6cd2ea6da50
Binary files /dev/null and b/website/www/site/static/images/blog/beam-sql-notebooks/image13.png differ
diff --git a/website/www/site/static/images/blog/beam-sql-notebooks/image14.png b/website/www/site/static/images/blog/beam-sql-notebooks/image14.png
new file mode 100644
index 00000000000..752bfdd21ba
Binary files /dev/null and b/website/www/site/static/images/blog/beam-sql-notebooks/image14.png differ
diff --git a/website/www/site/static/images/blog/beam-sql-notebooks/image15.png b/website/www/site/static/images/blog/beam-sql-notebooks/image15.png
new file mode 100644
index 00000000000..bb6fdf9220b
Binary files /dev/null and b/website/www/site/static/images/blog/beam-sql-notebooks/image15.png differ
diff --git a/website/www/site/static/images/blog/beam-sql-notebooks/image16.png b/website/www/site/static/images/blog/beam-sql-notebooks/image16.png
new file mode 100644
index 00000000000..fa0c3f01637
Binary files /dev/null and b/website/www/site/static/images/blog/beam-sql-notebooks/image16.png differ
diff --git a/website/www/site/static/images/blog/beam-sql-notebooks/image17.png b/website/www/site/static/images/blog/beam-sql-notebooks/image17.png
new file mode 100644
index 00000000000..b0729dbbcb2
Binary files /dev/null and b/website/www/site/static/images/blog/beam-sql-notebooks/image17.png differ
diff --git a/website/www/site/static/images/blog/beam-sql-notebooks/image18.png b/website/www/site/static/images/blog/beam-sql-notebooks/image18.png
new file mode 100644
index 00000000000..8b73608e909
Binary files /dev/null and b/website/www/site/static/images/blog/beam-sql-notebooks/image18.png differ
diff --git a/website/www/site/static/images/blog/beam-sql-notebooks/image19.png b/website/www/site/static/images/blog/beam-sql-notebooks/image19.png
new file mode 100644
index 00000000000..7235eacf374
Binary files /dev/null and b/website/www/site/static/images/blog/beam-sql-notebooks/image19.png differ
diff --git a/website/www/site/static/images/blog/beam-sql-notebooks/image2.png b/website/www/site/static/images/blog/beam-sql-notebooks/image2.png
new file mode 100644
index 00000000000..4f110eded4d
Binary files /dev/null and b/website/www/site/static/images/blog/beam-sql-notebooks/image2.png differ
diff --git a/website/www/site/static/images/blog/beam-sql-notebooks/image20.png b/website/www/site/static/images/blog/beam-sql-notebooks/image20.png
new file mode 100644
index 00000000000..93334ff0c43
Binary files /dev/null and b/website/www/site/static/images/blog/beam-sql-notebooks/image20.png differ
diff --git a/website/www/site/static/images/blog/beam-sql-notebooks/image21.png b/website/www/site/static/images/blog/beam-sql-notebooks/image21.png
new file mode 100644
index 00000000000..7d3b6a89c52
Binary files /dev/null and b/website/www/site/static/images/blog/beam-sql-notebooks/image21.png differ
diff --git a/website/www/site/static/images/blog/beam-sql-notebooks/image22.png b/website/www/site/static/images/blog/beam-sql-notebooks/image22.png
new file mode 100644
index 00000000000..40abfd8f1af
Binary files /dev/null and b/website/www/site/static/images/blog/beam-sql-notebooks/image22.png differ
diff --git a/website/www/site/static/images/blog/beam-sql-notebooks/image23.png b/website/www/site/static/images/blog/beam-sql-notebooks/image23.png
new file mode 100644
index 00000000000..900a7a22482
Binary files /dev/null and b/website/www/site/static/images/blog/beam-sql-notebooks/image23.png differ
diff --git a/website/www/site/static/images/blog/beam-sql-notebooks/image24.png b/website/www/site/static/images/blog/beam-sql-notebooks/image24.png
new file mode 100644
index 00000000000..6672a4ba175
Binary files /dev/null and b/website/www/site/static/images/blog/beam-sql-notebooks/image24.png differ
diff --git a/website/www/site/static/images/blog/beam-sql-notebooks/image25.png b/website/www/site/static/images/blog/beam-sql-notebooks/image25.png
new file mode 100644
index 00000000000..7d1bdf6fdaa
Binary files /dev/null and b/website/www/site/static/images/blog/beam-sql-notebooks/image25.png differ
diff --git a/website/www/site/static/images/blog/beam-sql-notebooks/image26.png b/website/www/site/static/images/blog/beam-sql-notebooks/image26.png
new file mode 100644
index 00000000000..a8f0184c14e
Binary files /dev/null and b/website/www/site/static/images/blog/beam-sql-notebooks/image26.png differ
diff --git a/website/www/site/static/images/blog/beam-sql-notebooks/image3.png b/website/www/site/static/images/blog/beam-sql-notebooks/image3.png
new file mode 100644
index 00000000000..c3406a08dec
Binary files /dev/null and b/website/www/site/static/images/blog/beam-sql-notebooks/image3.png differ
diff --git a/website/www/site/static/images/blog/beam-sql-notebooks/image4.png b/website/www/site/static/images/blog/beam-sql-notebooks/image4.png
new file mode 100644
index 00000000000..5c136e13d36
Binary files /dev/null and b/website/www/site/static/images/blog/beam-sql-notebooks/image4.png differ
diff --git a/website/www/site/static/images/blog/beam-sql-notebooks/image5.png b/website/www/site/static/images/blog/beam-sql-notebooks/image5.png
new file mode 100644
index 00000000000..89999df14e7
Binary files /dev/null and b/website/www/site/static/images/blog/beam-sql-notebooks/image5.png differ
diff --git a/website/www/site/static/images/blog/beam-sql-notebooks/image6.png b/website/www/site/static/images/blog/beam-sql-notebooks/image6.png
new file mode 100644
index 00000000000..87ef71f7bec
Binary files /dev/null and b/website/www/site/static/images/blog/beam-sql-notebooks/image6.png differ
diff --git a/website/www/site/static/images/blog/beam-sql-notebooks/image7.png b/website/www/site/static/images/blog/beam-sql-notebooks/image7.png
new file mode 100644
index 00000000000..c2a8dbae667
Binary files /dev/null and b/website/www/site/static/images/blog/beam-sql-notebooks/image7.png differ
diff --git a/website/www/site/static/images/blog/beam-sql-notebooks/image8.png b/website/www/site/static/images/blog/beam-sql-notebooks/image8.png
new file mode 100644
index 00000000000..b648cabe46e
Binary files /dev/null and b/website/www/site/static/images/blog/beam-sql-notebooks/image8.png differ
diff --git a/website/www/site/static/images/blog/beam-sql-notebooks/image9.png b/website/www/site/static/images/blog/beam-sql-notebooks/image9.png
new file mode 100644
index 00000000000..794116581ef
Binary files /dev/null and b/website/www/site/static/images/blog/beam-sql-notebooks/image9.png differ