This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 993fe32b9d4 [SPARK-40042][PYTHON][DOCS] Make pyspark.sql.streaming.query examples self-contained
993fe32b9d4 is described below

commit 993fe32b9d4cd2cbb34067fc465eeedd53cd375a
Author: Qian.Sun <qian.sun2...@gmail.com>
AuthorDate: Tue Aug 16 08:41:02 2022 -0500

    [SPARK-40042][PYTHON][DOCS] Make pyspark.sql.streaming.query examples self-contained
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to improve the examples in `pyspark.sql.streaming.query` by making each example self-contained, with a brief explanation and a more realistic example.
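
    For instance, the self-contained pattern adopted throughout the diff below starts a
    rate-source query against an in-memory sink, checks it, and stops it, so each
    snippet runs on its own (all names here are taken from the examples in this commit):

    >>> sdf = spark.readStream.format("rate").load()
    >>> sq = sdf.writeStream.format('memory').queryName('this_query').start()
    >>> sq.isActive
    True
    >>> sq.stop()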
    
    ### Why are the changes needed?
    
    To make the documentation more readable and easy to copy and paste directly into the PySpark shell.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, it changes the documentation.
    
    ### How was this patch tested?
    
    Manually ran each doctest.
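
    For reference, one way to re-run this module's doctests from a Spark source
    checkout (command assumed from the usual PySpark dev workflow; the module's
    own `_test()` harness, visible at the bottom of the diff, is what actually
    executes them):

    $ python/run-tests --testnames pyspark.sql.streaming.query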
    
    Closes #37482 from dcoliversun/SPARK-40042.
    
    Authored-by: Qian.Sun <qian.sun2...@gmail.com>
    Signed-off-by: Sean Owen <sro...@gmail.com>
---
 python/pyspark/sql/streaming/query.py | 247 +++++++++++++++++++++++++++++++---
 1 file changed, 227 insertions(+), 20 deletions(-)

diff --git a/python/pyspark/sql/streaming/query.py b/python/pyspark/sql/streaming/query.py
index 20c72a64de8..aaf0b677335 100644
--- a/python/pyspark/sql/streaming/query.py
+++ b/python/pyspark/sql/streaming/query.py
@@ -46,41 +46,92 @@ class StreamingQuery:
     @property  # type: ignore[misc]
     @since(2.0)
     def id(self) -> str:
-        """Returns the unique id of this query that persists across restarts 
from checkpoint data.
+        """
+        Returns the unique id of this query that persists across restarts from checkpoint data.
         That is, this id is generated when a query is started for the first time, and
         will be the same every time it is restarted from checkpoint data.
         There can only be one query with the same id active in a Spark cluster.
         Also see `runId`.
+
+        Examples
+        --------
+        >>> sdf = spark.readStream.format("rate").load()
+        >>> sq = sdf.writeStream.format('memory').queryName('this_query').start()
+
+        Get the unique id of this query that persists across restarts from checkpoint data
+
+        >>> sq.id # doctest: +ELLIPSIS
+        '...'
+
+        >>> sq.stop()
         """
         return self._jsq.id().toString()
 
     @property  # type: ignore[misc]
     @since(2.1)
     def runId(self) -> str:
-        """Returns the unique id of this query that does not persist across 
restarts. That is, every
+        """
+        Returns the unique id of this query that does not persist across restarts. That is, every
         query that is started (or restarted from checkpoint) will have a different runId.
+
+        Examples
+        --------
+        >>> sdf = spark.readStream.format("rate").load()
+        >>> sq = sdf.writeStream.format('memory').queryName('this_query').start()
+
+        Get the unique id of this query that does not persist across restarts
+
+        >>> sq.runId # doctest: +ELLIPSIS
+        '...'
+
+        >>> sq.stop()
         """
         return self._jsq.runId().toString()
 
     @property  # type: ignore[misc]
     @since(2.0)
     def name(self) -> str:
-        """Returns the user-specified name of the query, or null if not 
specified.
+        """
+        Returns the user-specified name of the query, or null if not specified.
         This name can be specified in the `org.apache.spark.sql.streaming.DataStreamWriter`
         as `dataframe.writeStream.queryName("query").start()`.
         This name, if set, must be unique across all active queries.
+
+        Examples
+        --------
+        >>> sdf = spark.readStream.format("rate").load()
+        >>> sq = sdf.writeStream.format('memory').queryName('this_query').start()
+
+        Get the user-specified name of the query, or null if not specified.
+
+        >>> sq.name
+        'this_query'
+
+        >>> sq.stop()
         """
         return self._jsq.name()
 
     @property  # type: ignore[misc]
     @since(2.0)
     def isActive(self) -> bool:
-        """Whether this streaming query is currently active or not."""
+        """
+        Whether this streaming query is currently active or not.
+
+        Examples
+        --------
+        >>> sdf = spark.readStream.format("rate").load()
+        >>> sq = sdf.writeStream.format('memory').queryName('this_query').start()
+        >>> sq.isActive
+        True
+
+        >>> sq.stop()
+        """
         return self._jsq.isActive()
 
     @since(2.0)
     def awaitTermination(self, timeout: Optional[int] = None) -> Optional[bool]:
-        """Waits for the termination of `this` query, either by 
:func:`query.stop()` or by an
+        """
+        Waits for the termination of `this` query, either by :func:`query.stop()` or by an
         exception. If the query has terminated with an exception, then the exception will be thrown.
         If `timeout` is set, it returns whether the query has terminated or not within the
         `timeout` seconds.
@@ -90,6 +141,18 @@ class StreamingQuery:
         immediately (if the query has terminated with exception).
 
         throws :class:`StreamingQueryException`, if `this` query has terminated with an exception
+
+        Examples
+        --------
+        >>> sdf = spark.readStream.format("rate").load()
+        >>> sq = sdf.writeStream.format('memory').queryName('query_awaitTermination').start()
+
+        Return whether the query has terminated or not within 5 seconds
+
+        >>> sq.awaitTermination(5)
+        False
+
+        >>> sq.stop()
         """
         if timeout is not None:
             if not isinstance(timeout, (int, float)) or timeout < 0:
@@ -103,15 +166,40 @@ class StreamingQuery:
     def status(self) -> Dict[str, Any]:
         """
         Returns the current status of the query.
+
+        Examples
+        --------
+        >>> sdf = spark.readStream.format("rate").load()
+        >>> sq = sdf.writeStream.format('memory').queryName('this_query').start()
+
+        Get the current status of the query
+
+        >>> sq.status # doctest: +ELLIPSIS
+        {'message': '...', 'isDataAvailable': ..., 'isTriggerActive': ...}
+
+        >>> sq.stop()
         """
         return json.loads(self._jsq.status().json())
 
     @property  # type: ignore[misc]
     @since(2.1)
     def recentProgress(self) -> List[Dict[str, Any]]:
-        """Returns an array of the most recent [[StreamingQueryProgress]] 
updates for this query.
+        """
+        Returns an array of the most recent [[StreamingQueryProgress]] updates for this query.
         The number of progress updates retained for each stream is configured by Spark session
         configuration `spark.sql.streaming.numRecentProgressUpdates`.
+
+        Examples
+        --------
+        >>> sdf = spark.readStream.format("rate").load()
+        >>> sq = sdf.writeStream.format('memory').queryName('this_query').start()
+
+        Get an array of the most recent query progress updates for this query
+
+        >>> sq.recentProgress # doctest: +ELLIPSIS
+        [...]
+
+        >>> sq.stop()
         """
         return [json.loads(p.json()) for p in self._jsq.recentProgress()]
 
@@ -126,6 +214,16 @@ class StreamingQuery:
         Returns
         -------
         dict
+
+        Examples
+        --------
+        >>> sdf = spark.readStream.format("rate").load()
+        >>> sq = sdf.writeStream.format('memory').queryName('this_query').start()
+
+        Get the most recent query progress update for this query
+
+        >>> sq.lastProgress
+        >>> sq.stop()
         """
         lastProgress = self._jsq.lastProgress()
         if lastProgress:
@@ -134,7 +232,8 @@ class StreamingQuery:
             return None
 
     def processAllAvailable(self) -> None:
-        """Blocks until all available data in the source has been processed 
and committed to the
+        """
+        Blocks until all available data in the source has been processed and committed to the
         sink. This method is intended for testing.
 
         .. versionadded:: 2.0.0
@@ -145,16 +244,45 @@ class StreamingQuery:
         Additionally, this method is only guaranteed to block until data that has been
         synchronously appended to a stream source prior to invocation.
         (i.e. `getOffset` must immediately reflect the addition).
+
+        Examples
+        --------
+        >>> sdf = spark.readStream.format("rate").load()
+        >>> sq = sdf.writeStream.format('memory').queryName('this_query').start()
+
+        Blocks the query until all available data in the source
+        has been processed and committed to the sink
+
+        >>> sq.processAllAvailable
+        <bound method StreamingQuery.processAllAvailable ...>
+
+        >>> sq.stop()
         """
         return self._jsq.processAllAvailable()
 
     @since(2.0)
     def stop(self) -> None:
-        """Stop this streaming query."""
+        """
+        Stop this streaming query.
+
+        Examples
+        --------
+        >>> sdf = spark.readStream.format("rate").load()
+        >>> sq = sdf.writeStream.format('memory').queryName('this_query').start()
+        >>> sq.isActive
+        True
+
+        Stop streaming query
+
+        >>> sq.stop()
+        >>> sq.isActive
+        False
+        """
         self._jsq.stop()
 
     def explain(self, extended: bool = False) -> None:
-        """Prints the (logical and physical) plans to the console for 
debugging purpose.
+        """
+        Prints the (logical and physical) plans to the console for debugging purposes.
 
         .. versionadded:: 2.1.0
 
@@ -165,8 +293,17 @@ class StreamingQuery:
 
         Examples
         --------
+        >>> sdf = spark.readStream.format("rate").load()
+        >>> sdf.printSchema()
+        root
+          |-- timestamp: timestamp (nullable = true)
+          |-- value: long (nullable = true)
+
         >>> sq = sdf.writeStream.format('memory').queryName('query_explain').start()
         >>> sq.processAllAvailable() # Wait a bit to generate the runtime plans.
+
+        Explain the runtime plans
+
         >>> sq.explain()
         == Physical Plan ==
         ...
@@ -218,15 +355,24 @@ class StreamingQueryManager:
 
     @property
     def active(self) -> List[StreamingQuery]:
-        """Returns a list of active queries associated with this SQLContext
+        """
+        Returns a list of active queries associated with this SparkSession
 
         .. versionadded:: 2.0.0
 
         Examples
         --------
+        >>> sdf = spark.readStream.format("rate").load()
+        >>> sdf.printSchema()
+        root
+          |-- timestamp: timestamp (nullable = true)
+          |-- value: long (nullable = true)
+
         >>> sq = sdf.writeStream.format('memory').queryName('this_query').start()
         >>> sqm = spark.streams
-        >>> # get the list of active streaming queries
+
+        Get the list of active streaming queries
+
         >>> [q.name for q in sqm.active]
         ['this_query']
         >>> sq.stop()
@@ -234,29 +380,37 @@ class StreamingQueryManager:
         return [StreamingQuery(jsq) for jsq in self._jsqm.active()]
 
     def get(self, id: str) -> StreamingQuery:
-        """Returns an active query from this SQLContext or throws exception if 
an active query
+        """
+        Returns an active query from this SparkSession or throws an exception if an active query
         with this name doesn't exist.
 
         .. versionadded:: 2.0.0
 
         Examples
         --------
+        >>> sdf = spark.readStream.format("rate").load()
+        >>> sdf.printSchema()
+        root
+          |-- timestamp: timestamp (nullable = true)
+          |-- value: long (nullable = true)
+
         >>> sq = sdf.writeStream.format('memory').queryName('this_query').start()
         >>> sq.name
         'this_query'
+
+        Get an active query by id
+
         >>> sq = spark.streams.get(sq.id)
         >>> sq.isActive
         True
-        >>> sq = sqlContext.streams.get(sq.id)
-        >>> sq.isActive
-        True
         >>> sq.stop()
         """
         return StreamingQuery(self._jsqm.get(id))
 
     @since(2.0)
     def awaitAnyTermination(self, timeout: Optional[int] = None) -> Optional[bool]:
-        """Wait until any of the queries on the associated SQLContext has 
terminated since the
+        """
+        Wait until any of the queries on the associated SparkSession has terminated since the
         creation of the context, or since :func:`resetTerminated()` was called. If any query was
         terminated with an exception, then the exception will be thrown.
         If `timeout` is set, it returns whether the query has terminated or not within the
@@ -274,6 +428,18 @@ class StreamingQueryManager:
         then check the `query.exception()` for each query.
 
         throws :class:`StreamingQueryException`, if `this` query has terminated with an exception
+
+        Examples
+        --------
+        >>> sdf = spark.readStream.format("rate").load()
+        >>> sq = sdf.writeStream.format('memory').queryName('this_query').start()
+
+        Return whether any of the queries on the associated SparkSession
+        has terminated or not within 5 seconds
+
+        >>> spark.streams.awaitAnyTermination(5)
+        True
+        >>> sq.stop()
         """
         if timeout is not None:
             if not isinstance(timeout, (int, float)) or timeout < 0:
@@ -283,7 +449,8 @@ class StreamingQueryManager:
             return self._jsqm.awaitAnyTermination()
 
     def resetTerminated(self) -> None:
-        """Forget about past terminated queries so that 
:func:`awaitAnyTermination()` can be used
+        """
+        Forget about past terminated queries so that :func:`awaitAnyTermination()` can be used
         again to wait for new terminations.
 
         .. versionadded:: 2.0.0
@@ -300,6 +467,28 @@ class StreamingQueryManager:
         :class:`~pyspark.sql.streaming.StreamingQuery`.
 
         .. versionadded:: 3.4.0
+
+        Examples
+        --------
+        >>> from pyspark.sql.streaming import StreamingQueryListener
+        >>> class TestListener(StreamingQueryListener):
+        ...     def onQueryStarted(self, event):
+        ...         pass
+        ...
+        ...     def onQueryProgress(self, event):
+        ...         pass
+        ...
+        ...     def onQueryTerminated(self, event):
+        ...         pass
+        >>> test_listener = TestListener()
+
+        Register streaming query listener
+
+        >>> spark.streams.addListener(test_listener)
+
+        Deregister streaming query listener
+
+        >>> spark.streams.removeListener(test_listener)
         """
         from pyspark import SparkContext
         from pyspark.java_gateway import ensure_callback_server_started
@@ -316,6 +505,26 @@ class StreamingQueryManager:
         Deregister a :class:`StreamingQueryListener`.
 
         .. versionadded:: 3.4.0
+
+        >>> from pyspark.sql.streaming import StreamingQueryListener
+        >>> class TestListener(StreamingQueryListener):
+        ...     def onQueryStarted(self, event):
+        ...         pass
+        ...
+        ...     def onQueryProgress(self, event):
+        ...         pass
+        ...
+        ...     def onQueryTerminated(self, event):
+        ...         pass
+        >>> test_listener = TestListener()
+
+        Register streaming query listener
+
+        >>> spark.streams.addListener(test_listener)
+
+        Deregister streaming query listener
+
+        >>> spark.streams.removeListener(test_listener)
         """
         self._jsqm.removeListener(listener._jlistener)
 
@@ -323,7 +532,7 @@ class StreamingQueryManager:
 def _test() -> None:
     import doctest
     import os
-    from pyspark.sql import SparkSession, SQLContext
+    from pyspark.sql import SparkSession
     import pyspark.sql.streaming.query
     from py4j.protocol import Py4JError
 
@@ -336,8 +545,6 @@ def _test() -> None:
         spark = SparkSession(sc)  # type: ignore[name-defined] # noqa: F821
 
     globs["spark"] = spark
-    globs["sqlContext"] = SQLContext.getOrCreate(spark.sparkContext)
-    globs["sdf"] = 
spark.readStream.format("text").load("python/test_support/sql/streaming")
 
     (failure_count, test_count) = doctest.testmod(
         pyspark.sql.streaming.query,

