This is an automated email from the ASF dual-hosted git repository.

ningk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new a6bb4ae  [BEAM-10708] Added an example notebook for beam_sql magic
     new 919fdef  Merge pull request #15518 from KevinGG/beam_sql_out_cache
a6bb4ae is described below

commit a6bb4ae1a3889759ec92d2a21bc52bd2b026cbec
Author: KevinGG <kawai...@gmail.com>
AuthorDate: Wed Sep 15 14:18:11 2021 -0700

    [BEAM-10708] Added an example notebook for beam_sql magic
    
    1. Supported show/collect for beam_sql outputs.
    2. Added a parser for beam_sql inputs.
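    
    As a quick illustration (a sketch based on the example notebook added in
    this commit; the output names are taken from that notebook), the magic now
    works both as a line magic and as a cell magic:
    
        # In one notebook cell: load the extension, then run a one-line query.
        %load_ext apache_beam.runners.interactive.sql.beam_sql_magics
        %beam_sql -o query1_data SELECT CAST(5 AS INT) AS `id`
    
        # In another cell: the same magic in cell form, with verbose output.
        %%beam_sql -o persons_id_lt_5 -v
        SELECT * FROM persons WHERE id < 5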
---
 .../interactive/display/pcoll_visualization.py     |  32 +-
 .../Run Beam SQL with beam_sql magic.ipynb         | 549 +++++++++++++++++++++
 .../runners/interactive/interactive_beam.py        |  35 +-
 .../runners/interactive/interactive_beam_test.py   |  12 +-
 .../runners/interactive/sql/beam_sql_magics.py     | 132 +++--
 .../interactive/sql/beam_sql_magics_test.py        |  14 +
 .../apache_beam/runners/interactive/sql/utils.py   |  28 +-
 .../apache_beam/runners/interactive/utils.py       |  14 +
 .../apache_beam/runners/interactive/utils_test.py  |   9 +
 9 files changed, 771 insertions(+), 54 deletions(-)

diff --git a/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py b/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py
index 9e071fe..f317978 100644
--- a/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py
+++ b/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py
@@ -27,6 +27,7 @@ import datetime
 import html
 import logging
 from datetime import timedelta
+from typing import Optional
 
 from dateutil import tz
 
@@ -238,19 +239,42 @@ def visualize(
 
 
 def visualize_computed_pcoll(
-    pcoll_name: str, pcoll: beam.pvalue.PCollection) -> None:
+    pcoll_name: str,
+    pcoll: beam.pvalue.PCollection,
+    max_n: int,
+    max_duration_secs: float,
+    dynamic_plotting_interval: Optional[int] = None,
+    include_window_info: bool = False,
+    display_facets: bool = False) -> None:
   """A simple visualize alternative.
 
   When the pcoll_name and pcoll pair identifies a watched and computed
   PCollection in the current interactive environment without ambiguity, an
-  ElementStream can be built directly from cache.
+  ElementStream can be built directly from cache. This returns immediately;
+  the visualization is asynchronous but guaranteed to complete in the near future.
+
+  Args:
+    pcoll_name: the variable name of the PCollection.
+    pcoll: the PCollection to be visualized.
+    max_n: the maximum number of elements to visualize.
+    max_duration_secs: the maximum duration, in seconds, to read elements.
+    dynamic_plotting_interval: the interval in seconds between visualization
+      updates if provided; otherwise, no dynamic plotting.
+    include_window_info: whether to include windowing info in the elements.
+    display_facets: whether to display the facets widgets.
   """
   pipeline = ie.current_env().user_pipeline(pcoll.pipeline)
   rm = ie.current_env().get_recording_manager(pipeline, create_if_absent=True)
+
   stream = rm.read(
-      pcoll_name, pcoll, max_n=float('inf'), max_duration_secs=float('inf'))
+      pcoll_name, pcoll, max_n=max_n, max_duration_secs=max_duration_secs)
   if stream:
-    visualize(stream, element_type=pcoll.element_type)
+    visualize(
+        stream,
+        dynamic_plotting_interval=dynamic_plotting_interval,
+        include_window_info=include_window_info,
+        display_facets=display_facets,
+        element_type=pcoll.element_type)
 
 
 class PCollectionVisualization(object):
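
For context, a minimal sketch of calling the extended function (assuming
`pcoll` is a watched and computed PCollection assigned to the variable name
'pcoll' in the current interactive environment; the call below is
illustrative, not part of this commit):

    from apache_beam.runners.interactive.display.pcoll_visualization import (
        visualize_computed_pcoll)

    # Read at most 100 elements for at most 10 seconds, refresh the plot
    # every 5 seconds, and include windowing info in the visualization.
    visualize_computed_pcoll(
        'pcoll',
        pcoll,
        max_n=100,
        max_duration_secs=10,
        dynamic_plotting_interval=5,
        include_window_info=True)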
diff --git a/sdks/python/apache_beam/runners/interactive/examples/Run Beam SQL with beam_sql magic.ipynb b/sdks/python/apache_beam/runners/interactive/examples/Run Beam SQL with beam_sql magic.ipynb
new file mode 100644
index 0000000..d68fa8c
--- /dev/null
+++ b/sdks/python/apache_beam/runners/interactive/examples/Run Beam SQL with beam_sql magic.ipynb
@@ -0,0 +1,549 @@
+{
+ "cells": [
+  {
+   "cell_type": "markdown",
+   "id": "83acd0be",
+   "metadata": {},
+   "source": [
+    "Licensed under the Apache License, Version 2.0 (the \"License\");\n",
+    "<!--\n",
+    "    Licensed to the Apache Software Foundation (ASF) under one\n",
+    "    or more contributor license agreements.  See the NOTICE file\n",
+    "    distributed with this work for additional information\n",
+    "    regarding copyright ownership.  The ASF licenses this file\n",
+    "    to you under the Apache License, Version 2.0 (the\n",
+    "    \"License\"); you may not use this file except in compliance\n",
+    "    with the License.  You may obtain a copy of the License at\n",
+    "\n",
+    "      http://www.apache.org/licenses/LICENSE-2.0\n";,
+    "\n",
+    "    Unless required by applicable law or agreed to in writing,\n",
+    "    software distributed under the License is distributed on an\n",
+    "    \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
+    "    KIND, either express or implied.  See the License for the\n",
+    "    specific language governing permissions and limitations\n",
+    "    under the License.\n",
+    "-->\n"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "5022179a",
+   "metadata": {},
+   "source": [
+    "# Run Beam SQL in notebooks\n",
+    "\n",
+    "[Beam SQL](https://beam.apache.org/documentation/dsls/sql/overview/) 
allows a Beam user to query PCollections with SQL statements. Currently, 
`InteractiveRunner` does not support `SqlTransform` due to 
[BEAM-10708](https://issues.apache.org/jira/browse/BEAM-10708). However, a user 
could use the `beam_sql` magic to run Beam SQL in the notebook and introspect 
the result.\n",
+    "\n",
+    "`beam_sql` is an IPython [custom 
magic](https://ipython.readthedocs.io/en/stable/config/custommagics.html). If 
you're not familiar with magics, here are some [built-in 
examples](https://ipython.readthedocs.io/en/stable/interactive/magics.html). 
It's a convenient way to validate your queries locally against known/test data 
sources when prototyping a Beam pipeline with SQL, before productionizing it on 
remote cluster/services.\n",
+    "\n",
+    "First, let's load the `beam_sql` magic:"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "c6b6e3c1",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "%load_ext apache_beam.runners.interactive.sql.beam_sql_magics"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "a7c43b84",
+   "metadata": {},
+   "source": [
+    "Since SQL support in Beam Python SDK is implemented through xLang 
external transform, make sure you have below prerequisites:\n",
+    "- Have `docker` installed;\n",
+    "- Have jdk8 or jdk11 installed and $JAVA_HOME set;"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "b280710a",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "!docker image list\n",
+    "!java --version\n",
+    "!echo $JAVA_HOME"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "28b1b320",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Optionally sets the logging level to reduce distraction.\n",
+    "import logging\n",
+    "\n",
+    "logging.root.setLevel(logging.ERROR)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "f6b8789f",
+   "metadata": {},
+   "source": [
+    "**Important**: if you're using Beam built from your local source code, 
additionally:\n",
+    "\n",
+    "- Have the Java expansion service shadowjar built. Go to the root 
directory of your local beam repo and then execute:\n",
+    "  `./gradlew :sdks:java:extensions:sql:expansion-service:shadowJar`.\n",
+    "- Based on your jdk version, pull the docker image `docker pull 
apache/beam_java11_sdk` or `docker pull apache/beam_java8_sdk`.\n",
+    "- Then tag the image with your current Beam dev version.  You can check 
the dev version under `apache_beam.version.__version__`. For example, if you're 
using jdk11 and dev version is `x.x.x.dev`, execute `docker image tag 
apache/beam_java11_sdk:latest apache/beam_java11_sdk:x.x.x.dev`."
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "14c8967d",
+   "metadata": {},
+   "source": [
+    "## Query#1 - A simple static query\n",
+    "\n",
+    "The `beam_sql` magic can be used as either a line magic or a cell 
magic.\n",
+    "You can check its usage by running:"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "c212dd89",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "%beam_sql -h"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "7914c1aa",
+   "metadata": {},
+   "source": [
+    "You can run a simple SQL query (in Apache Calcite SQL 
[syntax](https://beam.apache.org/documentation/dsls/sql/calcite/query-syntax/)) 
to create a [schema-aware 
PCollection](https://beam.apache.org/documentation/programming-guide/#schemas) 
from static values."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "895341fa",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "%%beam_sql -o query1_data\n",
+    "SELECT CAST(5 AS INT) AS `id`, CAST('foo' AS VARCHAR) AS `str`, CAST(3.14 
AS DOUBLE) AS `flt`"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "c394ead5",
+   "metadata": {},
+   "source": [
+    "The `beam_sql` magic shows you the result of the SQL query.\n",
+    "\n",
+    "It also creates and outputs a PCollection named `query1_data` with 
`element_type` like `BeamSchema_...(id: int32, str: str)`.\n",
+    "\n",
+    "Note that you have **not** explicitly created a Beam pipeline. You get a 
PCollection because the `beam_sql` magic always **implicitly creates** a 
pipeline to execute your SQL query. To hold the elements with each field's type 
info, Beam automatically creates a schema as the `element_type` for the created 
PCollection."
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "981b2cc9",
+   "metadata": {},
+   "source": [
+    "To introspect the data again with more knobs, you can use `show`."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "e97caf83",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "from apache_beam.runners.interactive import interactive_beam as ib\n",
+    "ib.show(query1_data)\n",
+    "# Uncomment below to set more args.\n",
+    "# ib.show(query1_data, visualize_data=True, include_window_info=True)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "f58b15a8",
+   "metadata": {},
+   "source": [
+    "To materialize the PCollection into a pandas 
[DataFrame](https://pandas.pydata.org/pandas-docs/stable/user_guide/dsintro.html#dataframe)
 object, you can use `collect`."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "47b8da1a",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "ib.collect(query1_data)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "09b4f24c",
+   "metadata": {},
+   "source": [
+    "You can also additionally append some transforms such as writing to a 
text file and print the elements:"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "9a650bbb",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import apache_beam as beam\n",
+    "\n",
+    "coder=beam.coders.registry.get_coder(query1_data.element_type)\n",
+    "print(coder)\n",
+    "query1_data | beam.io.textio.WriteToText('/tmp/query1_data', 
coder=coder)\n",
+    "query1_data | beam.Map(print)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "6cf89704",
+   "metadata": {},
+   "source": [
+    "Execute the pipeline as a normal pipeline running on DirectRunner and 
inspect the output file."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "d524e1a0",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "!rm -rf /tmp/query1_data*\n",
+    "query1_data.pipeline.run().wait_until_finish()\n",
+    "!ls /tmp/query1_data*\n",
+    "!cat /tmp/query1_data*"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "5600945a",
+   "metadata": {},
+   "source": [
+    "The coder in use is a `RowCoder`. The element is encoded and written to 
the text file. When inspecting it directly, it may display garbled strings. The 
file will be revisited later in Query#4."
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "30aa1188",
+   "metadata": {},
+   "source": [
+    "### [Optional] Omit the `-o` option.\n",
+    "If the option is omitted, an output name is auto-generated based on the 
SQL query and PCollection (if any) it queries. Optionally, you can also use the 
`_[{execution_count}]` convention: `_` for last output and `_{execution_count}` 
for a specific cell execution output.\n",
+    "\n",
+    "However, explicitly naming the output is recommended for better notebook 
readability and to avoid unexpected errors.\n",
+    "\n",
+    "Below example outputs a PCollection named like `sql_output_...`."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "b445e4f1",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "%%beam_sql\n",
+    "SELECT CAST(1 AS INT) AS `id`, CAST('foo' AS VARCHAR) AS `str`, CAST(3.14 
AS DOUBLE) AS `flt`"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "c7b9e4fb",
+   "metadata": {},
+   "source": [
+    "Now that you are familiar with the `beam_sql` magic, you can build more 
queries against PCollections.\n",
+    "\n",
+    "Let's install the `names` package to randomly generate some names."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "ef1ca0fc",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "%pip install names"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "1c0d5739",
+   "metadata": {},
+   "source": [
+    "Import all modules needed for this example."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "20cdf3b9",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import names\n",
+    "import typing\n",
+    "\n",
+    "import apache_beam as beam\n",
+    "from apache_beam.runners.interactive.interactive_runner import 
InteractiveRunner\n",
+    "from apache_beam.runners.interactive import interactive_beam as ib"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "00db1574",
+   "metadata": {},
+   "source": [
+    "Create a pipeline `p` with the `InteractiveRunner`."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "24caeb60",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "p = beam.Pipeline(InteractiveRunner())"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "0a4ca6eb",
+   "metadata": {},
+   "source": [
+    "Then let's create a schema with `typing.NamedTuple`. Let's call it 
`Person` with a field `id` and a field `name`."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "23910a9d",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "class Person(typing.NamedTuple):\n",
+    "    id: int\n",
+    "    name: str"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "5c626d63",
+   "metadata": {},
+   "source": [
+    "With `beam_sql` magic, you can utilize all the Beam I/O connectors 
(streaming is currently not supported due to `DirectRunner` not supporting 
streaming pipeline with `SqlTransform`) as source of data, then build a SQL 
query against all the data and check the output. If needed, you can sink the 
output following the `WriteToText` example demonstrated above."
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "2d892920",
+   "metadata": {},
+   "source": [
+    "## Query#2 - Querying a single PCollection\n",
+    "\n",
+    "Let's build a PCollection with 10 random `Person` typed elements."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "8a5fc9b9",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "persons = (p \n",
+    "           | beam.Create([Person(id=x, name=names.get_full_name()) for x 
in range(10)]))\n",
+    "ib.show(persons)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "84d64746",
+   "metadata": {},
+   "source": [
+    "You can look for all elements with `id < 5` in `persons` with the below 
query and assign the output to `persons_id_lt_5`. Also, you can enable `-v` 
option to see more details about the execution."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "07db1116",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "%%beam_sql -o persons_id_lt_5 -v\n",
+    "SELECT * FROM persons WHERE id <5"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "68afa962",
+   "metadata": {},
+   "source": [
+    "With `-v`, if it's the first time running this query, you might see a 
warning message about\n",
+    "\n",
+    "```\n",
+    "Schema Person has not been registered to use a RowCoder. Automatically 
registering it by running: beam.coders.registry.register_coder(Person, 
beam.coders.RowCoder)\n",
+    "```\n",
+    "\n",
+    "The `beam_sql` magic helps registering a `RowCoder` for each schema you 
define and use whenever it finds one. You can also explicitly run the same code 
to do so.\n",
+    "\n",
+    "Note the output element type is `Person(id: int, name: str)` instead of 
`BeamSchema_...` because you have selected all the fields from a single 
PCollection of the known type `Person(id: int, name: str)`."
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "79587515",
+   "metadata": {},
+   "source": [
+    "## Query#3 - Joining multiple PCollections\n",
+    "\n",
+    "You can build a `persons_2` PCollection with a different range of `id`s 
and `name`s. "
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "c01fa39a",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "persons_2 = (p \n",
+    "             | beam.Create([Person(id=x, name=names.get_full_name()) for 
x in range(5, 15)]))\n",
+    "ib.show(persons_2)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "6904ff8e",
+   "metadata": {},
+   "source": [
+    "Then query for all `name`s from `persons` and `persons_2` with the same 
`id`s and assign the output to `persons_with_common_id`."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "2a0a60ff",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "%%beam_sql -o persons_with_common_id -v\n",
+    "SELECT * FROM persons JOIN persons_2 USING (id)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "4bb4df8a",
+   "metadata": {},
+   "source": [
+    "Note the output element type is now some `BeamSchema_...(id: int64, name: 
str, name0: str)`. Because you have selected columns from both PCollections, 
there is no known schema to hold the result. Beam automatically creates a 
schema and differentiates conflicted field `name` by suffixing `0` to one of 
them.\n",
+    "\n",
+    "And since `Person` is already previously registered with a `RowCoder`, 
there is no more warning about registering it anymore even with `-v`."
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "cfcfeb76",
+   "metadata": {},
+   "source": [
+    "## Query#4 - Join multiple PCollections, including I/O."
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "ce8abc3d",
+   "metadata": {},
+   "source": [
+    "Let's read the file written by Query#1 and use it to join `persons` and 
`persons_2` to find `name`s with the common `id` in all three of them. "
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "d1dea37b",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Use the exact same coder used when WriteToText and explicitly set the 
output types.\n",
+    "query1_result_in_file = p | beam.io.ReadFromText(\n",
+    "    '/tmp/query1_data*', coder=coder).with_output_types(\n",
+    "    query1_data.element_type)\n",
+    "\n",
+    "# Check all the data sources.\n",
+    "ib.show(query1_result_in_file)\n",
+    "ib.show(persons)\n",
+    "ib.show(persons_2)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "4bf6c422",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "%%beam_sql -o entry_with_common_id\n",
+    "\n",
+    "SELECT query1_result_in_file.id, persons.name AS `name_1`, persons_2.name 
AS `name_2`\n",
+    "FROM query1_result_in_file JOIN persons ON query1_result_in_file.id = 
persons.id\n",
+    "JOIN persons_2 ON query1_result_in_file.id = persons_2.id"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "282f6173",
+   "metadata": {},
+   "source": [
+    "You can also chain another `beam_sql` magic to get just `name_1`:"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "d858dd6c",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "%%beam_sql -o name_found\n",
+    "SELECT name_1 AS `name` FROM entry_with_common_id"
+   ]
+  }
+ ],
+ "metadata": {
+  "language_info": {
+   "codemirror_mode": {
+    "name": "ipython",
+    "version": 3
+   },
+   "file_extension": ".py",
+   "mimetype": "text/x-python",
+   "name": "python",
+   "nbconvert_exporter": "python",
+   "pygments_lexer": "ipython3",
+   "version": "3.7.11"
+  }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 5
+}
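
As an aside, the explicit schema registration that the notebook's `-v` output
mentions can also be run by hand; a minimal sketch (mirroring the `Person`
schema defined in the notebook):

    import typing
    import apache_beam as beam

    class Person(typing.NamedTuple):
        id: int
        name: str

    # What the beam_sql magic does implicitly for unregistered schemas:
    beam.coders.registry.register_coder(Person, beam.coders.RowCoder)
    assert isinstance(
        beam.coders.registry.get_coder(Person), beam.coders.RowCoder)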
diff --git a/sdks/python/apache_beam/runners/interactive/interactive_beam.py b/sdks/python/apache_beam/runners/interactive/interactive_beam.py
index 0b3f8a3..fc55bf1 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_beam.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_beam.py
@@ -41,9 +41,11 @@ from apache_beam.dataframe.frame_base import DeferredBase
 from apache_beam.runners.interactive import interactive_environment as ie
 from apache_beam.runners.interactive.display import pipeline_graph
 from apache_beam.runners.interactive.display.pcoll_visualization import visualize
+from apache_beam.runners.interactive.display.pcoll_visualization import visualize_computed_pcoll
 from apache_beam.runners.interactive.options import interactive_options
 from apache_beam.runners.interactive.utils import deferred_df_to_pcollection
 from apache_beam.runners.interactive.utils import elements_to_df
+from apache_beam.runners.interactive.utils import find_pcoll_name
 from apache_beam.runners.interactive.utils import progress_indicated
 from apache_beam.runners.runner import PipelineState
 
@@ -448,7 +450,7 @@ def show(
 
   # Iterate through the given PCollections and convert any deferred DataFrames
   # or Series into PCollections.
-  pcolls = []
+  pcolls = set()
 
   # The element type is used to help visualize the given PCollection. For the
   # deferred DataFrame/Series case it is the proxy of the frame.
@@ -462,14 +464,14 @@ def show(
 
     element_types[pcoll] = element_type
 
-    pcolls.append(pcoll)
+    pcolls.add(pcoll)
     assert isinstance(pcoll, beam.pvalue.PCollection), (
         '{} is not an apache_beam.pvalue.PCollection.'.format(pcoll))
 
   assert len(pcolls) > 0, (
       'Need at least 1 PCollection to show data visualization.')
 
-  user_pipeline = pcolls[0].pipeline
+  user_pipeline = ie.current_env().user_pipeline(next(iter(pcolls)).pipeline)
 
   if isinstance(n, str):
     assert n == 'inf', (
@@ -488,6 +490,20 @@ def show(
   if duration == 'inf':
     duration = float('inf')
 
+  previously_computed_pcolls = {
+      pcoll
+      for pcoll in pcolls if pcoll in ie.current_env().computed_pcollections
+  }
+  for pcoll in previously_computed_pcolls:
+    visualize_computed_pcoll(
+        find_pcoll_name(pcoll),
+        pcoll,
+        n,
+        duration,
+        include_window_info=include_window_info,
+        display_facets=visualize_data)
+  pcolls = pcolls - previously_computed_pcolls
+
   recording_manager = ie.current_env().get_recording_manager(
       user_pipeline, create_if_absent=True)
   recording = recording_manager.record(pcolls, max_n=n, max_duration=duration)
@@ -509,7 +525,6 @@ def show(
             stream,
             include_window_info=include_window_info,
             element_type=element_types[stream.pcoll])
-
     if recording.is_computed():
       return
 
@@ -591,10 +606,20 @@ def collect(pcoll, n='inf', duration='inf', include_window_info=False):
   if duration == 'inf':
     duration = float('inf')
 
-  user_pipeline = pcoll.pipeline
+  user_pipeline = ie.current_env().user_pipeline(pcoll.pipeline)
   recording_manager = ie.current_env().get_recording_manager(
       user_pipeline, create_if_absent=True)
 
+  # If already computed, directly read the stream and return.
+  if pcoll in ie.current_env().computed_pcollections:
+    pcoll_name = find_pcoll_name(pcoll)
+    elements = list(
+        recording_manager.read(pcoll_name, pcoll, n, duration).read())
+    return elements_to_df(
+        elements,
+        include_window_info=include_window_info,
+        element_type=element_type)
+
   recording = recording_manager.record([pcoll], max_n=n, max_duration=duration)
 
   try:
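
The practical effect of these changes is that show/collect on an
already-computed PCollection now reads straight from cache instead of
re-recording. A rough usage sketch (variable names illustrative):

    import apache_beam as beam
    from apache_beam.runners.interactive import interactive_beam as ib
    from apache_beam.runners.interactive.interactive_runner import InteractiveRunner

    p = beam.Pipeline(InteractiveRunner())
    pcoll = p | beam.Create([1, 2, 3])
    ib.watch(locals())

    ib.show(pcoll)          # First call: records and computes the PCollection.
    df = ib.collect(pcoll)  # Second call: takes the new computed-PCollection
                            # fast path and reads cached elements directly.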
diff --git a/sdks/python/apache_beam/runners/interactive/interactive_beam_test.py b/sdks/python/apache_beam/runners/interactive/interactive_beam_test.py
index f12ab8e..b093aa2 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_beam_test.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_beam_test.py
@@ -120,7 +120,9 @@ class InteractiveBeamTest(unittest.TestCase):
     ib.show(pcoll)
     self.assertTrue(pcoll in ie.current_env().computed_pcollections)
 
-  @patch('apache_beam.runners.interactive.interactive_beam.visualize')
+  @patch((
+      'apache_beam.runners.interactive.interactive_beam.'
+      'visualize_computed_pcoll'))
   def test_show_handles_dict_of_pcolls(self, mocked_visualize):
     p = beam.Pipeline(ir.InteractiveRunner())
     # pylint: disable=range-builtin-not-iterating
@@ -133,7 +135,9 @@ class InteractiveBeamTest(unittest.TestCase):
     ib.show({'pcoll': pcoll})
     mocked_visualize.assert_called_once()
 
-  @patch('apache_beam.runners.interactive.interactive_beam.visualize')
+  @patch((
+      'apache_beam.runners.interactive.interactive_beam.'
+      'visualize_computed_pcoll'))
   def test_show_handles_iterable_of_pcolls(self, mocked_visualize):
     p = beam.Pipeline(ir.InteractiveRunner())
     # pylint: disable=range-builtin-not-iterating
@@ -159,7 +163,9 @@ class InteractiveBeamTest(unittest.TestCase):
     ib.show(deferred)
     mocked_visualize.assert_called_once()
 
-  @patch('apache_beam.runners.interactive.interactive_beam.visualize')
+  @patch((
+      'apache_beam.runners.interactive.interactive_beam.'
+      'visualize_computed_pcoll'))
   def test_show_noop_when_pcoll_container_is_invalid(self, mocked_visualize):
     class SomeRandomClass:
       def __init__(self, pcoll):
diff --git a/sdks/python/apache_beam/runners/interactive/sql/beam_sql_magics.py b/sdks/python/apache_beam/runners/interactive/sql/beam_sql_magics.py
index 6c0f8d3..bd40f13 100644
--- a/sdks/python/apache_beam/runners/interactive/sql/beam_sql_magics.py
+++ b/sdks/python/apache_beam/runners/interactive/sql/beam_sql_magics.py
@@ -20,10 +20,12 @@
 Only works within an IPython kernel.
 """
 
+import argparse
 import importlib
 import keyword
 import logging
 from typing import Dict
+from typing import List
 from typing import Optional
 from typing import Tuple
 from typing import Union
@@ -49,23 +51,22 @@ from apache_beam.testing import test_stream
 from apache_beam.testing.test_stream_service import TestStreamServiceController
 from apache_beam.transforms.sql import SqlTransform
 from IPython.core.magic import Magics
-from IPython.core.magic import cell_magic
+from IPython.core.magic import line_cell_magic
 from IPython.core.magic import magics_class
 
 _LOGGER = logging.getLogger(__name__)
 
-_EXAMPLE_USAGE = """Usage:
-    %%%%beam_sql [output_name]
-    Calcite SQL statement
-    Syntax: https://beam.apache.org/documentation/dsls/sql/calcite/query-syntax/
-    Please make sure that there is no conflicts between your variable names and
-    the SQL keywords, such as "SELECT", "FROM", "WHERE" and etc.
-
-    output_name is optional. If not supplied, a variable name is automatically
-    assigned to the output of the magic.
-
-    The output of the magic is usually a PCollection or similar PValue,
-    depending on the SQL statement executed.
+_EXAMPLE_USAGE = """beam_sql magic to execute Beam SQL in notebooks
+---------------------------------------------------------
+%%beam_sql [-o OUTPUT_NAME] query
+---------------------------------------------------------
+Or
+---------------------------------------------------------
+%%%%beam_sql [-o OUTPUT_NAME] query-line#1
+query-line#2
+...
+query-line#N
+---------------------------------------------------------
 """
 
 _NOT_SUPPORTED_MSG = """The query was valid and successfully applied.
@@ -82,38 +83,107 @@ _NOT_SUPPORTED_MSG = """The query was valid and 
successfully applied.
 """
 
 
+class BeamSqlParser:
+  """A parser to parse beam_sql inputs."""
+  def __init__(self):
+    self._parser = argparse.ArgumentParser(usage=_EXAMPLE_USAGE)
+    self._parser.add_argument(
+        '-o',
+        '--output-name',
+        dest='output_name',
+        help=(
+            'The output variable name of the magic, usually a PCollection. '
+            'Auto-generated if omitted.'))
+    self._parser.add_argument(
+        '-v',
+        '--verbose',
+        action='store_true',
+        help='Display more details about the magic execution.')
+    self._parser.add_argument(
+        'query',
+        type=str,
+        nargs='*',
+        help=(
+            'The Beam SQL query to execute. '
+            'Syntax: https://beam.apache.org/documentation/dsls/sql/calcite/'
+            'query-syntax/. '
+            'Please make sure that there is no conflict between your variable '
+            'names and the SQL keywords, such as "SELECT", "FROM", "WHERE", '
+            'etc.'))
+
+  def parse(self, args: List[str]) -> Optional[argparse.Namespace]:
+    """Parses a list of string inputs.
+
+    The parsed namespace contains these attributes:
+      output_name: Optional[str], the output variable name.
+      verbose: bool, whether to display more details of the magic execution.
+      query: Optional[List[str]], the beam SQL query to execute.
+
+    Returns:
+      The parsed args, or None if parsing fails.
+    """
+    try:
+      return self._parser.parse_args(args)
+    except KeyboardInterrupt:
+      raise
+    except:  # pylint: disable=bare-except
+      # -h or --help results in SystemExit 0. Do not raise.
+      return None
+
+  def print_help(self) -> None:
+    self._parser.print_help()
+
+
 def on_error(error_msg, *args):
   """Logs the error and the usage example."""
   _LOGGER.error(error_msg, *args)
-  _LOGGER.info(_EXAMPLE_USAGE)
+  BeamSqlParser().print_help()
 
 
 @magics_class
 class BeamSqlMagics(Magics):
-  @cell_magic
-  def beam_sql(self, line: str, cell: str) -> Union[None, PValue]:
-    """The beam_sql cell magic that executes a Beam SQL.
+  def __init__(self, shell):
+    super().__init__(shell)
+    # Eagerly initializes the environment.
+    _ = ie.current_env()
+    self._parser = BeamSqlParser()
+
+  @line_cell_magic
+  def beam_sql(self, line: str, cell: Optional[str] = None) -> Optional[PValue]:
+    """The beam_sql line/cell magic that executes a Beam SQL.
 
     Args:
-      line: (optional) the string on the same line after the beam_sql magic.
-          Used as the output variable name in the __main__ module.
-      cell: everything else in the same notebook cell as a string. Used as a
-          Beam SQL query.
+      line: the string on the same line after the beam_sql magic.
+      cell: everything else in the same notebook cell as a string. If None,
+        beam_sql is used as line magic. Otherwise, cell magic.
 
     Returns None if running into an error, otherwise a PValue as if a
     SqlTransform is applied.
     """
-    if line and not line.strip().isidentifier() or keyword.iskeyword(
-        line.strip()):
+    input_str = line
+    if cell:
+      input_str += ' ' + cell
+    parsed = self._parser.parse(input_str.strip().split())
+    if not parsed:
+      # Failed to parse inputs, let the parser handle the exit.
+      return
+    output_name = parsed.output_name
+    verbose = parsed.verbose
+    query = parsed.query
+
+    if output_name and not output_name.isidentifier() or keyword.iskeyword(
+        output_name):
       on_error(
           'The output_name "%s" is not a valid identifier. Please supply a '
           'valid identifier that is not a Python keyword.',
          output_name)
       return
-    if not cell or cell.isspace():
-      on_error('Please supply the sql to be executed.')
+    if not query:
+      on_error('Please supply the SQL query to be executed.')
       return
-    found = find_pcolls(cell, pcoll_by_name())
+    query = ' '.join(query)
+
+    found = find_pcolls(query, pcoll_by_name(), verbose=verbose)
     for _, pcoll in found.items():
       if not is_namedtuple(pcoll.element_type):
         on_error(
@@ -123,9 +193,9 @@ class BeamSqlMagics(Magics):
             pcoll,
             pcoll.element_type)
         return
-      register_coder_for_schema(pcoll.element_type)
+      register_coder_for_schema(pcoll.element_type, verbose=verbose)
 
-    output_name, output = apply_sql(cell, line, found)
+    output_name, output = apply_sql(query, output_name, found)
     cache_output(output_name, output)
     return output
 
@@ -249,7 +319,8 @@ def _build_query_components(
     PCollection, or the pipeline to execute the query.
   """
   if found:
-    user_pipeline = next(iter(found.values())).pipeline
+    user_pipeline = ie.current_env().user_pipeline(
+        next(iter(found.values())).pipeline)
     sql_pipeline = beam.Pipeline(options=user_pipeline._options)
     ie.current_env().add_derived_pipeline(user_pipeline, sql_pipeline)
     sql_source = {}
@@ -295,7 +366,8 @@ def cache_output(output_name: str, output: PValue) -> None:
     _LOGGER.warning(_NOT_SUPPORTED_MSG, e, output.pipeline.runner)
     return
   ie.current_env().mark_pcollection_computed([output])
-  visualize_computed_pcoll(output_name, output)
+  visualize_computed_pcoll(
+      output_name, output, max_n=float('inf'), max_duration_secs=float('inf'))
 
 
 def load_ipython_extension(ipython):
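
A quick sketch of how the new parser behaves, mirroring the tests added
further below (the inputs are illustrative):

    from apache_beam.runners.interactive.sql.beam_sql_magics import BeamSqlParser

    parser = BeamSqlParser()
    parsed = parser.parse('-o out -v SELECT * FROM pcoll'.split())
    assert parsed.output_name == 'out'
    assert parsed.verbose
    assert ' '.join(parsed.query) == 'SELECT * FROM pcoll'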
diff --git a/sdks/python/apache_beam/runners/interactive/sql/beam_sql_magics_test.py b/sdks/python/apache_beam/runners/interactive/sql/beam_sql_magics_test.py
index 7c4de77..538abbb 100644
--- a/sdks/python/apache_beam/runners/interactive/sql/beam_sql_magics_test.py
+++ b/sdks/python/apache_beam/runners/interactive/sql/beam_sql_magics_test.py
@@ -34,6 +34,7 @@ try:
   from apache_beam.runners.interactive.sql.beam_sql_magics import _build_query_components
   from apache_beam.runners.interactive.sql.beam_sql_magics import _generate_output_name
   from apache_beam.runners.interactive.sql.beam_sql_magics import cache_output
+  from apache_beam.runners.interactive.sql.beam_sql_magics import BeamSqlParser
 except (ImportError, NameError):
   pass  # The test is to be skipped because [interactive] dep not installed.
 
@@ -133,6 +134,19 @@ class BeamSqlMagicsTest(unittest.TestCase):
           cache_manager.exists(
               'full', CacheKey.from_pcoll('pcoll_co', pcoll_co).to_str()))
 
+  def test_parser_with_all_inputs(self):
+    parsed = BeamSqlParser().parse(
+        '-o output_name -v SELECT * FROM abc'.split())
+    self.assertTrue(parsed.verbose)
+    self.assertEqual('output_name', parsed.output_name)
+    self.assertEqual('SELECT * FROM abc', ' '.join(parsed.query))
+
+  def test_parser_with_no_input(self):
+    parsed = BeamSqlParser().parse([])
+    self.assertFalse(parsed.verbose)
+    self.assertIsNone(parsed.output_name)
+    self.assertFalse(parsed.query)
+
 
 if __name__ == '__main__':
   unittest.main()
diff --git a/sdks/python/apache_beam/runners/interactive/sql/utils.py b/sdks/python/apache_beam/runners/interactive/sql/utils.py
index 1840e60..fb4e57d 100644
--- a/sdks/python/apache_beam/runners/interactive/sql/utils.py
+++ b/sdks/python/apache_beam/runners/interactive/sql/utils.py
@@ -39,7 +39,8 @@ def is_namedtuple(cls: type) -> bool:
       hasattr(cls, '_fields') and hasattr(cls, '__annotations__'))
 
 
-def register_coder_for_schema(schema: NamedTuple) -> None:
+def register_coder_for_schema(
+    schema: NamedTuple, verbose: bool = False) -> None:
   """Registers a RowCoder for the given schema if hasn't.
 
   Notifies the user of what code has been implicitly executed.
@@ -48,19 +49,21 @@ def register_coder_for_schema(schema: NamedTuple) -> None:
       'Schema %s is not a typing.NamedTuple.' % schema)
   coder = beam.coders.registry.get_coder(schema)
   if not isinstance(coder, beam.coders.RowCoder):
-    _LOGGER.warning(
-        'Schema %s has not been registered to use a RowCoder. '
-        'Automatically registering it by running: '
-        'beam.coders.registry.register_coder(%s, '
-        'beam.coders.RowCoder)',
-        schema.__name__,
-        schema.__name__)
+    if verbose:
+      _LOGGER.warning(
+          'Schema %s has not been registered to use a RowCoder. '
+          'Automatically registering it by running: '
+          'beam.coders.registry.register_coder(%s, '
+          'beam.coders.RowCoder)',
+          schema.__name__,
+          schema.__name__)
     beam.coders.registry.register_coder(schema, beam.coders.RowCoder)
 
 
 def find_pcolls(
-    sql: str, pcolls: Dict[str,
-                           beam.PCollection]) -> Dict[str, beam.PCollection]:
+    sql: str,
+    pcolls: Dict[str, beam.PCollection],
+    verbose: bool = False) -> Dict[str, beam.PCollection]:
   """Finds all PCollections used in the given sql query.
 
   It does a simple word by word match and calls ib.collect for each PCollection
@@ -71,8 +74,9 @@ def find_pcolls(
     if word in pcolls:
       found[word] = pcolls[word]
   if found:
-    _LOGGER.info('Found PCollections used in the magic: %s.', found)
-    _LOGGER.info('Collecting data...')
+    if verbose:
+      _LOGGER.info('Found PCollections used in the magic: %s.', found)
+      _LOGGER.info('Collecting data...')
     for name, pcoll in found.items():
       try:
         _ = ib.collect(pcoll)
diff --git a/sdks/python/apache_beam/runners/interactive/utils.py b/sdks/python/apache_beam/runners/interactive/utils.py
index 2c75cc9..49b87ba 100644
--- a/sdks/python/apache_beam/runners/interactive/utils.py
+++ b/sdks/python/apache_beam/runners/interactive/utils.py
@@ -313,6 +313,20 @@ def pcoll_by_name() -> Dict[str, beam.PCollection]:
   return pcolls
 
 
+def find_pcoll_name(pcoll: beam.PCollection) -> str:
+  """Finds the variable name of a PCollection defined by the user.
+
+  Returns None if not assigned to any variable.
+  """
+  from apache_beam.runners.interactive import interactive_environment as ie
+
+  inspectables = ie.current_env().inspector.inspectables
+  for _, inspectable in inspectables.items():
+    if inspectable['value'] is pcoll:
+      return inspectable['metadata']['name']
+  return None
+
+
 def cacheables() -> Dict[CacheKey, Cacheable]:
   """Finds all Cacheables with their CacheKeys."""
   from apache_beam.runners.interactive import interactive_environment as ie
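
A short sketch of the new helper in use, mirroring the test added below
(variable names illustrative):

    import apache_beam as beam
    from apache_beam.runners.interactive import interactive_beam as ib
    from apache_beam.runners.interactive import utils

    p = beam.Pipeline()
    pcoll = p | beam.Create([1, 2, 3])
    ib.watch({'my_p': p, 'my_pcoll': pcoll})

    # Looks up the user-assigned variable name; returns None if unwatched.
    assert utils.find_pcoll_name(pcoll) == 'my_pcoll'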
diff --git a/sdks/python/apache_beam/runners/interactive/utils_test.py b/sdks/python/apache_beam/runners/interactive/utils_test.py
index 5929c8e2..784081e 100644
--- a/sdks/python/apache_beam/runners/interactive/utils_test.py
+++ b/sdks/python/apache_beam/runners/interactive/utils_test.py
@@ -309,6 +309,15 @@ class GeneralUtilTest(unittest.TestCase):
     _ = p | 'ReadBoundedSource' >> beam.io.ReadFromText(f.name)
     self.assertFalse(utils.has_unbounded_sources(p))
 
+  def test_find_pcoll_name(self):
+    p = beam.Pipeline()
+    pcoll = p | beam.Create([1, 2, 3])
+    ib.watch({
+        'p_test_find_pcoll_name': p,
+        'pcoll_test_find_pcoll_name': pcoll,
+    })
+    self.assertEqual('pcoll_test_find_pcoll_name', utils.find_pcoll_name(pcoll))
+
 
 if __name__ == '__main__':
   unittest.main()
