This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 43af6f9 [SPARK-38711][PYTHON][SS] Refactor pyspark.sql.streaming module 43af6f9 is described below commit 43af6f9c7fe3feec9dd1aa26a9988f217ad4a8ab Author: Hyukjin Kwon <gurwls...@apache.org> AuthorDate: Fri Apr 1 08:29:30 2022 +0900 [SPARK-38711][PYTHON][SS] Refactor pyspark.sql.streaming module ### What changes were proposed in this pull request? This PR changes `pyspark/sql/streaming.py` module to a package with two files: `pyspark/sql/streaming/query.py` and `pyspark/sql/streaming/readwriter.py`. This is similar with `pyspark/sql/dataframe.py` and `pyspark/sql/readwriter.py`. There should be no user facing change because I kept the existing import at `pyspark/sql/streaming/__init__.py`. ### Why are the changes needed? To make the codes easier to read. ### Does this PR introduce _any_ user-facing change? No, dev-only. ### How was this patch tested? Existing test cases should cover this change. Closes #36023 from HyukjinKwon/refactoring-streaming-module. Authored-by: Hyukjin Kwon <gurwls...@apache.org> Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> --- .github/labeler.yml | 6 +- dev/sparktestsupport/modules.py | 3 +- python/pyspark/sql/streaming/__init__.py | 19 ++ python/pyspark/sql/streaming/query.py | 329 +++++++++++++++++++++ .../sql/{streaming.py => streaming/readwriter.py} | 289 +----------------- python/setup.py | 1 + 6 files changed, 362 insertions(+), 285 deletions(-) diff --git a/.github/labeler.yml b/.github/labeler.yml index bd61902..b3f33cf 100644 --- a/.github/labeler.yml +++ b/.github/labeler.yml @@ -84,11 +84,11 @@ SPARK SHELL: - "repl/**/*" - "bin/spark-shell*" SQL: -#- any: ["**/sql/**/*", "!python/pyspark/sql/avro/**/*", "!python/pyspark/sql/streaming.py", "!python/pyspark/sql/tests/test_streaming.py"] +#- any: ["**/sql/**/*", "!python/pyspark/sql/avro/**/*", "!python/pyspark/sql/streaming/**/*", "!python/pyspark/sql/tests/test_streaming.py"] - "**/sql/**/*" - "common/unsafe/**/*" #- "!python/pyspark/sql/avro/**/*" - #- "!python/pyspark/sql/streaming.py" + #- "!python/pyspark/sql/streaming/**/*" #- "!python/pyspark/sql/tests/test_streaming.py" - "bin/spark-sql*" - "bin/beeline*" @@ -124,7 +124,7 @@ MLLIB: STRUCTURED STREAMING: - "**/sql/**/streaming/**/*" - "external/kafka-0-10-sql/**/*" - - "python/pyspark/sql/streaming.py" + - "python/pyspark/sql/streaming/**/*" - "python/pyspark/sql/tests/test_streaming.py" - "**/*streaming.R" PYTHON: diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py index aab6305..d8a3508 100644 --- a/dev/sparktestsupport/modules.py +++ b/dev/sparktestsupport/modules.py @@ -423,7 +423,8 @@ pyspark_sql = Module( "pyspark.sql.group", "pyspark.sql.functions", "pyspark.sql.readwriter", - "pyspark.sql.streaming", + "pyspark.sql.streaming.query", + "pyspark.sql.streaming.readwriter", "pyspark.sql.udf", "pyspark.sql.window", "pyspark.sql.avro.functions", diff --git a/python/pyspark/sql/streaming/__init__.py b/python/pyspark/sql/streaming/__init__.py new file mode 100644 index 0000000..1c610e3 --- /dev/null +++ b/python/pyspark/sql/streaming/__init__.py @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from pyspark.sql.streaming.query import StreamingQuery, StreamingQueryManager # noqa: F401 +from pyspark.sql.streaming.readwriter import DataStreamReader, DataStreamWriter # noqa: F401 diff --git a/python/pyspark/sql/streaming/query.py b/python/pyspark/sql/streaming/query.py new file mode 100644 index 0000000..8565435 --- /dev/null +++ b/python/pyspark/sql/streaming/query.py @@ -0,0 +1,329 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import json +import sys +from typing import Any, Dict, List, Optional + +from py4j.java_gateway import JavaObject + +from pyspark import since +from pyspark.sql.utils import StreamingQueryException + + +__all__ = ["StreamingQuery", "StreamingQueryManager"] + + +class StreamingQuery: + """ + A handle to a query that is executing continuously in the background as new data arrives. + All these methods are thread-safe. + + .. versionadded:: 2.0.0 + + Notes + ----- + This API is evolving. + """ + + def __init__(self, jsq: JavaObject) -> None: + self._jsq = jsq + + @property # type: ignore[misc] + @since(2.0) + def id(self) -> str: + """Returns the unique id of this query that persists across restarts from checkpoint data. + That is, this id is generated when a query is started for the first time, and + will be the same every time it is restarted from checkpoint data. + There can only be one query with the same id active in a Spark cluster. + Also see, `runId`. + """ + return self._jsq.id().toString() + + @property # type: ignore[misc] + @since(2.1) + def runId(self) -> str: + """Returns the unique id of this query that does not persist across restarts. That is, every + query that is started (or restarted from checkpoint) will have a different runId. + """ + return self._jsq.runId().toString() + + @property # type: ignore[misc] + @since(2.0) + def name(self) -> str: + """Returns the user-specified name of the query, or null if not specified. + This name can be specified in the `org.apache.spark.sql.streaming.DataStreamWriter` + as `dataframe.writeStream.queryName("query").start()`. + This name, if set, must be unique across all active queries. 
+ """ + return self._jsq.name() + + @property # type: ignore[misc] + @since(2.0) + def isActive(self) -> bool: + """Whether this streaming query is currently active or not.""" + return self._jsq.isActive() + + @since(2.0) + def awaitTermination(self, timeout: Optional[int] = None) -> Optional[bool]: + """Waits for the termination of `this` query, either by :func:`query.stop()` or by an + exception. If the query has terminated with an exception, then the exception will be thrown. + If `timeout` is set, it returns whether the query has terminated or not within the + `timeout` seconds. + + If the query has terminated, then all subsequent calls to this method will either return + immediately (if the query was terminated by :func:`stop()`), or throw the exception + immediately (if the query has terminated with exception). + + throws :class:`StreamingQueryException`, if `this` query has terminated with an exception + """ + if timeout is not None: + if not isinstance(timeout, (int, float)) or timeout < 0: + raise ValueError("timeout must be a positive integer or float. Got %s" % timeout) + return self._jsq.awaitTermination(int(timeout * 1000)) + else: + return self._jsq.awaitTermination() + + @property # type: ignore[misc] + @since(2.1) + def status(self) -> Dict[str, Any]: + """ + Returns the current status of the query. + """ + return json.loads(self._jsq.status().json()) + + @property # type: ignore[misc] + @since(2.1) + def recentProgress(self) -> List[Dict[str, Any]]: + """Returns an array of the most recent [[StreamingQueryProgress]] updates for this query. + The number of progress updates retained for each stream is configured by Spark session + configuration `spark.sql.streaming.numRecentProgressUpdates`. + """ + return [json.loads(p.json()) for p in self._jsq.recentProgress()] + + @property + def lastProgress(self) -> Optional[Dict[str, Any]]: + """ + Returns the most recent :class:`StreamingQueryProgress` update of this streaming query or + None if there were no progress updates + + .. versionadded:: 2.1.0 + + Returns + ------- + dict + """ + lastProgress = self._jsq.lastProgress() + if lastProgress: + return json.loads(lastProgress.json()) + else: + return None + + def processAllAvailable(self) -> None: + """Blocks until all available data in the source has been processed and committed to the + sink. This method is intended for testing. + + .. versionadded:: 2.0.0 + + Notes + ----- + In the case of continually arriving data, this method may block forever. + Additionally, this method is only guaranteed to block until data that has been + synchronously appended data to a stream source prior to invocation. + (i.e. `getOffset` must immediately reflect the addition). + """ + return self._jsq.processAllAvailable() + + @since(2.0) + def stop(self) -> None: + """Stop this streaming query.""" + self._jsq.stop() + + def explain(self, extended: bool = False) -> None: + """Prints the (logical and physical) plans to the console for debugging purpose. + + .. versionadded:: 2.1.0 + + Parameters + ---------- + extended : bool, optional + default ``False``. If ``False``, prints only the physical plan. + + Examples + -------- + >>> sq = sdf.writeStream.format('memory').queryName('query_explain').start() + >>> sq.processAllAvailable() # Wait a bit to generate the runtime plans. + >>> sq.explain() + == Physical Plan == + ... + >>> sq.explain(True) + == Parsed Logical Plan == + ... + == Analyzed Logical Plan == + ... + == Optimized Logical Plan == + ... + == Physical Plan == + ... 
+ >>> sq.stop() + """ + # Cannot call `_jsq.explain(...)` because it will print in the JVM process. + # We should print it in the Python process. + print(self._jsq.explainInternal(extended)) + + def exception(self) -> Optional[StreamingQueryException]: + """ + .. versionadded:: 2.1.0 + + Returns + ------- + :class:`StreamingQueryException` + the StreamingQueryException if the query was terminated by an exception, or None. + """ + if self._jsq.exception().isDefined(): + je = self._jsq.exception().get() + msg = je.toString().split(": ", 1)[1] # Drop the Java StreamingQueryException type info + stackTrace = "\n\t at ".join(map(lambda x: x.toString(), je.getStackTrace())) + return StreamingQueryException(msg, stackTrace, je.getCause()) + else: + return None + + +class StreamingQueryManager: + """A class to manage all the :class:`StreamingQuery` StreamingQueries active. + + .. versionadded:: 2.0.0 + + Notes + ----- + This API is evolving. + """ + + def __init__(self, jsqm: JavaObject) -> None: + self._jsqm = jsqm + + @property + def active(self) -> List[StreamingQuery]: + """Returns a list of active queries associated with this SQLContext + + .. versionadded:: 2.0.0 + + Examples + -------- + >>> sq = sdf.writeStream.format('memory').queryName('this_query').start() + >>> sqm = spark.streams + >>> # get the list of active streaming queries + >>> [q.name for q in sqm.active] + ['this_query'] + >>> sq.stop() + """ + return [StreamingQuery(jsq) for jsq in self._jsqm.active()] + + def get(self, id: str) -> StreamingQuery: + """Returns an active query from this SQLContext or throws exception if an active query + with this name doesn't exist. + + .. versionadded:: 2.0.0 + + Examples + -------- + >>> sq = sdf.writeStream.format('memory').queryName('this_query').start() + >>> sq.name + 'this_query' + >>> sq = spark.streams.get(sq.id) + >>> sq.isActive + True + >>> sq = sqlContext.streams.get(sq.id) + >>> sq.isActive + True + >>> sq.stop() + """ + return StreamingQuery(self._jsqm.get(id)) + + @since(2.0) + def awaitAnyTermination(self, timeout: Optional[int] = None) -> Optional[bool]: + """Wait until any of the queries on the associated SQLContext has terminated since the + creation of the context, or since :func:`resetTerminated()` was called. If any query was + terminated with an exception, then the exception will be thrown. + If `timeout` is set, it returns whether the query has terminated or not within the + `timeout` seconds. + + If a query has terminated, then subsequent calls to :func:`awaitAnyTermination()` will + either return immediately (if the query was terminated by :func:`query.stop()`), + or throw the exception immediately (if the query was terminated with exception). Use + :func:`resetTerminated()` to clear past terminations and wait for new terminations. + + In the case where multiple queries have terminated since :func:`resetTermination()` + was called, if any query has terminated with exception, then :func:`awaitAnyTermination()` + will throw any of the exception. For correctly documenting exceptions across multiple + queries, users need to stop all of them after any of them terminates with exception, and + then check the `query.exception()` for each query. + + throws :class:`StreamingQueryException`, if `this` query has terminated with an exception + """ + if timeout is not None: + if not isinstance(timeout, (int, float)) or timeout < 0: + raise ValueError("timeout must be a positive integer or float. 
Got %s" % timeout) + return self._jsqm.awaitAnyTermination(int(timeout * 1000)) + else: + return self._jsqm.awaitAnyTermination() + + def resetTerminated(self) -> None: + """Forget about past terminated queries so that :func:`awaitAnyTermination()` can be used + again to wait for new terminations. + + .. versionadded:: 2.0.0 + + Examples + -------- + >>> spark.streams.resetTerminated() + """ + self._jsqm.resetTerminated() + + +def _test() -> None: + import doctest + import os + from pyspark.sql import SparkSession, SQLContext + import pyspark.sql.streaming.query + from py4j.protocol import Py4JError + + os.chdir(os.environ["SPARK_HOME"]) + + globs = pyspark.sql.streaming.query.__dict__.copy() + try: + spark = SparkSession._getActiveSessionOrCreate() + except Py4JError: # noqa: F821 + spark = SparkSession(sc) # type: ignore[name-defined] # noqa: F821 + + globs["spark"] = spark + globs["sqlContext"] = SQLContext.getOrCreate(spark.sparkContext) + globs["sdf"] = spark.readStream.format("text").load("python/test_support/sql/streaming") + + (failure_count, test_count) = doctest.testmod( + pyspark.sql.streaming.query, + globs=globs, + optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE | doctest.REPORT_NDIFF, + ) + globs["spark"].stop() + + if failure_count: + sys.exit(-1) + + +if __name__ == "__main__": + _test() diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming/readwriter.py similarity index 80% rename from python/pyspark/sql/streaming.py rename to python/pyspark/sql/streaming/readwriter.py index 7517a41..e87b600 100644 --- a/python/pyspark/sql/streaming.py +++ b/python/pyspark/sql/streaming/readwriter.py @@ -16,291 +16,23 @@ # import sys -import json from collections.abc import Iterator -from typing import cast, overload, Any, Callable, Dict, List, Optional, TYPE_CHECKING, Union +from typing import cast, overload, Any, Callable, List, Optional, TYPE_CHECKING, Union from py4j.java_gateway import java_import, JavaObject -from pyspark import since from pyspark.sql.column import _to_seq from pyspark.sql.readwriter import OptionUtils, to_str +from pyspark.sql.streaming.query import StreamingQuery from pyspark.sql.types import Row, StructType, StructField, StringType -from pyspark.sql.utils import ForeachBatchFunction, StreamingQueryException +from pyspark.sql.utils import ForeachBatchFunction if TYPE_CHECKING: from pyspark.sql.session import SparkSession from pyspark.sql._typing import SupportsProcess, OptionalPrimitiveType from pyspark.sql.dataframe import DataFrame -__all__ = ["StreamingQuery", "StreamingQueryManager", "DataStreamReader", "DataStreamWriter"] - - -class StreamingQuery: - """ - A handle to a query that is executing continuously in the background as new data arrives. - All these methods are thread-safe. - - .. versionadded:: 2.0.0 - - Notes - ----- - This API is evolving. - """ - - def __init__(self, jsq: JavaObject) -> None: - self._jsq = jsq - - @property # type: ignore[misc] - @since(2.0) - def id(self) -> str: - """Returns the unique id of this query that persists across restarts from checkpoint data. - That is, this id is generated when a query is started for the first time, and - will be the same every time it is restarted from checkpoint data. - There can only be one query with the same id active in a Spark cluster. - Also see, `runId`. - """ - return self._jsq.id().toString() - - @property # type: ignore[misc] - @since(2.1) - def runId(self) -> str: - """Returns the unique id of this query that does not persist across restarts. 
That is, every - query that is started (or restarted from checkpoint) will have a different runId. - """ - return self._jsq.runId().toString() - - @property # type: ignore[misc] - @since(2.0) - def name(self) -> str: - """Returns the user-specified name of the query, or null if not specified. - This name can be specified in the `org.apache.spark.sql.streaming.DataStreamWriter` - as `dataframe.writeStream.queryName("query").start()`. - This name, if set, must be unique across all active queries. - """ - return self._jsq.name() - - @property # type: ignore[misc] - @since(2.0) - def isActive(self) -> bool: - """Whether this streaming query is currently active or not.""" - return self._jsq.isActive() - - @since(2.0) - def awaitTermination(self, timeout: Optional[int] = None) -> Optional[bool]: - """Waits for the termination of `this` query, either by :func:`query.stop()` or by an - exception. If the query has terminated with an exception, then the exception will be thrown. - If `timeout` is set, it returns whether the query has terminated or not within the - `timeout` seconds. - - If the query has terminated, then all subsequent calls to this method will either return - immediately (if the query was terminated by :func:`stop()`), or throw the exception - immediately (if the query has terminated with exception). - - throws :class:`StreamingQueryException`, if `this` query has terminated with an exception - """ - if timeout is not None: - if not isinstance(timeout, (int, float)) or timeout < 0: - raise ValueError("timeout must be a positive integer or float. Got %s" % timeout) - return self._jsq.awaitTermination(int(timeout * 1000)) - else: - return self._jsq.awaitTermination() - - @property # type: ignore[misc] - @since(2.1) - def status(self) -> Dict[str, Any]: - """ - Returns the current status of the query. - """ - return json.loads(self._jsq.status().json()) - - @property # type: ignore[misc] - @since(2.1) - def recentProgress(self) -> List[Dict[str, Any]]: - """Returns an array of the most recent [[StreamingQueryProgress]] updates for this query. - The number of progress updates retained for each stream is configured by Spark session - configuration `spark.sql.streaming.numRecentProgressUpdates`. - """ - return [json.loads(p.json()) for p in self._jsq.recentProgress()] - - @property - def lastProgress(self) -> Optional[Dict[str, Any]]: - """ - Returns the most recent :class:`StreamingQueryProgress` update of this streaming query or - None if there were no progress updates - - .. versionadded:: 2.1.0 - - Returns - ------- - dict - """ - lastProgress = self._jsq.lastProgress() - if lastProgress: - return json.loads(lastProgress.json()) - else: - return None - - def processAllAvailable(self) -> None: - """Blocks until all available data in the source has been processed and committed to the - sink. This method is intended for testing. - - .. versionadded:: 2.0.0 - - Notes - ----- - In the case of continually arriving data, this method may block forever. - Additionally, this method is only guaranteed to block until data that has been - synchronously appended data to a stream source prior to invocation. - (i.e. `getOffset` must immediately reflect the addition). - """ - return self._jsq.processAllAvailable() - - @since(2.0) - def stop(self) -> None: - """Stop this streaming query.""" - self._jsq.stop() - - def explain(self, extended: bool = False) -> None: - """Prints the (logical and physical) plans to the console for debugging purpose. - - .. 
versionadded:: 2.1.0 - - Parameters - ---------- - extended : bool, optional - default ``False``. If ``False``, prints only the physical plan. - - Examples - -------- - >>> sq = sdf.writeStream.format('memory').queryName('query_explain').start() - >>> sq.processAllAvailable() # Wait a bit to generate the runtime plans. - >>> sq.explain() - == Physical Plan == - ... - >>> sq.explain(True) - == Parsed Logical Plan == - ... - == Analyzed Logical Plan == - ... - == Optimized Logical Plan == - ... - == Physical Plan == - ... - >>> sq.stop() - """ - # Cannot call `_jsq.explain(...)` because it will print in the JVM process. - # We should print it in the Python process. - print(self._jsq.explainInternal(extended)) - - def exception(self) -> Optional[StreamingQueryException]: - """ - .. versionadded:: 2.1.0 - - Returns - ------- - :class:`StreamingQueryException` - the StreamingQueryException if the query was terminated by an exception, or None. - """ - if self._jsq.exception().isDefined(): - je = self._jsq.exception().get() - msg = je.toString().split(": ", 1)[1] # Drop the Java StreamingQueryException type info - stackTrace = "\n\t at ".join(map(lambda x: x.toString(), je.getStackTrace())) - return StreamingQueryException(msg, stackTrace, je.getCause()) - else: - return None - - -class StreamingQueryManager: - """A class to manage all the :class:`StreamingQuery` StreamingQueries active. - - .. versionadded:: 2.0.0 - - Notes - ----- - This API is evolving. - """ - - def __init__(self, jsqm: JavaObject) -> None: - self._jsqm = jsqm - - @property - def active(self) -> List[StreamingQuery]: - """Returns a list of active queries associated with this SQLContext - - .. versionadded:: 2.0.0 - - Examples - -------- - >>> sq = sdf.writeStream.format('memory').queryName('this_query').start() - >>> sqm = spark.streams - >>> # get the list of active streaming queries - >>> [q.name for q in sqm.active] - ['this_query'] - >>> sq.stop() - """ - return [StreamingQuery(jsq) for jsq in self._jsqm.active()] - - def get(self, id: str) -> StreamingQuery: - """Returns an active query from this SQLContext or throws exception if an active query - with this name doesn't exist. - - .. versionadded:: 2.0.0 - - Examples - -------- - >>> sq = sdf.writeStream.format('memory').queryName('this_query').start() - >>> sq.name - 'this_query' - >>> sq = spark.streams.get(sq.id) - >>> sq.isActive - True - >>> sq = sqlContext.streams.get(sq.id) - >>> sq.isActive - True - >>> sq.stop() - """ - return StreamingQuery(self._jsqm.get(id)) - - @since(2.0) - def awaitAnyTermination(self, timeout: Optional[int] = None) -> Optional[bool]: - """Wait until any of the queries on the associated SQLContext has terminated since the - creation of the context, or since :func:`resetTerminated()` was called. If any query was - terminated with an exception, then the exception will be thrown. - If `timeout` is set, it returns whether the query has terminated or not within the - `timeout` seconds. - - If a query has terminated, then subsequent calls to :func:`awaitAnyTermination()` will - either return immediately (if the query was terminated by :func:`query.stop()`), - or throw the exception immediately (if the query was terminated with exception). Use - :func:`resetTerminated()` to clear past terminations and wait for new terminations. - - In the case where multiple queries have terminated since :func:`resetTermination()` - was called, if any query has terminated with exception, then :func:`awaitAnyTermination()` - will throw any of the exception. 
For correctly documenting exceptions across multiple - queries, users need to stop all of them after any of them terminates with exception, and - then check the `query.exception()` for each query. - - throws :class:`StreamingQueryException`, if `this` query has terminated with an exception - """ - if timeout is not None: - if not isinstance(timeout, (int, float)) or timeout < 0: - raise ValueError("timeout must be a positive integer or float. Got %s" % timeout) - return self._jsqm.awaitAnyTermination(int(timeout * 1000)) - else: - return self._jsqm.awaitAnyTermination() - - def resetTerminated(self) -> None: - """Forget about past terminated queries so that :func:`awaitAnyTermination()` can be used - again to wait for new terminations. - - .. versionadded:: 2.0.0 - - Examples - -------- - >>> spark.streams.resetTerminated() - """ - self._jsqm.resetTerminated() +__all__ = ["DataStreamReader", "DataStreamWriter"] class DataStreamReader(OptionUtils): @@ -860,8 +592,6 @@ class DataStreamWriter: self._jwrite = df._jdf.writeStream() def _sq(self, jsq: JavaObject) -> StreamingQuery: - from pyspark.sql.streaming import StreamingQuery - return StreamingQuery(jsq) def outputMode(self, outputMode: str) -> "DataStreamWriter": @@ -1472,28 +1202,25 @@ def _test() -> None: import doctest import os import tempfile - from pyspark.sql import SparkSession, SQLContext - import pyspark.sql.streaming + from pyspark.sql import SparkSession + import pyspark.sql.streaming.readwriter from py4j.protocol import Py4JError os.chdir(os.environ["SPARK_HOME"]) - globs = pyspark.sql.streaming.__dict__.copy() + globs = pyspark.sql.streaming.readwriter.__dict__.copy() try: spark = SparkSession._getActiveSessionOrCreate() except Py4JError: # noqa: F821 spark = SparkSession(sc) # type: ignore[name-defined] # noqa: F821 globs["tempfile"] = tempfile - globs["os"] = os globs["spark"] = spark - globs["sqlContext"] = SQLContext.getOrCreate(spark.sparkContext) globs["sdf"] = spark.readStream.format("text").load("python/test_support/sql/streaming") globs["sdf_schema"] = StructType([StructField("data", StringType(), True)]) - globs["df"] = globs["spark"].readStream.format("text").load("python/test_support/sql/streaming") (failure_count, test_count) = doctest.testmod( - pyspark.sql.streaming, + pyspark.sql.streaming.readwriter, globs=globs, optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE | doctest.REPORT_NDIFF, ) diff --git a/python/setup.py b/python/setup.py index a8d16ff..6128b20 100755 --- a/python/setup.py +++ b/python/setup.py @@ -216,6 +216,7 @@ try: 'pyspark.sql', 'pyspark.sql.avro', 'pyspark.sql.pandas', + 'pyspark.sql.streaming', 'pyspark.streaming', 'pyspark.bin', 'pyspark.sbin', --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
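The PR description above states that there is no user-facing change because the existing import path is kept via the new `pyspark/sql/streaming/__init__.py`. The snippet below is a minimal sketch illustrating that claim, not part of the commit itself; it assumes a local PySpark build that includes this change, and the application name and query name used are arbitrary placeholders.

```python
# Minimal sketch, assuming a PySpark build that includes this refactor.
# The app name and query name are arbitrary placeholders.
from pyspark.sql import SparkSession
from pyspark.sql.streaming import DataStreamWriter, StreamingQuery  # unchanged import path

spark = (
    SparkSession.builder.master("local[2]").appName("streaming-refactor-check").getOrCreate()
)

# The classes are still re-exported from pyspark.sql.streaming, but are now
# defined in the new submodules introduced by this PR.
print(StreamingQuery.__module__)    # expected: pyspark.sql.streaming.query
print(DataStreamWriter.__module__)  # expected: pyspark.sql.streaming.readwriter

# A tiny query against the built-in "rate" source exercises the refactored classes end to end.
sdf = spark.readStream.format("rate").option("rowsPerSecond", 1).load()
query = sdf.writeStream.format("memory").queryName("refactor_check").start()
assert isinstance(query, StreamingQuery)

query.processAllAvailable()  # block until data available so far is committed to the sink
print(query.status)          # current status as a dict
query.stop()
spark.stop()
```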