Repository: spark
Updated Branches:
  refs/heads/master 153c2f9ac -> f454a7f9f


[SPARK-16266][SQL][STREAMING] Moved DataStreamReader/Writer from pyspark.sql to 
pyspark.sql.streaming

## What changes were proposed in this pull request?

- Moved DataStreamReader/Writer from pyspark.sql to pyspark.sql.streaming to 
make them consistent with the Scala packaging
- Exposed the necessary classes in sql.streaming package so that they appear in 
the docs
- Added pyspark.sql.streaming module to the docs

## How was this patch tested?
- Updated unit tests.
- Generated docs to verify the visibility of pyspark.sql.streaming classes.

Author: Tathagata Das <tathagata.das1...@gmail.com>

Closes #13955 from tdas/SPARK-16266.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f454a7f9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f454a7f9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f454a7f9

Branch: refs/heads/master
Commit: f454a7f9f03807dd768319798daa1351bbfc7288
Parents: 153c2f9
Author: Tathagata Das <tathagata.das1...@gmail.com>
Authored: Tue Jun 28 22:07:11 2016 -0700
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Tue Jun 28 22:07:11 2016 -0700

----------------------------------------------------------------------
 python/docs/pyspark.sql.rst      |   6 +
 python/pyspark/sql/context.py    |   3 +-
 python/pyspark/sql/dataframe.py  |   3 +-
 python/pyspark/sql/readwriter.py | 493 +--------------------------------
 python/pyspark/sql/session.py    |   3 +-
 python/pyspark/sql/streaming.py  | 502 +++++++++++++++++++++++++++++++++-
 6 files changed, 511 insertions(+), 499 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f454a7f9/python/docs/pyspark.sql.rst
----------------------------------------------------------------------
diff --git a/python/docs/pyspark.sql.rst b/python/docs/pyspark.sql.rst
index 6259379..3be9533 100644
--- a/python/docs/pyspark.sql.rst
+++ b/python/docs/pyspark.sql.rst
@@ -21,3 +21,9 @@ pyspark.sql.functions module
 .. automodule:: pyspark.sql.functions
     :members:
     :undoc-members:
+
+pyspark.sql.streaming module
+----------------------------
+.. automodule:: pyspark.sql.streaming
+    :members:
+    :undoc-members:

http://git-wip-us.apache.org/repos/asf/spark/blob/f454a7f9/python/pyspark/sql/context.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py
index b5dde13..3503fb9 100644
--- a/python/pyspark/sql/context.py
+++ b/python/pyspark/sql/context.py
@@ -26,7 +26,8 @@ from pyspark import since
 from pyspark.rdd import ignore_unicode_prefix
 from pyspark.sql.session import _monkey_patch_RDD, SparkSession
 from pyspark.sql.dataframe import DataFrame
-from pyspark.sql.readwriter import DataFrameReader, DataStreamReader
+from pyspark.sql.readwriter import DataFrameReader
+from pyspark.sql.streaming import DataStreamReader
 from pyspark.sql.types import Row, StringType
 from pyspark.sql.utils import install_exception_handler
 

http://git-wip-us.apache.org/repos/asf/spark/blob/f454a7f9/python/pyspark/sql/dataframe.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 4f13307..e44b01b 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -33,7 +33,8 @@ from pyspark.storagelevel import StorageLevel
 from pyspark.traceback_utils import SCCallSiteSync
 from pyspark.sql.types import _parse_datatype_json_string
 from pyspark.sql.column import Column, _to_seq, _to_list, _to_java_column
-from pyspark.sql.readwriter import DataFrameWriter, DataStreamWriter
+from pyspark.sql.readwriter import DataFrameWriter
+from pyspark.sql.streaming import DataStreamWriter
 from pyspark.sql.types import *
 
 __all__ = ["DataFrame", "DataFrameNaFunctions", "DataFrameStatFunctions"]

http://git-wip-us.apache.org/repos/asf/spark/blob/f454a7f9/python/pyspark/sql/readwriter.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 3f28d7a..10f307b 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -28,7 +28,7 @@ from pyspark.sql.column import _to_seq
 from pyspark.sql.types import *
 from pyspark.sql import utils
 
-__all__ = ["DataFrameReader", "DataFrameWriter", "DataStreamReader", 
"DataStreamWriter"]
+__all__ = ["DataFrameReader", "DataFrameWriter"]
 
 
 def to_str(value):
@@ -724,494 +724,6 @@ class DataFrameWriter(OptionUtils):
         self._jwrite.mode(mode).jdbc(url, table, jprop)
 
 
