This is an automated email from the ASF dual-hosted git repository.

ningk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
     new a6bb4ae  [BEAM-10708] Added an example notebook for beam_sql magic
     new 919fdef  Merge pull request #15518 from KevinGG/beam_sql_out_cache
a6bb4ae is described below

commit a6bb4ae1a3889759ec92d2a21bc52bd2b026cbec
Author: KevinGG <kawai...@gmail.com>
AuthorDate: Wed Sep 15 14:18:11 2021 -0700

    [BEAM-10708] Added an example notebook for beam_sql magic

    1. Supported show/collect for beam_sql outputs.
    2. Added a parser for beam_sql inputs.
---
 .../interactive/display/pcoll_visualization.py     |  32 +-
 .../Run Beam SQL with beam_sql magic.ipynb         | 549 +++++++++++++++++++++
 .../runners/interactive/interactive_beam.py        |  35 +-
 .../runners/interactive/interactive_beam_test.py   |  12 +-
 .../runners/interactive/sql/beam_sql_magics.py     | 132 +++--
 .../interactive/sql/beam_sql_magics_test.py        |  14 +
 .../apache_beam/runners/interactive/sql/utils.py   |  28 +-
 .../apache_beam/runners/interactive/utils.py       |  14 +
 .../apache_beam/runners/interactive/utils_test.py  |   9 +
 9 files changed, 771 insertions(+), 54 deletions(-)

diff --git a/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py b/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py
index 9e071fe..f317978 100644
--- a/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py
+++ b/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py
@@ -27,6 +27,7 @@
 import datetime
 import html
 import logging
 from datetime import timedelta
+from typing import Optional
 
 from dateutil import tz
 
@@ -238,19 +239,42 @@ def visualize(
 
 
 def visualize_computed_pcoll(
-    pcoll_name: str, pcoll: beam.pvalue.PCollection) -> None:
+    pcoll_name: str,
+    pcoll: beam.pvalue.PCollection,
+    max_n: int,
+    max_duration_secs: float,
+    dynamic_plotting_interval: Optional[int] = None,
+    include_window_info: bool = False,
+    display_facets: bool = False) -> None:
   """A simple visualize alternative.
 
   When the pcoll_name and pcoll pair identifies a watched and computed
   PCollection in the current interactive environment without ambiguity, an
-  ElementStream can be built directly from cache.
+  ElementStream can be built directly from cache. This function returns
+  immediately; the visualization is asynchronous but guaranteed to complete
+  in the near future.
+
+  Args:
+    pcoll_name: the variable name of the PCollection.
+    pcoll: the PCollection to be visualized.
+    max_n: the maximum number of elements to visualize.
+    max_duration_secs: the maximum duration in seconds to read elements.
+    dynamic_plotting_interval: the interval in seconds between visualization
+      updates if provided; otherwise, no dynamic plotting.
+    include_window_info: whether to include windowing info in the elements.
+    display_facets: whether to display the facets widgets.
""" pipeline = ie.current_env().user_pipeline(pcoll.pipeline) rm = ie.current_env().get_recording_manager(pipeline, create_if_absent=True) + stream = rm.read( - pcoll_name, pcoll, max_n=float('inf'), max_duration_secs=float('inf')) + pcoll_name, pcoll, max_n=max_n, max_duration_secs=max_duration_secs) if stream: - visualize(stream, element_type=pcoll.element_type) + visualize( + stream, + dynamic_plotting_interval=dynamic_plotting_interval, + include_window_info=include_window_info, + display_facets=display_facets, + element_type=pcoll.element_type) class PCollectionVisualization(object): diff --git a/sdks/python/apache_beam/runners/interactive/examples/Run Beam SQL with beam_sql magic.ipynb b/sdks/python/apache_beam/runners/interactive/examples/Run Beam SQL with beam_sql magic.ipynb new file mode 100644 index 0000000..d68fa8c --- /dev/null +++ b/sdks/python/apache_beam/runners/interactive/examples/Run Beam SQL with beam_sql magic.ipynb @@ -0,0 +1,549 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "83acd0be", + "metadata": {}, + "source": [ + "Licensed under the Apache License, Version 2.0 (the \"License\");\n", + "<!--\n", + " Licensed to the Apache Software Foundation (ASF) under one\n", + " or more contributor license agreements. See the NOTICE file\n", + " distributed with this work for additional information\n", + " regarding copyright ownership. The ASF licenses this file\n", + " to you under the Apache License, Version 2.0 (the\n", + " \"License\"); you may not use this file except in compliance\n", + " with the License. You may obtain a copy of the License at\n", + "\n", + " http://www.apache.org/licenses/LICENSE-2.0\n", + "\n", + " Unless required by applicable law or agreed to in writing,\n", + " software distributed under the License is distributed on an\n", + " \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n", + " KIND, either express or implied. See the License for the\n", + " specific language governing permissions and limitations\n", + " under the License.\n", + "-->\n" + ] + }, + { + "cell_type": "markdown", + "id": "5022179a", + "metadata": {}, + "source": [ + "# Run Beam SQL in notebooks\n", + "\n", + "[Beam SQL](https://beam.apache.org/documentation/dsls/sql/overview/) allows a Beam user to query PCollections with SQL statements. Currently, `InteractiveRunner` does not support `SqlTransform` due to [BEAM-10708](https://issues.apache.org/jira/browse/BEAM-10708). However, a user could use the `beam_sql` magic to run Beam SQL in the notebook and introspect the result.\n", + "\n", + "`beam_sql` is an IPython [custom magic](https://ipython.readthedocs.io/en/stable/config/custommagics.html). If you're not familiar with magics, here are some [built-in examples](https://ipython.readthedocs.io/en/stable/interactive/magics.html). 
+  {
+   "cell_type": "markdown",
+   "id": "14c8967d",
+   "metadata": {},
+   "source": [
+    "## Query#1 - A simple static query\n",
+    "\n",
+    "The `beam_sql` magic can be used as either a line magic or a cell magic.\n",
+    "You can check its usage by running:"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "c212dd89",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "%beam_sql -h"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "7914c1aa",
+   "metadata": {},
+   "source": [
+    "You can run a simple SQL query (in Apache Calcite SQL [syntax](https://beam.apache.org/documentation/dsls/sql/calcite/query-syntax/)) to create a [schema-aware PCollection](https://beam.apache.org/documentation/programming-guide/#schemas) from static values."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "895341fa",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "%%beam_sql -o query1_data\n",
+    "SELECT CAST(5 AS INT) AS `id`, CAST('foo' AS VARCHAR) AS `str`, CAST(3.14 AS DOUBLE) AS `flt`"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "c394ead5",
+   "metadata": {},
+   "source": [
+    "The `beam_sql` magic shows you the result of the SQL query.\n",
+    "\n",
+    "It also creates and outputs a PCollection named `query1_data` with an `element_type` like `BeamSchema_...(id: int32, str: str)`.\n",
+    "\n",
+    "Note that you have **not** explicitly created a Beam pipeline. You get a PCollection because the `beam_sql` magic always **implicitly creates** a pipeline to execute your SQL query. To hold the elements with each field's type info, Beam automatically creates a schema as the `element_type` for the created PCollection."
+   ]
+  },
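+  {
+   "cell_type": "markdown",
+   "id": "3c4d5e6f",
+   "metadata": {},
+   "source": [
+    "For instance, here is a minimal sketch of looking under the hood, using standard `PCollection` attributes; the auto-generated schema name differs per execution:"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "4d5e6f7a",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# The implicitly created pipeline and the auto-generated schema.\n",
+    "print(type(query1_data.pipeline))\n",
+    "print(query1_data.element_type)\n",
+    "# The schema is a NamedTuple subclass, so its fields are introspectable,\n",
+    "# e.g. ('id', 'str', 'flt').\n",
+    "print(query1_data.element_type._fields)"
+   ]
+  },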
+  {
+   "cell_type": "markdown",
+   "id": "981b2cc9",
+   "metadata": {},
+   "source": [
+    "To introspect the data again with more knobs, you can use `show`."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "e97caf83",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "from apache_beam.runners.interactive import interactive_beam as ib\n",
+    "ib.show(query1_data)\n",
+    "# Uncomment below to set more args.\n",
+    "# ib.show(query1_data, visualize_data=True, include_window_info=True)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "f58b15a8",
+   "metadata": {},
+   "source": [
+    "To materialize the PCollection into a pandas [DataFrame](https://pandas.pydata.org/pandas-docs/stable/user_guide/dsintro.html#dataframe) object, you can use `collect`."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "47b8da1a",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "ib.collect(query1_data)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "09b4f24c",
+   "metadata": {},
+   "source": [
+    "You can also append additional transforms, such as writing to a text file and printing the elements:"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "9a650bbb",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import apache_beam as beam\n",
+    "\n",
+    "coder = beam.coders.registry.get_coder(query1_data.element_type)\n",
+    "print(coder)\n",
+    "query1_data | beam.io.textio.WriteToText('/tmp/query1_data', coder=coder)\n",
+    "query1_data | beam.Map(print)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "6cf89704",
+   "metadata": {},
+   "source": [
+    "Execute the pipeline as a normal pipeline running on `DirectRunner` and inspect the output file."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "d524e1a0",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "!rm -rf /tmp/query1_data*\n",
+    "query1_data.pipeline.run().wait_until_finish()\n",
+    "!ls /tmp/query1_data*\n",
+    "!cat /tmp/query1_data*"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "5600945a",
+   "metadata": {},
+   "source": [
+    "The coder in use is a `RowCoder`. Each element is encoded with it before being written to the text file, so inspecting the file directly may display garbled strings. The file will be revisited in Query#4."
+   ]
+  },
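+  {
+   "cell_type": "markdown",
+   "id": "5e6f7a8b",
+   "metadata": {},
+   "source": [
+    "A minimal sketch of decoding the file back into rows, assuming the run above produced `/tmp/query1_data-*` shards and each encoded element fits on a single line:"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "6f7a8b9c",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import glob\n",
+    "\n",
+    "# Reuse the RowCoder from above to decode the first encoded line.\n",
+    "with open(glob.glob('/tmp/query1_data*')[0], 'rb') as f:\n",
+    "    encoded = f.readline().rstrip(b'\\n')\n",
+    "print(coder.decode(encoded))"
+   ]
+  },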
+  {
+   "cell_type": "markdown",
+   "id": "30aa1188",
+   "metadata": {},
+   "source": [
+    "### [Optional] Omit the `-o` option.\n",
+    "If the option is omitted, an output name is auto-generated based on the SQL query and the PCollection (if any) it queries. Optionally, you can also use the `_[{execution_count}]` convention: `_` for the last output and `_{execution_count}` for the output of a specific cell execution.\n",
+    "\n",
+    "However, explicitly naming the output is recommended for better notebook readability and to avoid unexpected errors.\n",
+    "\n",
+    "The example below outputs a PCollection named like `sql_output_...`."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "b445e4f1",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "%%beam_sql\n",
+    "SELECT CAST(1 AS INT) AS `id`, CAST('foo' AS VARCHAR) AS `str`, CAST(3.14 AS DOUBLE) AS `flt`"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "c7b9e4fb",
+   "metadata": {},
+   "source": [
+    "Now that you are familiar with the `beam_sql` magic, you can build more queries against PCollections.\n",
+    "\n",
+    "Let's install the `names` package to randomly generate some names."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "ef1ca0fc",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "%pip install names"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "1c0d5739",
+   "metadata": {},
+   "source": [
+    "Import all modules needed for this example."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "20cdf3b9",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import names\n",
+    "import typing\n",
+    "\n",
+    "import apache_beam as beam\n",
+    "from apache_beam.runners.interactive.interactive_runner import InteractiveRunner\n",
+    "from apache_beam.runners.interactive import interactive_beam as ib"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "00db1574",
+   "metadata": {},
+   "source": [
+    "Create a pipeline `p` with the `InteractiveRunner`."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "24caeb60",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "p = beam.Pipeline(InteractiveRunner())"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "0a4ca6eb",
+   "metadata": {},
+   "source": [
+    "Then let's create a schema with `typing.NamedTuple`. Let's call it `Person`, with a field `id` and a field `name`."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "23910a9d",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "class Person(typing.NamedTuple):\n",
+    "    id: int\n",
+    "    name: str"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "5c626d63",
+   "metadata": {},
+   "source": [
+    "With the `beam_sql` magic, you can use any of the Beam I/O connectors as a source of data (streaming is currently not supported because `DirectRunner` cannot run streaming pipelines with `SqlTransform`), build a SQL query against all the data, and check the output. If needed, you can sink the output following the `WriteToText` example demonstrated above."
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "2d892920",
+   "metadata": {},
+   "source": [
+    "## Query#2 - Querying a single PCollection\n",
+    "\n",
+    "Let's build a PCollection with 10 random `Person`-typed elements."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "8a5fc9b9",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "persons = (p \n",
+    "    | beam.Create([Person(id=x, name=names.get_full_name()) for x in range(10)]))\n",
+    "ib.show(persons)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "84d64746",
+   "metadata": {},
+   "source": [
+    "You can look for all elements with `id < 5` in `persons` with the query below and assign the output to `persons_id_lt_5`. You can also enable the `-v` (verbose) option to see more details about the execution."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "07db1116",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "%%beam_sql -o persons_id_lt_5 -v\n",
+    "SELECT * FROM persons WHERE id < 5"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "68afa962",
+   "metadata": {},
+   "source": [
+    "With `-v`, if it's the first time running this query, you might see a warning message about\n",
+    "\n",
+    "```\n",
+    "Schema Person has not been registered to use a RowCoder. Automatically registering it by running: beam.coders.registry.register_coder(Person, beam.coders.RowCoder)\n",
+    "```\n",
+    "\n",
+    "The `beam_sql` magic helps register a `RowCoder` for each schema you define and use, whenever it finds one. You can also run the same code explicitly to do so, as sketched below.\n",
+    "\n",
+    "Note that the output element type is `Person(id: int, name: str)` instead of `BeamSchema_...` because you have selected all the fields from a single PCollection of the known type `Person(id: int, name: str)`."
+   ]
+  },
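+  {
+   "cell_type": "markdown",
+   "id": "7a8b9c0d",
+   "metadata": {},
+   "source": [
+    "A minimal sketch of that explicit registration (the magic performs the equivalent for you; re-registering is harmless):"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "8b9c0d1e",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Explicitly register a RowCoder for the Person schema and verify it.\n",
+    "beam.coders.registry.register_coder(Person, beam.coders.RowCoder)\n",
+    "print(beam.coders.registry.get_coder(Person))"
+   ]
+  },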
+  {
+   "cell_type": "markdown",
+   "id": "79587515",
+   "metadata": {},
+   "source": [
+    "## Query#3 - Joining multiple PCollections\n",
+    "\n",
+    "You can build a `persons_2` PCollection with a different range of `id`s and `name`s."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "c01fa39a",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "persons_2 = (p \n",
+    "    | beam.Create([Person(id=x, name=names.get_full_name()) for x in range(5, 15)]))\n",
+    "ib.show(persons_2)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "6904ff8e",
+   "metadata": {},
+   "source": [
+    "Then query for all `name`s from `persons` and `persons_2` with the same `id`s and assign the output to `persons_with_common_id`."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "2a0a60ff",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "%%beam_sql -o persons_with_common_id -v\n",
+    "SELECT * FROM persons JOIN persons_2 USING (id)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "4bb4df8a",
+   "metadata": {},
+   "source": [
+    "Note that the output element type is now some `BeamSchema_...(id: int64, name: str, name0: str)`. Because you have selected columns from both PCollections, there is no known schema to hold the result, so Beam automatically creates one and disambiguates the conflicting field `name` by suffixing `0` to one of the two.\n",
+    "\n",
+    "And since `Person` was already registered with a `RowCoder`, there is no warning about registering it anymore, even with `-v`."
+   ]
+  },
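+  {
+   "cell_type": "markdown",
+   "id": "9c0d1e2f",
+   "metadata": {},
+   "source": [
+    "For instance, a quick sketch to inspect that auto-generated schema (the exact `BeamSchema_...` name is generated per execution):"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "0d1e2f3a",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# The join result uses an auto-generated schema; note the name0 field.\n",
+    "print(persons_with_common_id.element_type)\n",
+    "print(persons_with_common_id.element_type._fields)"
+   ]
+  },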
" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "d1dea37b", + "metadata": {}, + "outputs": [], + "source": [ + "# Use the exact same coder used when WriteToText and explicitly set the output types.\n", + "query1_result_in_file = p | beam.io.ReadFromText(\n", + " '/tmp/query1_data*', coder=coder).with_output_types(\n", + " query1_data.element_type)\n", + "\n", + "# Check all the data sources.\n", + "ib.show(query1_result_in_file)\n", + "ib.show(persons)\n", + "ib.show(persons_2)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "4bf6c422", + "metadata": {}, + "outputs": [], + "source": [ + "%%beam_sql -o entry_with_common_id\n", + "\n", + "SELECT query1_result_in_file.id, persons.name AS `name_1`, persons_2.name AS `name_2`\n", + "FROM query1_result_in_file JOIN persons ON query1_result_in_file.id = persons.id\n", + "JOIN persons_2 ON query1_result_in_file.id = persons_2.id" + ] + }, + { + "cell_type": "markdown", + "id": "282f6173", + "metadata": {}, + "source": [ + "You can also chain another `beam_sql` magic to get just `name_1`:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "d858dd6c", + "metadata": {}, + "outputs": [], + "source": [ + "%%beam_sql -o name_found\n", + "SELECT name_1 AS `name` FROM entry_with_common_id" + ] + } + ], + "metadata": { + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.7.11" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/sdks/python/apache_beam/runners/interactive/interactive_beam.py b/sdks/python/apache_beam/runners/interactive/interactive_beam.py index 0b3f8a3..fc55bf1 100644 --- a/sdks/python/apache_beam/runners/interactive/interactive_beam.py +++ b/sdks/python/apache_beam/runners/interactive/interactive_beam.py @@ -41,9 +41,11 @@ from apache_beam.dataframe.frame_base import DeferredBase from apache_beam.runners.interactive import interactive_environment as ie from apache_beam.runners.interactive.display import pipeline_graph from apache_beam.runners.interactive.display.pcoll_visualization import visualize +from apache_beam.runners.interactive.display.pcoll_visualization import visualize_computed_pcoll from apache_beam.runners.interactive.options import interactive_options from apache_beam.runners.interactive.utils import deferred_df_to_pcollection from apache_beam.runners.interactive.utils import elements_to_df +from apache_beam.runners.interactive.utils import find_pcoll_name from apache_beam.runners.interactive.utils import progress_indicated from apache_beam.runners.runner import PipelineState @@ -448,7 +450,7 @@ def show( # Iterate through the given PCollections and convert any deferred DataFrames # or Series into PCollections. - pcolls = [] + pcolls = set() # The element type is used to help visualize the given PCollection. For the # deferred DataFrame/Series case it is the proxy of the frame. 
@@ -462,14 +464,14 @@ def show(
       element_types[pcoll] = element_type
 
-    pcolls.append(pcoll)
+    pcolls.add(pcoll)
     assert isinstance(pcoll, beam.pvalue.PCollection), (
         '{} is not an apache_beam.pvalue.PCollection.'.format(pcoll))
 
   assert len(pcolls) > 0, (
       'Need at least 1 PCollection to show data visualization.')
-  user_pipeline = pcolls[0].pipeline
+  user_pipeline = ie.current_env().user_pipeline(next(iter(pcolls)).pipeline)
 
   if isinstance(n, str):
     assert n == 'inf', (
@@ -488,6 +490,20 @@ def show(
   if duration == 'inf':
     duration = float('inf')
 
+  previously_computed_pcolls = {
+      pcoll
+      for pcoll in pcolls if pcoll in ie.current_env().computed_pcollections
+  }
+  for pcoll in previously_computed_pcolls:
+    visualize_computed_pcoll(
+        find_pcoll_name(pcoll),
+        pcoll,
+        n,
+        duration,
+        include_window_info=include_window_info,
+        display_facets=visualize_data)
+  pcolls = pcolls - previously_computed_pcolls
+
   recording_manager = ie.current_env().get_recording_manager(
       user_pipeline, create_if_absent=True)
   recording = recording_manager.record(pcolls, max_n=n, max_duration=duration)
@@ -509,7 +525,6 @@ def show(
           stream,
           include_window_info=include_window_info,
           element_type=element_types[stream.pcoll])
-
   if recording.is_computed():
     return
 
@@ -591,10 +606,20 @@ def collect(pcoll, n='inf', duration='inf', include_window_info=False):
   if duration == 'inf':
     duration = float('inf')
 
-  user_pipeline = pcoll.pipeline
+  user_pipeline = ie.current_env().user_pipeline(pcoll.pipeline)
   recording_manager = ie.current_env().get_recording_manager(
       user_pipeline, create_if_absent=True)
 
+  # If already computed, directly read the stream and return.
+  if pcoll in ie.current_env().computed_pcollections:
+    pcoll_name = find_pcoll_name(pcoll)
+    elements = list(
+        recording_manager.read(pcoll_name, pcoll, n, duration).read())
+    return elements_to_df(
+        elements,
+        include_window_info=include_window_info,
+        element_type=element_type)
+
   recording = recording_manager.record([pcoll], max_n=n, max_duration=duration)
 
   try:
diff --git a/sdks/python/apache_beam/runners/interactive/interactive_beam_test.py b/sdks/python/apache_beam/runners/interactive/interactive_beam_test.py
index f12ab8e..b093aa2 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_beam_test.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_beam_test.py
@@ -120,7 +120,9 @@ class InteractiveBeamTest(unittest.TestCase):
     ib.show(pcoll)
     self.assertTrue(pcoll in ie.current_env().computed_pcollections)
 
-  @patch('apache_beam.runners.interactive.interactive_beam.visualize')
+  @patch((
+      'apache_beam.runners.interactive.interactive_beam.'
+      'visualize_computed_pcoll'))
   def test_show_handles_dict_of_pcolls(self, mocked_visualize):
     p = beam.Pipeline(ir.InteractiveRunner())
     # pylint: disable=range-builtin-not-iterating
@@ -133,7 +135,9 @@ class InteractiveBeamTest(unittest.TestCase):
     ib.show({'pcoll': pcoll})
     mocked_visualize.assert_called_once()
 
-  @patch('apache_beam.runners.interactive.interactive_beam.visualize')
+  @patch((
+      'apache_beam.runners.interactive.interactive_beam.'
+      'visualize_computed_pcoll'))
   def test_show_handles_iterable_of_pcolls(self, mocked_visualize):
     p = beam.Pipeline(ir.InteractiveRunner())
     # pylint: disable=range-builtin-not-iterating
@@ -159,7 +163,9 @@ class InteractiveBeamTest(unittest.TestCase):
     ib.show(deferred)
     mocked_visualize.assert_called_once()
 
-  @patch('apache_beam.runners.interactive.interactive_beam.visualize')
+  @patch((
+      'apache_beam.runners.interactive.interactive_beam.'
+      'visualize_computed_pcoll'))
   def test_show_noop_when_pcoll_container_is_invalid(self, mocked_visualize):
     class SomeRandomClass:
       def __init__(self, pcoll):
diff --git a/sdks/python/apache_beam/runners/interactive/sql/beam_sql_magics.py b/sdks/python/apache_beam/runners/interactive/sql/beam_sql_magics.py
index 6c0f8d3..bd40f13 100644
--- a/sdks/python/apache_beam/runners/interactive/sql/beam_sql_magics.py
+++ b/sdks/python/apache_beam/runners/interactive/sql/beam_sql_magics.py
@@ -20,10 +20,12 @@
 Only works within an IPython kernel.
 """
 
+import argparse
 import importlib
 import keyword
 import logging
 from typing import Dict
+from typing import List
 from typing import Optional
 from typing import Tuple
 from typing import Union
@@ -49,23 +51,22 @@
 from apache_beam.testing import test_stream
 from apache_beam.testing.test_stream_service import TestStreamServiceController
 from apache_beam.transforms.sql import SqlTransform
 from IPython.core.magic import Magics
-from IPython.core.magic import cell_magic
+from IPython.core.magic import line_cell_magic
 from IPython.core.magic import magics_class
 
 _LOGGER = logging.getLogger(__name__)
 
-_EXAMPLE_USAGE = """Usage:
-    %%%%beam_sql [output_name]
-    Calcite SQL statement
-    Syntax: https://beam.apache.org/documentation/dsls/sql/calcite/query-syntax/
-    Please make sure that there is no conflicts between your variable names and
-    the SQL keywords, such as "SELECT", "FROM", "WHERE" and etc.
-
-    output_name is optional. If not supplied, a variable name is automatically
-    assigned to the output of the magic.
-
-    The output of the magic is usually a PCollection or similar PValue,
-    depending on the SQL statement executed.
+_EXAMPLE_USAGE = """beam_sql magic to execute Beam SQL in notebooks
+---------------------------------------------------------
+%%beam_sql [-o OUTPUT_NAME] query
+---------------------------------------------------------
+Or
+---------------------------------------------------------
+%%%%beam_sql [-o OUTPUT_NAME] query-line#1
+query-line#2
+...
+query-line#N
+---------------------------------------------------------
+"""
 
 _NOT_SUPPORTED_MSG = """The query was valid and successfully applied.
@@ -82,38 +83,107 @@
 """
 
 
+class BeamSqlParser:
+  """A parser to parse beam_sql inputs."""
+  def __init__(self):
+    self._parser = argparse.ArgumentParser(usage=_EXAMPLE_USAGE)
+    self._parser.add_argument(
+        '-o',
+        '--output-name',
+        dest='output_name',
+        help=(
+            'The output variable name of the magic, usually a PCollection. '
+            'Auto-generated if omitted.'))
+    self._parser.add_argument(
+        '-v',
+        '--verbose',
+        action='store_true',
+        help='Display more details about the magic execution.')
+    self._parser.add_argument(
+        'query',
+        type=str,
+        nargs='*',
+        help=(
+            'The Beam SQL query to execute. '
+            'Syntax: https://beam.apache.org/documentation/dsls/sql/calcite/'
+            'query-syntax/. '
+            'Please make sure that there is no conflict between your variable '
+            'names and SQL keywords such as "SELECT", "FROM", "WHERE", '
+            'etc.'))
+
+  def parse(self, args: List[str]) -> Optional[argparse.Namespace]:
+    """Parses a list of string inputs.
+
+    The parsed namespace contains these attributes:
+      output_name: Optional[str], the output variable name.
+      verbose: bool, whether to display more details of the magic execution.
+      query: Optional[List[str]], the Beam SQL query to execute.
+
+    Returns:
+      The parsed args, or None if parsing fails.
+ """ + try: + return self._parser.parse_args(args) + except KeyboardInterrupt: + raise + except: # pylint: disable=bare-except + # -h or --help results in SystemExit 0. Do not raise. + return None + + def print_help(self) -> None: + self._parser.print_help() + + def on_error(error_msg, *args): """Logs the error and the usage example.""" _LOGGER.error(error_msg, *args) - _LOGGER.info(_EXAMPLE_USAGE) + BeamSqlParser().print_help() @magics_class class BeamSqlMagics(Magics): - @cell_magic - def beam_sql(self, line: str, cell: str) -> Union[None, PValue]: - """The beam_sql cell magic that executes a Beam SQL. + def __init__(self, shell): + super().__init__(shell) + # Eagerly initializes the environment. + _ = ie.current_env() + self._parser = BeamSqlParser() + + @line_cell_magic + def beam_sql(self, line: str, cell: Optional[str] = None) -> Optional[PValue]: + """The beam_sql line/cell magic that executes a Beam SQL. Args: - line: (optional) the string on the same line after the beam_sql magic. - Used as the output variable name in the __main__ module. - cell: everything else in the same notebook cell as a string. Used as a - Beam SQL query. + line: the string on the same line after the beam_sql magic. + cell: everything else in the same notebook cell as a string. If None, + beam_sql is used as line magic. Otherwise, cell magic. Returns None if running into an error, otherwise a PValue as if a SqlTransform is applied. """ - if line and not line.strip().isidentifier() or keyword.iskeyword( - line.strip()): + input_str = line + if cell: + input_str += ' ' + cell + parsed = self._parser.parse(input_str.strip().split()) + if not parsed: + # Failed to parse inputs, let the parser handle the exit. + return + output_name = parsed.output_name + verbose = parsed.verbose + query = parsed.query + + if output_name and not output_name.isidentifier() or keyword.iskeyword( + output_name): on_error( 'The output_name "%s" is not a valid identifier. Please supply a ' 'valid identifier that is not a Python keyword.', line) return - if not cell or cell.isspace(): - on_error('Please supply the sql to be executed.') + if not query: + on_error('Please supply the SQL query to be executed.') return - found = find_pcolls(cell, pcoll_by_name()) + query = ' '.join(query) + + found = find_pcolls(query, pcoll_by_name(), verbose=verbose) for _, pcoll in found.items(): if not is_namedtuple(pcoll.element_type): on_error( @@ -123,9 +193,9 @@ class BeamSqlMagics(Magics): pcoll, pcoll.element_type) return - register_coder_for_schema(pcoll.element_type) + register_coder_for_schema(pcoll.element_type, verbose=verbose) - output_name, output = apply_sql(cell, line, found) + output_name, output = apply_sql(query, output_name, found) cache_output(output_name, output) return output @@ -249,7 +319,8 @@ def _build_query_components( PCollection, or the pipeline to execute the query. 
""" if found: - user_pipeline = next(iter(found.values())).pipeline + user_pipeline = ie.current_env().user_pipeline( + next(iter(found.values())).pipeline) sql_pipeline = beam.Pipeline(options=user_pipeline._options) ie.current_env().add_derived_pipeline(user_pipeline, sql_pipeline) sql_source = {} @@ -295,7 +366,8 @@ def cache_output(output_name: str, output: PValue) -> None: _LOGGER.warning(_NOT_SUPPORTED_MSG, e, output.pipeline.runner) return ie.current_env().mark_pcollection_computed([output]) - visualize_computed_pcoll(output_name, output) + visualize_computed_pcoll( + output_name, output, max_n=float('inf'), max_duration_secs=float('inf')) def load_ipython_extension(ipython): diff --git a/sdks/python/apache_beam/runners/interactive/sql/beam_sql_magics_test.py b/sdks/python/apache_beam/runners/interactive/sql/beam_sql_magics_test.py index 7c4de77..538abbb 100644 --- a/sdks/python/apache_beam/runners/interactive/sql/beam_sql_magics_test.py +++ b/sdks/python/apache_beam/runners/interactive/sql/beam_sql_magics_test.py @@ -34,6 +34,7 @@ try: from apache_beam.runners.interactive.sql.beam_sql_magics import _build_query_components from apache_beam.runners.interactive.sql.beam_sql_magics import _generate_output_name from apache_beam.runners.interactive.sql.beam_sql_magics import cache_output + from apache_beam.runners.interactive.sql.beam_sql_magics import BeamSqlParser except (ImportError, NameError): pass # The test is to be skipped because [interactive] dep not installed. @@ -133,6 +134,19 @@ class BeamSqlMagicsTest(unittest.TestCase): cache_manager.exists( 'full', CacheKey.from_pcoll('pcoll_co', pcoll_co).to_str())) + def test_parser_with_all_inputs(self): + parsed = BeamSqlParser().parse( + '-o output_name -v SELECT * FROM abc'.split()) + self.assertTrue(parsed.verbose) + self.assertEqual('output_name', parsed.output_name) + self.assertEqual('SELECT * FROM abc', ' '.join(parsed.query)) + + def test_parser_with_no_input(self): + parsed = BeamSqlParser().parse([]) + self.assertFalse(parsed.verbose) + self.assertIsNone(parsed.output_name) + self.assertFalse(parsed.query) + if __name__ == '__main__': unittest.main() diff --git a/sdks/python/apache_beam/runners/interactive/sql/utils.py b/sdks/python/apache_beam/runners/interactive/sql/utils.py index 1840e60..fb4e57d 100644 --- a/sdks/python/apache_beam/runners/interactive/sql/utils.py +++ b/sdks/python/apache_beam/runners/interactive/sql/utils.py @@ -39,7 +39,8 @@ def is_namedtuple(cls: type) -> bool: hasattr(cls, '_fields') and hasattr(cls, '__annotations__')) -def register_coder_for_schema(schema: NamedTuple) -> None: +def register_coder_for_schema( + schema: NamedTuple, verbose: bool = False) -> None: """Registers a RowCoder for the given schema if hasn't. Notifies the user of what code has been implicitly executed. @@ -48,19 +49,21 @@ def register_coder_for_schema(schema: NamedTuple) -> None: 'Schema %s is not a typing.NamedTuple.' % schema) coder = beam.coders.registry.get_coder(schema) if not isinstance(coder, beam.coders.RowCoder): - _LOGGER.warning( - 'Schema %s has not been registered to use a RowCoder. ' - 'Automatically registering it by running: ' - 'beam.coders.registry.register_coder(%s, ' - 'beam.coders.RowCoder)', - schema.__name__, - schema.__name__) + if verbose: + _LOGGER.warning( + 'Schema %s has not been registered to use a RowCoder. 
+          'Automatically registering it by running: '
+          'beam.coders.registry.register_coder(%s, '
+          'beam.coders.RowCoder)',
+          schema.__name__,
+          schema.__name__)
     beam.coders.registry.register_coder(schema, beam.coders.RowCoder)
 
 
 def find_pcolls(
-    sql: str, pcolls: Dict[str,
-                           beam.PCollection]) -> Dict[str, beam.PCollection]:
+    sql: str,
+    pcolls: Dict[str, beam.PCollection],
+    verbose: bool = False) -> Dict[str, beam.PCollection]:
   """Finds all PCollections used in the given sql query.
 
   It does a simple word by word match and calls ib.collect for each PCollection
@@ -71,8 +74,9 @@ def find_pcolls(
     if word in pcolls:
       found[word] = pcolls[word]
   if found:
-    _LOGGER.info('Found PCollections used in the magic: %s.', found)
-    _LOGGER.info('Collecting data...')
+    if verbose:
+      _LOGGER.info('Found PCollections used in the magic: %s.', found)
+      _LOGGER.info('Collecting data...')
     for name, pcoll in found.items():
       try:
         _ = ib.collect(pcoll)
diff --git a/sdks/python/apache_beam/runners/interactive/utils.py b/sdks/python/apache_beam/runners/interactive/utils.py
index 2c75cc9..49b87ba 100644
--- a/sdks/python/apache_beam/runners/interactive/utils.py
+++ b/sdks/python/apache_beam/runners/interactive/utils.py
@@ -313,6 +313,20 @@ def pcoll_by_name() -> Dict[str, beam.PCollection]:
   return pcolls
 
 
+def find_pcoll_name(pcoll: beam.PCollection) -> str:
+  """Finds the variable name of a PCollection defined by the user.
+
+  Returns None if not assigned to any variable.
+  """
+  from apache_beam.runners.interactive import interactive_environment as ie
+
+  inspectables = ie.current_env().inspector.inspectables
+  for _, inspectable in inspectables.items():
+    if inspectable['value'] is pcoll:
+      return inspectable['metadata']['name']
+  return None
+
+
 def cacheables() -> Dict[CacheKey, Cacheable]:
   """Finds all Cacheables with their CacheKeys."""
   from apache_beam.runners.interactive import interactive_environment as ie
diff --git a/sdks/python/apache_beam/runners/interactive/utils_test.py b/sdks/python/apache_beam/runners/interactive/utils_test.py
index 5929c8e2..784081e 100644
--- a/sdks/python/apache_beam/runners/interactive/utils_test.py
+++ b/sdks/python/apache_beam/runners/interactive/utils_test.py
@@ -309,6 +309,15 @@ class GeneralUtilTest(unittest.TestCase):
       _ = p | 'ReadBoundedSource' >> beam.io.ReadFromText(f.name)
     self.assertFalse(utils.has_unbounded_sources(p))
 
+  def test_find_pcoll_name(self):
+    p = beam.Pipeline()
+    pcoll = p | beam.Create([1, 2, 3])
+    ib.watch({
+        'p_test_find_pcoll_name': p,
+        'pcoll_test_find_pcoll_name': pcoll,
+    })
+    self.assertEqual('pcoll_test_find_pcoll_name', utils.find_pcoll_name(pcoll))
+
 
 if __name__ == '__main__':
   unittest.main()