-class DataStreamReader(OptionUtils):
-    """
-    Interface used to load a streaming :class:`DataFrame` from external 
storage systems
-    (e.g. file systems, key-value stores, etc). Use :func:`spark.readStream`
-    to access this.
-
-    .. note:: Experimental.
-
-    .. versionadded:: 2.0
-    """
-
-    def __init__(self, spark):
-        self._jreader = spark._ssql_ctx.readStream()
-        self._spark = spark
-
-    def _df(self, jdf):
-        from pyspark.sql.dataframe import DataFrame
-        return DataFrame(jdf, self._spark)
-
-    @since(2.0)
-    def format(self, source):
-        """Specifies the input data source format.
-
-        .. note:: Experimental.
-
-        :param source: string, name of the data source, e.g. 'json', 'parquet'.
-
-        >>> s = spark.readStream.format("text")
-        """
-        self._jreader = self._jreader.format(source)
-        return self
-
-    @since(2.0)
-    def schema(self, schema):
-        """Specifies the input schema.
-
-        Some data sources (e.g. JSON) can infer the input schema automatically 
from data.
-        By specifying the schema here, the underlying data source can skip the 
schema
-        inference step, and thus speed up data loading.
-
-        .. note:: Experimental.
-
-        :param schema: a StructType object
-
-        >>> s = spark.readStream.schema(sdf_schema)
-        """
-        if not isinstance(schema, StructType):
-            raise TypeError("schema should be StructType")
-        jschema = self._spark._ssql_ctx.parseDataType(schema.json())
-        self._jreader = self._jreader.schema(jschema)
-        return self
-
-    @since(2.0)
-    def option(self, key, value):
-        """Adds an input option for the underlying data source.
-
-        .. note:: Experimental.
-
-        >>> s = spark.readStream.option("x", 1)
-        """
-        self._jreader = self._jreader.option(key, to_str(value))
-        return self
-
-    @since(2.0)
-    def options(self, **options):
-        """Adds input options for the underlying data source.
-
-        .. note:: Experimental.
-
-        >>> s = spark.readStream.options(x="1", y=2)
-        """
-        for k in options:
-            self._jreader = self._jreader.option(k, to_str(options[k]))
-        return self
-
-    @since(2.0)
-    def load(self, path=None, format=None, schema=None, **options):
-        """Loads a data stream from a data source and returns it as a 
:class`DataFrame`.
-
-        .. note:: Experimental.
-
-        :param path: optional string for file-system backed data sources.
-        :param format: optional string for format of the data source. Default 
to 'parquet'.
-        :param schema: optional :class:`StructType` for the input schema.
-        :param options: all other string options
-
-        >>> json_sdf = spark.readStream.format("json")\
-                                       .schema(sdf_schema)\
-                                       
.load(os.path.join(tempfile.mkdtemp(),'data'))
-        >>> json_sdf.isStreaming
-        True
-        >>> json_sdf.schema == sdf_schema
-        True
-        """
-        if format is not None:
-            self.format(format)
-        if schema is not None:
-            self.schema(schema)
-        self.options(**options)
-        if path is not None:
-            if type(path) != str or len(path.strip()) == 0:
-                raise ValueError("If the path is provided for stream, it needs 
to be a " +
-                                 "non-empty string. List of paths are not 
supported.")
-            return self._df(self._jreader.load(path))
-        else:
-            return self._df(self._jreader.load())
-
-    @since(2.0)
-    def json(self, path, schema=None, primitivesAsString=None, 
prefersDecimal=None,
-             allowComments=None, allowUnquotedFieldNames=None, 
allowSingleQuotes=None,
-             allowNumericLeadingZero=None, 
allowBackslashEscapingAnyCharacter=None,
-             mode=None, columnNameOfCorruptRecord=None):
-        """
-        Loads a JSON file stream (one object per line) and returns a 
:class`DataFrame`.
-
-        If the ``schema`` parameter is not specified, this function goes
-        through the input once to determine the input schema.
-
-        .. note:: Experimental.
-
-        :param path: string represents path to the JSON dataset,
-                     or RDD of Strings storing JSON objects.
-        :param schema: an optional :class:`StructType` for the input schema.
-        :param primitivesAsString: infers all primitive values as a string 
type. If None is set,
-                                   it uses the default value, ``false``.
-        :param prefersDecimal: infers all floating-point values as a decimal 
type. If the values
-                               do not fit in decimal, then it infers them as 
doubles. If None is
-                               set, it uses the default value, ``false``.
-        :param allowComments: ignores Java/C++ style comment in JSON records. 
If None is set,
-                              it uses the default value, ``false``.
-        :param allowUnquotedFieldNames: allows unquoted JSON field names. If 
None is set,
-                                        it uses the default value, ``false``.
-        :param allowSingleQuotes: allows single quotes in addition to double 
quotes. If None is
-                                        set, it uses the default value, 
``true``.
-        :param allowNumericLeadingZero: allows leading zeros in numbers (e.g. 
00012). If None is
-                                        set, it uses the default value, 
``false``.
-        :param allowBackslashEscapingAnyCharacter: allows accepting quoting of 
all character
-                                                   using backslash quoting 
mechanism. If None is
-                                                   set, it uses the default 
value, ``false``.
-        :param mode: allows a mode for dealing with corrupt records during 
parsing. If None is
-                     set, it uses the default value, ``PERMISSIVE``.
-
-                *  ``PERMISSIVE`` : sets other fields to ``null`` when it 
meets a corrupted \
-                  record and puts the malformed string into a new field 
configured by \
-                 ``columnNameOfCorruptRecord``. When a schema is set by user, 
it sets \
-                 ``null`` for extra fields.
-                *  ``DROPMALFORMED`` : ignores the whole corrupted records.
-                *  ``FAILFAST`` : throws an exception when it meets corrupted 
records.
-
-        :param columnNameOfCorruptRecord: allows renaming the new field having 
malformed string
-                                          created by ``PERMISSIVE`` mode. This 
overrides
-                                          
``spark.sql.columnNameOfCorruptRecord``. If None is set,
-                                          it uses the value specified in
-                                          
``spark.sql.columnNameOfCorruptRecord``.
-
-        >>> json_sdf = spark.readStream.json(os.path.join(tempfile.mkdtemp(), 
'data'), \
-                schema = sdf_schema)
-        >>> json_sdf.isStreaming
-        True
-        >>> json_sdf.schema == sdf_schema
-        True
-        """
-        self._set_opts(
-            schema=schema, primitivesAsString=primitivesAsString, 
prefersDecimal=prefersDecimal,
-            allowComments=allowComments, 
allowUnquotedFieldNames=allowUnquotedFieldNames,
-            allowSingleQuotes=allowSingleQuotes, 
allowNumericLeadingZero=allowNumericLeadingZero,
-            
allowBackslashEscapingAnyCharacter=allowBackslashEscapingAnyCharacter,
-            mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord)
-        if isinstance(path, basestring):
-            return self._df(self._jreader.json(path))
-        else:
-            raise TypeError("path can be only a single string")
-
-    @since(2.0)
-    def parquet(self, path):
-        """Loads a Parquet file stream, returning the result as a 
:class:`DataFrame`.
-
-        You can set the following Parquet-specific option(s) for reading 
Parquet files:
-            * ``mergeSchema``: sets whether we should merge schemas collected 
from all \
-                Parquet part-files. This will override 
``spark.sql.parquet.mergeSchema``. \
-                The default value is specified in 
``spark.sql.parquet.mergeSchema``.
-
-        .. note:: Experimental.
-
-        >>> parquet_sdf = spark.readStream.schema(sdf_schema)\
-                .parquet(os.path.join(tempfile.mkdtemp()))
-        >>> parquet_sdf.isStreaming
-        True
-        >>> parquet_sdf.schema == sdf_schema
-        True
-        """
-        if isinstance(path, basestring):
-            return self._df(self._jreader.parquet(path))
-        else:
-            raise TypeError("path can be only a single string")
-
-    @ignore_unicode_prefix
-    @since(2.0)
-    def text(self, path):
-        """
-        Loads a text file stream and returns a :class:`DataFrame` whose schema 
starts with a
-        string column named "value", and followed by partitioned columns if 
there
-        are any.
-
-        Each line in the text file is a new row in the resulting DataFrame.
-
-        .. note:: Experimental.
-
-        :param paths: string, or list of strings, for input path(s).
-
-        >>> text_sdf = spark.readStream.text(os.path.join(tempfile.mkdtemp(), 
'data'))
-        >>> text_sdf.isStreaming
-        True
-        >>> "value" in str(text_sdf.schema)
-        True
-        """
-        if isinstance(path, basestring):
-            return self._df(self._jreader.text(path))
-        else:
-            raise TypeError("path can be only a single string")
-
-    @since(2.0)
-    def csv(self, path, schema=None, sep=None, encoding=None, quote=None, 
escape=None,
-            comment=None, header=None, inferSchema=None, 
ignoreLeadingWhiteSpace=None,
-            ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, 
positiveInf=None,
-            negativeInf=None, dateFormat=None, maxColumns=None, 
maxCharsPerColumn=None,
-            maxMalformedLogPerPartition=None, mode=None):
-        """Loads a CSV file stream and returns the result as a  
:class:`DataFrame`.
-
-        This function will go through the input once to determine the input 
schema if
-        ``inferSchema`` is enabled. To avoid going through the entire data 
once, disable
-        ``inferSchema`` option or specify the schema explicitly using 
``schema``.
-
-        .. note:: Experimental.
-
-        :param path: string, or list of strings, for input path(s).
-        :param schema: an optional :class:`StructType` for the input schema.
-        :param sep: sets the single character as a separator for each field 
and value.
-                    If None is set, it uses the default value, ``,``.
-        :param encoding: decodes the CSV files by the given encoding type. If 
None is set,
-                         it uses the default value, ``UTF-8``.
-        :param quote: sets the single character used for escaping quoted 
values where the
-                      separator can be part of the value. If None is set, it 
uses the default
-                      value, ``"``. If you would like to turn off quotations, 
you need to set an
-                      empty string.
-        :param escape: sets the single character used for escaping quotes 
inside an already
-                       quoted value. If None is set, it uses the default 
value, ``\``.
-        :param comment: sets the single character used for skipping lines 
beginning with this
-                        character. By default (None), it is disabled.
-        :param header: uses the first line as names of columns. If None is 
set, it uses the
-                       default value, ``false``.
-        :param inferSchema: infers the input schema automatically from data. 
It requires one extra
-                       pass over the data. If None is set, it uses the default 
value, ``false``.
-        :param ignoreLeadingWhiteSpace: defines whether or not leading 
whitespaces from values
-                                        being read should be skipped. If None 
is set, it uses
-                                        the default value, ``false``.
-        :param ignoreTrailingWhiteSpace: defines whether or not trailing 
whitespaces from values
-                                         being read should be skipped. If None 
is set, it uses
-                                         the default value, ``false``.
-        :param nullValue: sets the string representation of a null value. If 
None is set, it uses
-                          the default value, empty string.
-        :param nanValue: sets the string representation of a non-number value. 
If None is set, it
-                         uses the default value, ``NaN``.
-        :param positiveInf: sets the string representation of a positive 
infinity value. If None
-                            is set, it uses the default value, ``Inf``.
-        :param negativeInf: sets the string representation of a negative 
infinity value. If None
-                            is set, it uses the default value, ``Inf``.
-        :param dateFormat: sets the string that indicates a date format. 
Custom date formats
-                           follow the formats at 
``java.text.SimpleDateFormat``. This
-                           applies to both date type and timestamp type. By 
default, it is None
-                           which means trying to parse times and date by
-                           ``java.sql.Timestamp.valueOf()`` and 
``java.sql.Date.valueOf()``.
-        :param maxColumns: defines a hard limit of how many columns a record 
can have. If None is
-                           set, it uses the default value, ``20480``.
-        :param maxCharsPerColumn: defines the maximum number of characters 
allowed for any given
-                                  value being read. If None is set, it uses 
the default value,
-                                  ``1000000``.
-        :param mode: allows a mode for dealing with corrupt records during 
parsing. If None is
-                     set, it uses the default value, ``PERMISSIVE``.
-
-                * ``PERMISSIVE`` : sets other fields to ``null`` when it meets 
a corrupted record.
-                    When a schema is set by user, it sets ``null`` for extra 
fields.
-                * ``DROPMALFORMED`` : ignores the whole corrupted records.
-                * ``FAILFAST`` : throws an exception when it meets corrupted 
records.
-
-        >>> csv_sdf = spark.readStream.csv(os.path.join(tempfile.mkdtemp(), 
'data'), \
-                schema = sdf_schema)
-        >>> csv_sdf.isStreaming
-        True
-        >>> csv_sdf.schema == sdf_schema
-        True
-        """
-        self._set_opts(
-            schema=schema, sep=sep, encoding=encoding, quote=quote, 
escape=escape, comment=comment,
-            header=header, inferSchema=inferSchema, 
ignoreLeadingWhiteSpace=ignoreLeadingWhiteSpace,
-            ignoreTrailingWhiteSpace=ignoreTrailingWhiteSpace, 
nullValue=nullValue,
-            nanValue=nanValue, positiveInf=positiveInf, 
negativeInf=negativeInf,
-            dateFormat=dateFormat, maxColumns=maxColumns, 
maxCharsPerColumn=maxCharsPerColumn,
-            maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode)
-        if isinstance(path, basestring):
-            return self._df(self._jreader.csv(path))
-        else:
-            raise TypeError("path can be only a single string")
-
-
-class DataStreamWriter(object):
-    """
-    Interface used to write a streaming :class:`DataFrame` to external storage 
systems
-    (e.g. file systems, key-value stores, etc). Use 
:func:`DataFrame.writeStream`
-    to access this.
-
-    .. note:: Experimental.
-
-    .. versionadded:: 2.0
-    """
-
-    def __init__(self, df):
-        self._df = df
-        self._spark = df.sql_ctx
-        self._jwrite = df._jdf.writeStream()
-
-    def _sq(self, jsq):
-        from pyspark.sql.streaming import StreamingQuery
-        return StreamingQuery(jsq)
-
-    @since(2.0)
-    def outputMode(self, outputMode):
-        """Specifies how data of a streaming DataFrame/Dataset is written to a 
streaming sink.
-
-        Options include:
-
-        * `append`:Only the new rows in the streaming DataFrame/Dataset will 
be written to
-           the sink
-        * `complete`:All the rows in the streaming DataFrame/Dataset will be 
written to the sink
-           every time these is some updates
-
-       .. note:: Experimental.
-
-        >>> writer = sdf.writeStream.outputMode('append')
-        """
-        if not outputMode or type(outputMode) != str or 
len(outputMode.strip()) == 0:
-            raise ValueError('The output mode must be a non-empty string. Got: 
%s' % outputMode)
-        self._jwrite = self._jwrite.outputMode(outputMode)
-        return self
-
-    @since(2.0)
-    def format(self, source):
-        """Specifies the underlying output data source.
-
-        .. note:: Experimental.
-
-        :param source: string, name of the data source, e.g. 'json', 'parquet'.
-
-        >>> writer = sdf.writeStream.format('json')
-        """
-        self._jwrite = self._jwrite.format(source)
-        return self
-
-    @since(2.0)
-    def option(self, key, value):
-        """Adds an output option for the underlying data source.
-
-        .. note:: Experimental.
-        """
-        self._jwrite = self._jwrite.option(key, to_str(value))
-        return self
-
-    @since(2.0)
-    def options(self, **options):
-        """Adds output options for the underlying data source.
-
-       .. note:: Experimental.
-        """
-        for k in options:
-            self._jwrite = self._jwrite.option(k, to_str(options[k]))
-        return self
-
-    @since(2.0)
-    def partitionBy(self, *cols):
-        """Partitions the output by the given columns on the file system.
-
-        If specified, the output is laid out on the file system similar
-        to Hive's partitioning scheme.
-
-        .. note:: Experimental.
-
-        :param cols: name of columns
-
-        """
-        if len(cols) == 1 and isinstance(cols[0], (list, tuple)):
-            cols = cols[0]
-        self._jwrite = self._jwrite.partitionBy(_to_seq(self._spark._sc, cols))
-        return self
-
-    @since(2.0)
-    def queryName(self, queryName):
-        """Specifies the name of the :class:`StreamingQuery` that can be 
started with
-        :func:`start`. This name must be unique among all the currently active 
queries
-        in the associated SparkSession.
-
-        .. note:: Experimental.
-
-        :param queryName: unique name for the query
-
-        >>> writer = sdf.writeStream.queryName('streaming_query')
-        """
-        if not queryName or type(queryName) != str or len(queryName.strip()) 
== 0:
-            raise ValueError('The queryName must be a non-empty string. Got: 
%s' % queryName)
-        self._jwrite = self._jwrite.queryName(queryName)
-        return self
-
-    @keyword_only
-    @since(2.0)
-    def trigger(self, processingTime=None):
-        """Set the trigger for the stream query. If this is not set it will 
run the query as fast
-        as possible, which is equivalent to setting the trigger to 
``processingTime='0 seconds'``.
-
-        .. note:: Experimental.
-
-        :param processingTime: a processing time interval as a string, e.g. '5 
seconds', '1 minute'.
-
-        >>> # trigger the query for execution every 5 seconds
-        >>> writer = sdf.writeStream.trigger(processingTime='5 seconds')
-        """
-        from pyspark.sql.streaming import ProcessingTime
-        trigger = None
-        if processingTime is not None:
-            if type(processingTime) != str or len(processingTime.strip()) == 0:
-                raise ValueError('The processing time must be a non empty 
string. Got: %s' %
-                                 processingTime)
-            trigger = ProcessingTime(processingTime)
-        if trigger is None:
-            raise ValueError('A trigger was not provided. Supported triggers: 
processingTime.')
-        self._jwrite = 
self._jwrite.trigger(trigger._to_java_trigger(self._spark))
-        return self
-
-    @ignore_unicode_prefix
-    @since(2.0)
-    def start(self, path=None, format=None, partitionBy=None, queryName=None, 
**options):
-        """Streams the contents of the :class:`DataFrame` to a data source.
-
-        The data source is specified by the ``format`` and a set of 
``options``.
-        If ``format`` is not specified, the default data source configured by
-        ``spark.sql.sources.default`` will be used.
-
-        .. note:: Experimental.
-
-        :param path: the path in a Hadoop supported file system
-        :param format: the format used to save
-
-            * ``append``: Append contents of this :class:`DataFrame` to 
existing data.
-            * ``overwrite``: Overwrite existing data.
-            * ``ignore``: Silently ignore this operation if data already 
exists.
-            * ``error`` (default case): Throw an exception if data already 
exists.
-        :param partitionBy: names of partitioning columns
-        :param queryName: unique name for the query
-        :param options: All other string options. You may want to provide a 
`checkpointLocation`
-            for most streams, however it is not required for a `memory` stream.
-
-        >>> sq = 
sdf.writeStream.format('memory').queryName('this_query').start()
-        >>> sq.isActive
-        True
-        >>> sq.name
-        u'this_query'
-        >>> sq.stop()
-        >>> sq.isActive
-        False
-        >>> sq = sdf.writeStream.trigger(processingTime='5 seconds').start(
-        ...     queryName='that_query', format='memory')
-        >>> sq.name
-        u'that_query'
-        >>> sq.isActive
-        True
-        >>> sq.stop()
-        """
-        self.options(**options)
-        if partitionBy is not None:
-            self.partitionBy(partitionBy)
-        if format is not None:
-            self.format(format)
-        if queryName is not None:
-            self.queryName(queryName)
-        if path is None:
-            return self._sq(self._jwrite.start())
-        else:
-            return self._sq(self._jwrite.start(path))
-
-
 def _test():
     import doctest
     import os
@@ -1235,9 +747,6 @@ def _test():
     globs['sc'] = sc
     globs['spark'] = spark
     globs['df'] = 
spark.read.parquet('python/test_support/sql/parquet_partitioned')
-    globs['sdf'] = \
-        
spark.readStream.format('text').load('python/test_support/sql/streaming')
-    globs['sdf_schema'] = StructType([StructField("data", StringType(), 
False)])
     (failure_count, test_count) = doctest.testmod(
         pyspark.sql.readwriter, globs=globs,
         optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE | 
doctest.REPORT_NDIFF)

http://git-wip-us.apache.org/repos/asf/spark/blob/f454a7f9/python/pyspark/sql/session.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py
index b4152a3..55f86a1 100644
--- a/python/pyspark/sql/session.py
+++ b/python/pyspark/sql/session.py
@@ -31,7 +31,8 @@ from pyspark.rdd import RDD, ignore_unicode_prefix
 from pyspark.sql.catalog import Catalog
 from pyspark.sql.conf import RuntimeConfig
 from pyspark.sql.dataframe import DataFrame
-from pyspark.sql.readwriter import DataFrameReader, DataStreamReader
+from pyspark.sql.readwriter import DataFrameReader
+from pyspark.sql.streaming import DataStreamReader
 from pyspark.sql.types import Row, DataType, StringType, StructType, 
_verify_type, \
     _infer_schema, _has_nulltype, _merge_type, _create_converter, 
_parse_datatype_string
 from pyspark.sql.utils import install_exception_handler

http://git-wip-us.apache.org/repos/asf/spark/blob/f454a7f9/python/pyspark/sql/streaming.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index ae45c99..8cf7098 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -18,15 +18,18 @@
 import sys
 if sys.version >= '3':
     intlike = int
+    basestring = unicode = str
 else:
     intlike = (int, long)
 
 from abc import ABCMeta, abstractmethod
 
-from pyspark import since
+from pyspark import since, keyword_only
 from pyspark.rdd import ignore_unicode_prefix
+from pyspark.sql.readwriter import OptionUtils, to_str
+from pyspark.sql.types import *
 
-__all__ = ["StreamingQuery"]
+__all__ = ["StreamingQuery", "StreamingQueryManager", "DataStreamReader", 
"DataStreamWriter"]
 
 
 class StreamingQuery(object):
@@ -118,7 +121,7 @@ class StreamingQueryManager(object):
     def active(self):
         """Returns a list of active queries associated with this SQLContext
 
-        >>> sq = 
df.writeStream.format('memory').queryName('this_query').start()
+        >>> sq = 
sdf.writeStream.format('memory').queryName('this_query').start()
         >>> sqm = spark.streams
         >>> # get the list of active streaming queries
         >>> [q.name for q in sqm.active]
@@ -133,7 +136,7 @@ class StreamingQueryManager(object):
         """Returns an active query from this SQLContext or throws exception if 
an active query
         with this name doesn't exist.
 
-        >>> sq = 
df.writeStream.format('memory').queryName('this_query').start()
+        >>> sq = 
sdf.writeStream.format('memory').queryName('this_query').start()
         >>> sq.name
         u'this_query'
         >>> sq = spark.streams.get(sq.id)
@@ -224,6 +227,494 @@ class ProcessingTime(Trigger):
             self.interval)
 
 
+class DataStreamReader(OptionUtils):
+    """
+    Interface used to load a streaming :class:`DataFrame` from external 
storage systems
+    (e.g. file systems, key-value stores, etc). Use :func:`spark.readStream`
+    to access this.
+
+    .. note:: Experimental.
+
+    .. versionadded:: 2.0
+    """
+
+    def __init__(self, spark):
+        self._jreader = spark._ssql_ctx.readStream()
+        self._spark = spark
+
+    def _df(self, jdf):
+        from pyspark.sql.dataframe import DataFrame
+        return DataFrame(jdf, self._spark)
+
+    @since(2.0)
+    def format(self, source):
+        """Specifies the input data source format.
+
+        .. note:: Experimental.
+
+        :param source: string, name of the data source, e.g. 'json', 'parquet'.
+
+        >>> s = spark.readStream.format("text")
+        """
+        self._jreader = self._jreader.format(source)
+        return self
+
+    @since(2.0)
+    def schema(self, schema):
+        """Specifies the input schema.
+
+        Some data sources (e.g. JSON) can infer the input schema automatically 
from data.
+        By specifying the schema here, the underlying data source can skip the 
schema
+        inference step, and thus speed up data loading.
+
+        .. note:: Experimental.
+
+        :param schema: a StructType object
+
+        >>> s = spark.readStream.schema(sdf_schema)
+        """
+        if not isinstance(schema, StructType):
+            raise TypeError("schema should be StructType")
+        jschema = self._spark._ssql_ctx.parseDataType(schema.json())
+        self._jreader = self._jreader.schema(jschema)
+        return self
+
+    @since(2.0)
+    def option(self, key, value):
+        """Adds an input option for the underlying data source.
+
+        .. note:: Experimental.
+
+        >>> s = spark.readStream.option("x", 1)
+        """
+        self._jreader = self._jreader.option(key, to_str(value))
+        return self
+
+    @since(2.0)
+    def options(self, **options):
+        """Adds input options for the underlying data source.
+
+        .. note:: Experimental.
+
+        >>> s = spark.readStream.options(x="1", y=2)
+        """
+        for k in options:
+            self._jreader = self._jreader.option(k, to_str(options[k]))
+        return self
+
+    @since(2.0)
+    def load(self, path=None, format=None, schema=None, **options):
+        """Loads a data stream from a data source and returns it as a 
:class:`DataFrame`.
+
+        .. note:: Experimental.
+
+        :param path: optional string for file-system backed data sources.
+        :param format: optional string for format of the data source. Default 
to 'parquet'.
+        :param schema: optional :class:`StructType` for the input schema.
+        :param options: all other string options
+
+        >>> json_sdf = spark.readStream.format("json")\
+                                       .schema(sdf_schema)\
+                                       
.load(os.path.join(tempfile.mkdtemp(),'data'))
+        >>> json_sdf.isStreaming
+        True
+        >>> json_sdf.schema == sdf_schema
+        True
+        """
+        if format is not None:
+            self.format(format)
+        if schema is not None:
+            self.schema(schema)
+        self.options(**options)
+        if path is not None:
+            if type(path) != str or len(path.strip()) == 0:
+                raise ValueError("If the path is provided for stream, it needs 
to be a " +
+                                 "non-empty string. List of paths are not 
supported.")
+            return self._df(self._jreader.load(path))
+        else:
+            return self._df(self._jreader.load())
+
+    @since(2.0)
+    def json(self, path, schema=None, primitivesAsString=None, 
prefersDecimal=None,
+             allowComments=None, allowUnquotedFieldNames=None, 
allowSingleQuotes=None,
+             allowNumericLeadingZero=None, 
allowBackslashEscapingAnyCharacter=None,
+             mode=None, columnNameOfCorruptRecord=None):
+        """
+        Loads a JSON file stream (one object per line) and returns a 
:class:`DataFrame`.
+
+        If the ``schema`` parameter is not specified, this function goes
+        through the input once to determine the input schema.
+
+        .. note:: Experimental.
+
+        :param path: string representing the path to the JSON dataset;
+                     only a single string path is accepted.
+        :param schema: an optional :class:`StructType` for the input schema.
+        :param primitivesAsString: infers all primitive values as a string 
type. If None is set,
+                                   it uses the default value, ``false``.
+        :param prefersDecimal: infers all floating-point values as a decimal 
type. If the values
+                               do not fit in decimal, then it infers them as 
doubles. If None is
+                               set, it uses the default value, ``false``.
+        :param allowComments: ignores Java/C++ style comment in JSON records. 
If None is set,
+                              it uses the default value, ``false``.
+        :param allowUnquotedFieldNames: allows unquoted JSON field names. If 
None is set,
+                                        it uses the default value, ``false``.
+        :param allowSingleQuotes: allows single quotes in addition to double 
quotes. If None is
+                                        set, it uses the default value, 
``true``.
+        :param allowNumericLeadingZero: allows leading zeros in numbers (e.g. 
00012). If None is
+                                        set, it uses the default value, 
``false``.
+        :param allowBackslashEscapingAnyCharacter: allows accepting quoting of 
all character
+                                                   using backslash quoting 
mechanism. If None is
+                                                   set, it uses the default 
value, ``false``.
+        :param mode: allows a mode for dealing with corrupt records during 
parsing. If None is
+                     set, it uses the default value, ``PERMISSIVE``.
+
+                *  ``PERMISSIVE`` : sets other fields to ``null`` when it 
meets a corrupted \
+                  record and puts the malformed string into a new field 
configured by \
+                 ``columnNameOfCorruptRecord``. When a schema is set by user, 
it sets \
+                 ``null`` for extra fields.
+                *  ``DROPMALFORMED`` : ignores the whole corrupted records.
+                *  ``FAILFAST`` : throws an exception when it meets corrupted 
records.
+
+        :param columnNameOfCorruptRecord: allows renaming the new field having 
malformed string
+                                          created by ``PERMISSIVE`` mode. This 
overrides
+                                          
``spark.sql.columnNameOfCorruptRecord``. If None is set,
+                                          it uses the value specified in
+                                          
``spark.sql.columnNameOfCorruptRecord``.
+
+        >>> json_sdf = spark.readStream.json(os.path.join(tempfile.mkdtemp(), 
'data'), \
+                schema = sdf_schema)
+        >>> json_sdf.isStreaming
+        True
+        >>> json_sdf.schema == sdf_schema
+        True
+        """
+        self._set_opts(
+            schema=schema, primitivesAsString=primitivesAsString, 
prefersDecimal=prefersDecimal,
+            allowComments=allowComments, 
allowUnquotedFieldNames=allowUnquotedFieldNames,
+            allowSingleQuotes=allowSingleQuotes, 
allowNumericLeadingZero=allowNumericLeadingZero,
+            
allowBackslashEscapingAnyCharacter=allowBackslashEscapingAnyCharacter,
+            mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord)
+        if isinstance(path, basestring):
+            return self._df(self._jreader.json(path))
+        else:
+            raise TypeError("path can be only a single string")
+
+    @since(2.0)
+    def parquet(self, path):
+        """Loads a Parquet file stream, returning the result as a 
:class:`DataFrame`.
+
+        You can set the following Parquet-specific option(s) for reading 
Parquet files:
+            * ``mergeSchema``: sets whether we should merge schemas collected 
from all \
+                Parquet part-files. This will override 
``spark.sql.parquet.mergeSchema``. \
+                The default value is specified in 
``spark.sql.parquet.mergeSchema``.
+
+        .. note:: Experimental.
+
+        >>> parquet_sdf = spark.readStream.schema(sdf_schema)\
+                .parquet(os.path.join(tempfile.mkdtemp()))
+        >>> parquet_sdf.isStreaming
+        True
+        >>> parquet_sdf.schema == sdf_schema
+        True
+        """
+        if isinstance(path, basestring):
+            return self._df(self._jreader.parquet(path))
+        else:
+            raise TypeError("path can be only a single string")
+
+    @ignore_unicode_prefix
+    @since(2.0)
+    def text(self, path):
+        """
+        Loads a text file stream and returns a :class:`DataFrame` whose schema 
starts with a
+        string column named "value", and followed by partitioned columns if 
there
+        are any.
+
+        Each line in the text file is a new row in the resulting DataFrame.
+
+        .. note:: Experimental.
+
+        :param path: string, for the input path (a single path only).
+
+        >>> text_sdf = spark.readStream.text(os.path.join(tempfile.mkdtemp(), 
'data'))
+        >>> text_sdf.isStreaming
+        True
+        >>> "value" in str(text_sdf.schema)
+        True
+        """
+        if isinstance(path, basestring):
+            return self._df(self._jreader.text(path))
+        else:
+            raise TypeError("path can be only a single string")
+
+    @since(2.0)
+    def csv(self, path, schema=None, sep=None, encoding=None, quote=None, 
escape=None,
+            comment=None, header=None, inferSchema=None, 
ignoreLeadingWhiteSpace=None,
+            ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, 
positiveInf=None,
+            negativeInf=None, dateFormat=None, maxColumns=None, 
maxCharsPerColumn=None,
+            maxMalformedLogPerPartition=None, mode=None):
+        """Loads a CSV file stream and returns the result as a  
:class:`DataFrame`.
+
+        This function will go through the input once to determine the input 
schema if
+        ``inferSchema`` is enabled. To avoid going through the entire data 
once, disable
+        ``inferSchema`` option or specify the schema explicitly using 
``schema``.
+
+        .. note:: Experimental.
+
+        :param path: string, for the input path (a single path only).
+        :param schema: an optional :class:`StructType` for the input schema.
+        :param sep: sets the single character as a separator for each field 
and value.
+                    If None is set, it uses the default value, ``,``.
+        :param encoding: decodes the CSV files by the given encoding type. If 
None is set,
+                         it uses the default value, ``UTF-8``.
+        :param quote: sets the single character used for escaping quoted 
values where the
+                      separator can be part of the value. If None is set, it 
uses the default
+                      value, ``"``. If you would like to turn off quotations, 
you need to set an
+                      empty string.
+        :param escape: sets the single character used for escaping quotes 
inside an already
+                       quoted value. If None is set, it uses the default 
value, ``\``.
+        :param comment: sets the single character used for skipping lines 
beginning with this
+                        character. By default (None), it is disabled.
+        :param header: uses the first line as names of columns. If None is 
set, it uses the
+                       default value, ``false``.
+        :param inferSchema: infers the input schema automatically from data. 
It requires one extra
+                       pass over the data. If None is set, it uses the default 
value, ``false``.
+        :param ignoreLeadingWhiteSpace: defines whether or not leading 
whitespaces from values
+                                        being read should be skipped. If None 
is set, it uses
+                                        the default value, ``false``.
+        :param ignoreTrailingWhiteSpace: defines whether or not trailing 
whitespaces from values
+                                         being read should be skipped. If None 
is set, it uses
+                                         the default value, ``false``.
+        :param nullValue: sets the string representation of a null value. If 
None is set, it uses
+                          the default value, empty string.
+        :param nanValue: sets the string representation of a non-number value. 
If None is set, it
+                         uses the default value, ``NaN``.
+        :param positiveInf: sets the string representation of a positive 
infinity value. If None
+                            is set, it uses the default value, ``Inf``.
+        :param negativeInf: sets the string representation of a negative 
infinity value. If None
+                            is set, it uses the default value, ``-Inf``.
+        :param dateFormat: sets the string that indicates a date format. 
Custom date formats
+                           follow the formats at 
``java.text.SimpleDateFormat``. This
+                           applies to both date type and timestamp type. By 
default, it is None
+                           which means trying to parse times and date by
+                           ``java.sql.Timestamp.valueOf()`` and 
``java.sql.Date.valueOf()``.
+        :param maxColumns: defines a hard limit of how many columns a record 
can have. If None is
+                           set, it uses the default value, ``20480``.
+        :param maxCharsPerColumn: defines the maximum number of characters 
allowed for any given
+                                  value being read. If None is set, it uses 
the default value,
+                                  ``1000000``.
+        :param mode: allows a mode for dealing with corrupt records during 
parsing. If None is
+                     set, it uses the default value, ``PERMISSIVE``.
+
+                * ``PERMISSIVE`` : sets other fields to ``null`` when it meets 
a corrupted record.
+                    When a schema is set by user, it sets ``null`` for extra 
fields.
+                * ``DROPMALFORMED`` : ignores the whole corrupted records.
+                * ``FAILFAST`` : throws an exception when it meets corrupted 
records.
+
+        >>> csv_sdf = spark.readStream.csv(os.path.join(tempfile.mkdtemp(), 
'data'), \
+                schema = sdf_schema)
+        >>> csv_sdf.isStreaming
+        True
+        >>> csv_sdf.schema == sdf_schema
+        True
+        """
+        self._set_opts(
+            schema=schema, sep=sep, encoding=encoding, quote=quote, 
escape=escape, comment=comment,
+            header=header, inferSchema=inferSchema, 
ignoreLeadingWhiteSpace=ignoreLeadingWhiteSpace,
+            ignoreTrailingWhiteSpace=ignoreTrailingWhiteSpace, 
nullValue=nullValue,
+            nanValue=nanValue, positiveInf=positiveInf, 
negativeInf=negativeInf,
+            dateFormat=dateFormat, maxColumns=maxColumns, 
maxCharsPerColumn=maxCharsPerColumn,
+            maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode)
+        if isinstance(path, basestring):
+            return self._df(self._jreader.csv(path))
+        else:
+            raise TypeError("path can be only a single string")
+
+
+class DataStreamWriter(object):
+    """
+    Interface used to write a streaming :class:`DataFrame` to external storage 
systems
+    (e.g. file systems, key-value stores, etc). Use 
:func:`DataFrame.writeStream`
+    to access this.
+
+    .. note:: Experimental.
+
+    .. versionadded:: 2.0
+    """
+
+    def __init__(self, df):
+        self._df = df
+        self._spark = df.sql_ctx
+        self._jwrite = df._jdf.writeStream()
+
+    def _sq(self, jsq):
+        from pyspark.sql.streaming import StreamingQuery
+        return StreamingQuery(jsq)
+
+    @since(2.0)
+    def outputMode(self, outputMode):
+        """Specifies how data of a streaming DataFrame/Dataset is written to a 
streaming sink.
+
+        Options include:
+
+        * `append`:Only the new rows in the streaming DataFrame/Dataset will 
be written to
+           the sink
+        * `complete`:All the rows in the streaming DataFrame/Dataset will be 
written to the sink
+           every time there are some updates
+
+        .. note:: Experimental.
+
+        >>> writer = sdf.writeStream.outputMode('append')
+        """
+        if not outputMode or type(outputMode) != str or 
len(outputMode.strip()) == 0:
+            raise ValueError('The output mode must be a non-empty string. Got: 
%s' % outputMode)
+        self._jwrite = self._jwrite.outputMode(outputMode)
+        return self
+
+    @since(2.0)
+    def format(self, source):
+        """Specifies the underlying output data source.
+
+        .. note:: Experimental.
+
+        :param source: string, name of the data source, e.g. 'json', 'parquet'.
+
+        >>> writer = sdf.writeStream.format('json')
+        """
+        self._jwrite = self._jwrite.format(source)
+        return self
+
+    @since(2.0)
+    def option(self, key, value):
+        """Adds an output option for the underlying data source.
+
+        .. note:: Experimental.
+        """
+        self._jwrite = self._jwrite.option(key, to_str(value))
+        return self
+
+    @since(2.0)
+    def options(self, **options):
+        """Adds output options for the underlying data source.
+
+        .. note:: Experimental.
+        """
+        for k in options:
+            self._jwrite = self._jwrite.option(k, to_str(options[k]))
+        return self
+
+    @since(2.0)
+    def partitionBy(self, *cols):
+        """Partitions the output by the given columns on the file system.
+
+        If specified, the output is laid out on the file system similar
+        to Hive's partitioning scheme.
+
+        .. note:: Experimental.
+
+        :param cols: name of columns
+
+        """
+        if len(cols) == 1 and isinstance(cols[0], (list, tuple)):
+            cols = cols[0]
+        self._jwrite = self._jwrite.partitionBy(_to_seq(self._spark._sc, cols))
+        return self
+
+    @since(2.0)
+    def queryName(self, queryName):
+        """Specifies the name of the :class:`StreamingQuery` that can be 
started with
+        :func:`start`. This name must be unique among all the currently active 
queries
+        in the associated SparkSession.
+
+        .. note:: Experimental.
+
+        :param queryName: unique name for the query
+
+        >>> writer = sdf.writeStream.queryName('streaming_query')
+        """
+        if not queryName or type(queryName) != str or len(queryName.strip()) 
== 0:
+            raise ValueError('The queryName must be a non-empty string. Got: 
%s' % queryName)
+        self._jwrite = self._jwrite.queryName(queryName)
+        return self
+
+    @keyword_only
+    @since(2.0)
+    def trigger(self, processingTime=None):
+        """Set the trigger for the stream query. If this is not set it will 
run the query as fast
+        as possible, which is equivalent to setting the trigger to 
``processingTime='0 seconds'``.
+
+        .. note:: Experimental.
+
+        :param processingTime: a processing time interval as a string, e.g. '5 
seconds', '1 minute'.
+
+        >>> # trigger the query for execution every 5 seconds
+        >>> writer = sdf.writeStream.trigger(processingTime='5 seconds')
+        """
+        from pyspark.sql.streaming import ProcessingTime
+        trigger = None
+        if processingTime is not None:
+            if type(processingTime) != str or len(processingTime.strip()) == 0:
+                raise ValueError('The processing time must be a non empty 
string. Got: %s' %
+                                 processingTime)
+            trigger = ProcessingTime(processingTime)
+        if trigger is None:
+            raise ValueError('A trigger was not provided. Supported triggers: 
processingTime.')
+        self._jwrite = 
self._jwrite.trigger(trigger._to_java_trigger(self._spark))
+        return self
+
+    @ignore_unicode_prefix
+    @since(2.0)
+    def start(self, path=None, format=None, partitionBy=None, queryName=None, 
**options):
+        """Streams the contents of the :class:`DataFrame` to a data source.
+
+        The data source is specified by the ``format`` and a set of 
``options``.
+        If ``format`` is not specified, the default data source configured by
+        ``spark.sql.sources.default`` will be used.
+
+        .. note:: Experimental.
+
+        :param path: the path in a Hadoop supported file system
+        :param format: the format used to save
+
+            * ``append``: Append contents of this :class:`DataFrame` to 
existing data.
+            * ``overwrite``: Overwrite existing data.
+            * ``ignore``: Silently ignore this operation if data already 
exists.
+            * ``error`` (default case): Throw an exception if data already 
exists.
+        :param partitionBy: names of partitioning columns
+        :param queryName: unique name for the query
+        :param options: All other string options. You may want to provide a 
`checkpointLocation`
+            for most streams, however it is not required for a `memory` stream.
+
+        >>> sq = 
sdf.writeStream.format('memory').queryName('this_query').start()
+        >>> sq.isActive
+        True
+        >>> sq.name
+        u'this_query'
+        >>> sq.stop()
+        >>> sq.isActive
+        False
+        >>> sq = sdf.writeStream.trigger(processingTime='5 seconds').start(
+        ...     queryName='that_query', format='memory')
+        >>> sq.name
+        u'that_query'
+        >>> sq.isActive
+        True
+        >>> sq.stop()
+        """
+        self.options(**options)
+        if partitionBy is not None:
+            self.partitionBy(partitionBy)
+        if format is not None:
+            self.format(format)
+        if queryName is not None:
+            self.queryName(queryName)
+        if path is None:
+            return self._sq(self._jwrite.start())
+        else:
+            return self._sq(self._jwrite.start(path))
+
+
 def _test():
     import doctest
     import os
@@ -243,6 +734,9 @@ def _test():
     globs['os'] = os
     globs['spark'] = spark
     globs['sqlContext'] = SQLContext.getOrCreate(spark.sparkContext)
+    globs['sdf'] = \
+        
spark.readStream.format('text').load('python/test_support/sql/streaming')
+    globs['sdf_schema'] = StructType([StructField("data", StringType(), 
False)])
     globs['df'] = \
         
globs['spark'].readStream.format('text').load('python/test_support/sql/streaming')
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to