[GitHub] spark pull request #13653: [SPARK-15933][SQL][STREAMING] Refactored DF reade...
Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/13653#discussion_r67086340

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala ---
@@ -40,6 +40,8 @@ class StreamingAggregationSuite extends StreamTest with BeforeAndAfterAll {
   import testImplicits._
+
--- End diff --

Done in #13673
[GitHub] spark pull request #13653: [SPARK-15933][SQL][STREAMING] Refactored DF reade...
Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/13653#discussion_r67086347

--- Diff: python/pyspark/sql/readwriter.py ---
@@ -905,6 +764,503 @@ def jdbc(self, url, table, mode=None, properties=None):
         self._jwrite.mode(mode).jdbc(url, table, jprop)

+class DataStreamReader(object):
--- End diff --

Done in #13673
[GitHub] spark pull request #13653: [SPARK-15933][SQL][STREAMING] Refactored DF reade...
Github user asfgit closed the pull request at:
https://github.com/apache/spark/pull/13653
[GitHub] spark pull request #13653: [SPARK-15933][SQL][STREAMING] Refactored DF reade...
Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/13653#discussion_r67082435

--- Diff: python/pyspark/sql/readwriter.py ---
@@ -905,6 +764,503 @@ def jdbc(self, url, table, mode=None, properties=None):
         self._jwrite.mode(mode).jdbc(url, table, jprop)

+class DataStreamReader(object):
--- End diff --

I will fix this in the follow-up PR #13673
[GitHub] spark pull request #13653: [SPARK-15933][SQL][STREAMING] Refactored DF reade...
Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/13653#discussion_r67080464

--- Diff: python/pyspark/sql/readwriter.py ---
@@ -905,6 +764,503 @@ def jdbc(self, url, table, mode=None, properties=None):
         self._jwrite.mode(mode).jdbc(url, table, jprop)

+class DataStreamReader(object):
+    """
+    Interface used to load a streaming :class:`DataFrame` from external storage systems
+    (e.g. file systems, key-value stores, etc). Use :func:`spark.readStream`
+    to access this.
+
+    .. note:: Experimental.
+
+    .. versionadded:: 2.0
+    """
+
+    def __init__(self, spark):
+        self._jreader = spark._ssql_ctx.readStream()
+        self._spark = spark
+
+    def _df(self, jdf):
+        from pyspark.sql.dataframe import DataFrame
+        return DataFrame(jdf, self._spark)
+
+    @since(2.0)
+    def format(self, source):
+        """Specifies the input data source format.
+
+        .. note:: Experimental.
+
+        :param source: string, name of the data source, e.g. 'json', 'parquet'.
+        """
+        self._jreader = self._jreader.format(source)
+        return self
+
+    @since(2.0)
+    def schema(self, schema):
+        """Specifies the input schema.
...
+    @since(2.0)
+    def option(self, key, value):
+        """Adds an input option for the underlying data source.
...
+    @since(2.0)
+    def options(self, **options):
+        """Adds input options for the underlying data source.
...
+    @since(2.0)
+    def load(self, path=None, format=None, schema=None, **options):
+        """Loads a data stream from a data source and returns it as a :class`DataFrame`.
+
+        .. note:: Experimental.
+
+        :param path: optional string for file-system backed data sources.
+        :param format: optional string for format of the data source. Default to 'parquet'.
+        :param schema: optional :class:`StructType` for the input schema.
+        :param options: all other string options
+        """
+        if format is not None:
+            self.format(format)
+        if schema is not None:
+            self.schema(schema)
+        self.options(**options)
+        if path is not None:
+            if type(path) != str or len(path.strip()) == 0:
+                raise ValueError("If the path is provided for stream, it needs to be a " +
+                                 "non-empty string. List of paths are not supported.")
+            return self._df(self._jreader.load(path))
+        else:
+            return self._df(self._jreader.load())
+
+    @since(2.0)
+    def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
+             allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None,
+             allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None,
+             mode=None, columnNameOfCorruptRecord=None):
+        """
+        Loads a JSON file stream (one object per line) and returns a :class`DataFrame`.
...
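A minimal usage sketch of the streaming reader API quoted above, assuming `spark` is a `SparkSession` exposing the `readStream` accessor mentioned in the class docstring; the path and the `maxFilesPerTrigger` option are illustrative, not taken from the diff:

    from pyspark.sql import SparkSession
    from pyspark.sql.types import StructType, StructField, StringType

    spark = SparkSession.builder.appName("stream-reader-sketch").getOrCreate()

    # Supply an explicit schema so the JSON source can skip inference,
    # as the schema() docstring above recommends.
    schema = StructType([StructField("value", StringType(), True)])

    # Each builder call returns the reader itself, so the calls chain.
    streaming_df = (spark.readStream
                    .format("json")
                    .schema(schema)
                    .option("maxFilesPerTrigger", 1)   # illustrative option; values go through to_str()
                    .load("/tmp/streaming-input"))     # path must be a non-empty string; lists are rejected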
[GitHub] spark pull request #13653: [SPARK-15933][SQL][STREAMING] Refactored DF reade...
Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/13653#discussion_r67079701

--- Diff: python/pyspark/sql/readwriter.py ---
@@ -905,6 +764,503 @@ def jdbc(self, url, table, mode=None, properties=None):
         self._jwrite.mode(mode).jdbc(url, table, jprop)

+class DataStreamReader(object):
...
[GitHub] spark pull request #13653: [SPARK-15933][SQL][STREAMING] Refactored DF reade...
Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/13653#discussion_r67078670

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala ---
@@ -40,6 +40,8 @@ class StreamingAggregationSuite extends StreamTest with BeforeAndAfterAll {
   import testImplicits._
+
--- End diff --

nit: redundant spaces.
[GitHub] spark pull request #13653: [SPARK-15933][SQL][STREAMING] Refactored DF reade...
Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/13653#discussion_r67077331

--- Diff: python/pyspark/sql/readwriter.py ---
@@ -905,6 +764,503 @@ def jdbc(self, url, table, mode=None, properties=None):
         self._jwrite.mode(mode).jdbc(url, table, jprop)

+class DataStreamReader(object):
--- End diff --

nit: add new classes to `__all__ = ["DataFrameReader", "DataFrameWriter"]`
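A sketch of what the updated export list might look like, assuming both the `DataStreamReader` shown above and a corresponding `DataStreamWriter` end up in this module (the writer class is implied by the PR title, not shown in this hunk):

    # python/pyspark/sql/readwriter.py -- module exports after adding the streaming classes
    __all__ = ["DataFrameReader", "DataFrameWriter", "DataStreamReader", "DataStreamWriter"]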
[GitHub] spark pull request #13653: [SPARK-15933][SQL][STREAMING] Refactored DF reade...
Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/13653#discussion_r67058676

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala ---
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
...
+package org.apache.spark.sql.streaming
...
+/**
+ * :: Experimental ::
+ * Interface used to write a streaming [[Dataset]] to external storage systems (e.g. file systems,
+ * key-value stores, etc). Use [[Dataset.write]] to access this.
+ *
+ * @since 2.0.0
+ */
+@Experimental
+final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
+
+  private val df = ds.toDF()
+
+  /**
+   * :: Experimental ::
+   * Specifies how data of a streaming DataFrame/Dataset is written to a streaming sink.
+   *   - `OutputMode.Append()`: only the new rows in the streaming DataFrame/Dataset will be
+   *     written to the sink
+   *   - `OutputMode.Complete()`: all the rows in the streaming DataFrame/Dataset will be written
+   *     to the sink every time these is some updates
+   *
+   * @since 2.0.0
+   */
+  @Experimental
+  def outputMode(outputMode: OutputMode): DataStreamWriter[T] = {
+    this.outputMode = outputMode
+    this
+  }
+
...
+  @Experimental
+  def outputMode(outputMode: String): DataStreamWriter[T] = {
+    this.outputMode = outputMode.toLowerCase match {
+      case "append" =>
+        OutputMode.Append
+      case "complete" =>
+        OutputMode.Complete
+      case _ =>
+        throw new IllegalArgumentException(s"Unknown output mode $outputMode. " +
+          "Accepted output modes are 'append' and 'complete'")
+    }
+    this
+  }
+
+  /**
+   * :: Experimental ::
+   * Set the trigger for the stream query. The default value is `ProcessingTime(0)` and it will run
+   * the query as fast as possible.
+   *
+   * Scala Example:
+   * {{{
+   *   df.write.trigger(ProcessingTime("10 seconds"))
+   *
+   *   import scala.concurrent.duration._
+   *   df.write.trigger(ProcessingTime(10.seconds))
+   * }}}
+   *
+   * Java Example:
+   * {{{
+   *   df.write.trigger(ProcessingTime.create("10 seconds"))
+   *
+   *   import java.util.concurrent.TimeUnit
+   *   df.write.trigger(ProcessingTime.create(10, TimeUnit.SECONDS))
+   * }}}
+   *
+   * @since 2.0.0
+   */
+  @Experimental
+  def trigger(trigger: Trigger): DataStreamWriter[T] = {
+    this.trigger = trigger
+    this
+  }
+
+  /**
+   * :: Experimental ::
+   * Specifies the name of the [[ContinuousQuery]] that can be started with `startStream()`.
+   * This name must be unique among all the currently active queries in the associated SQLContext.
+   *
+   * @since 2.0.0
+   */
+  @Experimental
+  def queryName(queryName: String): DataStreamWriter[T] = {
+    this.extraOptions += ("queryName" -> queryName)
+    this
+  }
+
+  /**
+   * :: Experimental ::
+   * Specifies the underlying output dat
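A minimal sketch of how the builder methods quoted above chain together, assuming `df.write` returns a `DataStreamWriter` as the class scaladoc states and that the query is started with the `startStream()` method the `queryName` scaladoc mentions; the output mode, interval and query name are illustrative:

    import scala.concurrent.duration._

    // Sketch only: combines outputMode, trigger and queryName from the diff above.
    val query = df.write
      .outputMode("append")                  // or outputMode(OutputMode.Append())
      .trigger(ProcessingTime(10.seconds))   // fire every 10 seconds instead of ProcessingTime(0)
      .queryName("myAggregates")             // must be unique among active queries
      .startStream()                         // returns the ContinuousQuery handle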
[GitHub] spark pull request #13653: [SPARK-15933][SQL][STREAMING] Refactored DF reade...
Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/13653#discussion_r67042657

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala ---
@@ -0,0 +1,401 @@
...
+final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
...
[GitHub] spark pull request #13653: [SPARK-15933][SQL][STREAMING] Refactored DF reade...
Github user marmbrus commented on a diff in the pull request:
https://github.com/apache/spark/pull/13653#discussion_r67029686

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala ---
@@ -0,0 +1,401 @@
...
+final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
...
[GitHub] spark pull request #13653: [SPARK-15933][SQL][STREAMING] Refactored DF reade...
Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/13653#discussion_r67029246

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala ---
@@ -0,0 +1,401 @@
...
+final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
...
[GitHub] spark pull request #13653: [SPARK-15933][SQL][STREAMING] Refactored DF reade...
Github user marmbrus commented on a diff in the pull request:
https://github.com/apache/spark/pull/13653#discussion_r67023372

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala ---
@@ -0,0 +1,401 @@
...
+final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
...
[GitHub] spark pull request #13653: [SPARK-15933][SQL][STREAMING] Refactored DF reade...
Github user marmbrus commented on a diff in the pull request:
https://github.com/apache/spark/pull/13653#discussion_r67023216

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala ---
@@ -0,0 +1,401 @@
...
+final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
...
[GitHub] spark pull request #13653: [SPARK-15933][SQL][STREAMING] Refactored DF reade...
Github user marmbrus commented on a diff in the pull request:
https://github.com/apache/spark/pull/13653#discussion_r67023075

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala ---
@@ -0,0 +1,401 @@
...
+  @Experimental
+  def outputMode(outputMode: String): DataStreamWriter[T] = {
--- End diff --

Again, I'd probably go for consistency unless we think that we will need save modes in streaming (I'm not really sure how that would work).
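The consistency being discussed is with the existing batch writer, whose save-mode setter is called `mode`; a hedged side-by-side sketch (the batch line uses the long-standing `DataFrameWriter.mode`/`SaveMode` API, the streaming line uses the `outputMode` overload quoted above):

    // Batch: mode() accepts SaveMode names such as "append", "overwrite", "ignore", "error".
    batchDf.write.mode("append").parquet("/tmp/batch-output")

    // Streaming (this PR): outputMode() accepts "append" or "complete".
    streamingDf.write.outputMode("append")   // vs. the shorter, hypothetical streamingDf.write.mode("append")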
[GitHub] spark pull request #13653: [SPARK-15933][SQL][STREAMING] Refactored DF reade...
Github user marmbrus commented on a diff in the pull request:
https://github.com/apache/spark/pull/13653#discussion_r67022595

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala ---
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
...
+package org.apache.spark.sql.streaming
...
+@Experimental
+final class DataStreamReader private[sql](sparkSession: SparkSession) extends Logging {
+  /**
+   * :: Experimental ::
+   * Specifies the input data source format.
+   *
+   * @since 2.0.0
+   */
+  @Experimental
+  def format(source: String): DataStreamReader = {
+    this.source = source
+    this
+  }
+
+  /**
+   * :: Experimental ::
+   * Specifies the input schema. Some data sources (e.g. JSON) can infer the input schema
+   * automatically from data. By specifying the schema here, the underlying data source can
+   * skip the schema inference step, and thus speed up data loading.
+   *
+   * @since 2.0.0
+   */
+  @Experimental
+  def schema(schema: StructType): DataStreamReader = {
+    this.userSpecifiedSchema = Option(schema)
+    this
+  }
+
+  /**
+   * :: Experimental ::
+   * Adds an input option for the underlying data source.
+   *
+   * @since 2.0.0
+   */
+  @Experimental
+  def option(key: String, value: String): DataStreamReader = {
+    this.extraOptions += (key -> value)
+    this
+  }
+
+  @Experimental
+  def option(key: String, value: Boolean): DataStreamReader = option(key, value.toString)
...
+  /**
+   * :: Experimental ::
+   * Loads input data stream in as a [[DataFrame]], for data streams that don't require a path
+   * (e.g. external key-value stores).
+   *
+   * @since 2.0.0
+   */
+  @Experimental
+  def load(): DataFrame = {
--- End diff --

Seems best to keep it consistent.
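A minimal usage sketch of the path-less `load()` under discussion, assuming the reader is obtained from a `SparkSession` (for example through a `readStream`-style accessor) and using the socket source purely for illustration:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local[2]").appName("reader-sketch").getOrCreate()

    // No path needed: the source is configured entirely through options.
    val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", "9999")   // option() also accepts Boolean/Long/Double values via overloads
      .load()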
[GitHub] spark pull request #13653: [SPARK-15933][SQL][STREAMING] Refactored DF reade...
Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/13653#discussion_r66888549

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala ---
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
...
+package org.apache.spark.sql.test
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import org.apache.spark.util.Utils
+
+object LastOptions {
+
+  var parameters: Map[String, String] = null
+  var schema: Option[StructType] = null
+  var saveMode: SaveMode = null
+
+  def clear(): Unit = {
+    parameters = null
+    schema = null
+    saveMode = null
+  }
+}
+
+/** Dummy provider. */
+class DefaultSource
+  extends RelationProvider
+  with SchemaRelationProvider
+  with CreatableRelationProvider {
+
+  case class FakeRelation(sqlContext: SQLContext) extends BaseRelation {
+    override def schema: StructType = StructType(Seq(StructField("a", StringType)))
+  }
+
+  override def createRelation(
+      sqlContext: SQLContext,
+      parameters: Map[String, String],
+      schema: StructType
+    ): BaseRelation = {
+    LastOptions.parameters = parameters
+    LastOptions.schema = Some(schema)
+    FakeRelation(sqlContext)
+  }
...
+class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext {
--- End diff --

These tests are a very rudimentary set of tests to exercise the DataFrameReader code path. Note that in Spark 1.6 there were no tests at all. Also, I believe a lot of the DF functionality is tested through other test suites (e.g. partitioning columns are tested through PartitionedParquetSuite).
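The dummy-provider pattern described here is typically exercised by a test along these lines; this is a sketch under the assumption that the suite resolves the provider by its package name (`org.apache.spark.sql.test`, whose `DefaultSource` is shown above) and that `SharedSQLContext` provides the `spark` session. The option names are illustrative:

    test("reader options are passed through to the data source") {
      LastOptions.clear()
      spark.read
        .format("org.apache.spark.sql.test")   // resolves to the dummy DefaultSource above
        .option("opt1", "1")
        .option("opt2", "2")
        .load()
      assert(LastOptions.parameters("opt1") == "1")
      assert(LastOptions.parameters("opt2") == "2")
    }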
[GitHub] spark pull request #13653: [SPARK-15933][SQL][STREAMING] Refactored DF reade...
Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/13653#discussion_r66888413

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala ---
@@ -371,76 +384,12 @@ class DataFrameReaderWriterSuite extends StreamTest with BeforeAndAfter {
   private def newTextInput = Utils.createTempDir(namePrefix = "text").getCanonicalPath

-  test("check trigger() can only be called on continuous queries") {
--- End diff --

Most of these tests check whether a method is called on the wrong type of DF or not, so all of them can be removed.
[GitHub] spark pull request #13653: [SPARK-15933][SQL][STREAMING] Refactored DF reade...
Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/13653#discussion_r66888223

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala ---
@@ -0,0 +1,401 @@
...
+  @Experimental
+  def outputMode(outputMode: String): DataStreamWriter[T] = {
--- End diff --

should this be shortened to just `mode`?
[GitHub] spark pull request #13653: [SPARK-15933][SQL][STREAMING] Refactored DF reade...
Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/13653#discussion_r66888185

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala ---
@@ -0,0 +1,401 @@
...
+final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
...
[GitHub] spark pull request #13653: [SPARK-15933][SQL][STREAMING] Refactored DF reade...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/13653#discussion_r66888051 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala --- @@ -0,0 +1,401 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.streaming + +import scala.collection.JavaConverters._ + +import org.apache.spark.annotation.Experimental +import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, ForeachWriter} +import org.apache.spark.sql.execution.datasources.DataSource +import org.apache.spark.sql.execution.streaming.{ForeachSink, MemoryPlan, MemorySink} + +/** + * :: Experimental :: + * Interface used to write a streaming [[Dataset]] to external storage systems (e.g. file systems, + * key-value stores, etc). Use [[Dataset.write]] to access this. + * + * @since 2.0.0 + */ +@Experimental +final class DataStreamWriter[T] private[sql](ds: Dataset[T]) { + + private val df = ds.toDF() + + /** + * :: Experimental :: + * Specifies how data of a streaming DataFrame/Dataset is written to a streaming sink. + * - `OutputMode.Append()`: only the new rows in the streaming DataFrame/Dataset will be + *written to the sink + * - `OutputMode.Complete()`: all the rows in the streaming DataFrame/Dataset will be written + * to the sink every time these is some updates + * + * @since 2.0.0 + */ + @Experimental + def outputMode(outputMode: OutputMode): DataStreamWriter[T] = { +this.outputMode = outputMode +this + } + + + /** + * :: Experimental :: + * Specifies how data of a streaming DataFrame/Dataset is written to a streaming sink. + * - `append`: only the new rows in the streaming DataFrame/Dataset will be written to + * the sink + * - `complete`: all the rows in the streaming DataFrame/Dataset will be written to the sink + * every time these is some updates + * + * @since 2.0.0 + */ + @Experimental + def outputMode(outputMode: String): DataStreamWriter[T] = { +this.outputMode = outputMode.toLowerCase match { + case "append" => +OutputMode.Append + case "complete" => +OutputMode.Complete + case _ => +throw new IllegalArgumentException(s"Unknown output mode $outputMode. " + + "Accepted output modes are 'append' and 'complete'") +} +this + } + + /** + * :: Experimental :: + * Set the trigger for the stream query. The default value is `ProcessingTime(0)` and it will run + * the query as fast as possible. 
+ * + * Scala Example: + * {{{ + * df.write.trigger(ProcessingTime("10 seconds")) + * + * import scala.concurrent.duration._ + * df.write.trigger(ProcessingTime(10.seconds)) + * }}} + * + * Java Example: + * {{{ + * df.write.trigger(ProcessingTime.create("10 seconds")) + * + * import java.util.concurrent.TimeUnit + * df.write.trigger(ProcessingTime.create(10, TimeUnit.SECONDS)) + * }}} + * + * @since 2.0.0 + */ + @Experimental + def trigger(trigger: Trigger): DataStreamWriter[T] = { +this.trigger = trigger +this + } + + + /** + * :: Experimental :: + * Specifies the name of the [[ContinuousQuery]] that can be started with `startStream()`. + * This name must be unique among all the currently active queries in the associated SQLContext. + * + * @since 2.0.0 + */ + @Experimental + def queryName(queryName: String): DataStreamWriter[T] = { +this.extraOptions += ("queryName" -> queryName) +this + } + + /** + * :: Experimental :: + * Specifies the underlying output dat
[GitHub] spark pull request #13653: [SPARK-15933][SQL][STREAMING] Refactored DF reade...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/13653#discussion_r66888017 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala --- @@ -0,0 +1,401 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.streaming + +import scala.collection.JavaConverters._ + +import org.apache.spark.annotation.Experimental +import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, ForeachWriter} +import org.apache.spark.sql.execution.datasources.DataSource +import org.apache.spark.sql.execution.streaming.{ForeachSink, MemoryPlan, MemorySink} + +/** + * :: Experimental :: + * Interface used to write a streaming [[Dataset]] to external storage systems (e.g. file systems, + * key-value stores, etc). Use [[Dataset.write]] to access this. + * + * @since 2.0.0 + */ +@Experimental +final class DataStreamWriter[T] private[sql](ds: Dataset[T]) { + + private val df = ds.toDF() + + /** + * :: Experimental :: + * Specifies how data of a streaming DataFrame/Dataset is written to a streaming sink. + * - `OutputMode.Append()`: only the new rows in the streaming DataFrame/Dataset will be + *written to the sink + * - `OutputMode.Complete()`: all the rows in the streaming DataFrame/Dataset will be written + * to the sink every time these is some updates + * + * @since 2.0.0 + */ + @Experimental + def outputMode(outputMode: OutputMode): DataStreamWriter[T] = { +this.outputMode = outputMode +this + } + + + /** + * :: Experimental :: + * Specifies how data of a streaming DataFrame/Dataset is written to a streaming sink. + * - `append`: only the new rows in the streaming DataFrame/Dataset will be written to + * the sink + * - `complete`: all the rows in the streaming DataFrame/Dataset will be written to the sink + * every time these is some updates + * + * @since 2.0.0 + */ + @Experimental + def outputMode(outputMode: String): DataStreamWriter[T] = { +this.outputMode = outputMode.toLowerCase match { + case "append" => +OutputMode.Append + case "complete" => +OutputMode.Complete + case _ => +throw new IllegalArgumentException(s"Unknown output mode $outputMode. " + + "Accepted output modes are 'append' and 'complete'") +} +this + } + + /** + * :: Experimental :: + * Set the trigger for the stream query. The default value is `ProcessingTime(0)` and it will run + * the query as fast as possible. + * + * Scala Example: + * {{{ + * df.write.trigger(ProcessingTime("10 seconds")) --- End diff -- update these examples. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
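Presumably the update being asked for is to switch the entry point in the scaladoc examples from `write` to `writeStream`, in line with the PR title; a sketch of what the revised Scala examples might look like, with `df` assumed to be a streaming DataFrame and the `ProcessingTime` import location assumed from the diff.

```scala
import scala.concurrent.duration._
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.ProcessingTime  // package assumed from the diff

// Same examples as in the quoted scaladoc, with `write` replaced by `writeStream`.
def scalaStyleTriggers(df: DataFrame) = {
  df.writeStream.trigger(ProcessingTime("10 seconds"))
  df.writeStream.trigger(ProcessingTime(10.seconds))
}
```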
[GitHub] spark pull request #13653: [SPARK-15933][SQL][STREAMING] Refactored DF reade...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/13653#discussion_r66887929 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala --- @@ -0,0 +1,288 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.streaming + +import scala.collection.JavaConverters._ + +import org.apache.spark.annotation.Experimental +import org.apache.spark.internal.Logging +import org.apache.spark.sql.{DataFrame, Dataset, SparkSession} +import org.apache.spark.sql.execution.datasources.DataSource +import org.apache.spark.sql.execution.streaming.StreamingRelation +import org.apache.spark.sql.types.StructType + +@Experimental +final class DataStreamReader private[sql](sparkSession: SparkSession) extends Logging { + /** + * :: Experimental :: + * Specifies the input data source format. + * + * @since 2.0.0 + */ + @Experimental + def format(source: String): DataStreamReader = { +this.source = source +this + } + + /** + * :: Experimental :: + * Specifies the input schema. Some data sources (e.g. JSON) can infer the input schema + * automatically from data. By specifying the schema here, the underlying data source can + * skip the schema inference step, and thus speed up data loading. + * + * @since 2.0.0 + */ + @Experimental + def schema(schema: StructType): DataStreamReader = { +this.userSpecifiedSchema = Option(schema) +this + } + + /** + * :: Experimental :: + * Adds an input option for the underlying data source. + * + * @since 2.0.0 + */ + @Experimental + def option(key: String, value: String): DataStreamReader = { +this.extraOptions += (key -> value) +this + } + + /** + * :: Experimental :: + * Adds an input option for the underlying data source. + * + * @since 2.0.0 + */ + @Experimental + def option(key: String, value: Boolean): DataStreamReader = option(key, value.toString) + + /** + * :: Experimental :: + * Adds an input option for the underlying data source. + * + * @since 2.0.0 + */ + @Experimental + def option(key: String, value: Long): DataStreamReader = option(key, value.toString) + + /** + * :: Experimental :: + * Adds an input option for the underlying data source. + * + * @since 2.0.0 + */ + @Experimental + def option(key: String, value: Double): DataStreamReader = option(key, value.toString) + + /** + * :: Experimental :: + * (Scala-specific) Adds input options for the underlying data source. + * + * @since 2.0.0 + */ + @Experimental + def options(options: scala.collection.Map[String, String]): DataStreamReader = { +this.extraOptions ++= options +this + } + + /** + * :: Experimental :: + * Adds input options for the underlying data source. 
+ * + * @since 2.0.0 + */ + @Experimental + def options(options: java.util.Map[String, String]): DataStreamReader = { +this.options(options.asScala) +this + } + + + /** + * :: Experimental :: + * Loads input data stream in as a [[DataFrame]], for data streams that don't require a path + * (e.g. external key-value stores). + * + * @since 2.0.0 + */ + @Experimental + def load(): DataFrame = { --- End diff -- @marmbrus @rxin Should this be `load()`? In the general case, ``` sdf.readStream .format("myFormat") .load("stringIdentifier") ``` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure
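To make the question concrete, the two call shapes being weighed, reassembled from the snippet in the comment; `"myFormat"` and `"stringIdentifier"` are the placeholders used there, not real data sources, so this is a sketch rather than runnable against a concrete source.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

// Path-less variant, e.g. an external key-value store:
val noPathDf = spark.readStream
  .format("myFormat")
  .load()

// General string-identifier variant from the comment:
val withIdDf = spark.readStream
  .format("myFormat")
  .load("stringIdentifier")
```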
[GitHub] spark pull request #13653: [SPARK-15933][SQL][STREAMING] Refactored DF reade...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/13653#discussion_r66887779 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala --- @@ -0,0 +1,288 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.streaming + +import scala.collection.JavaConverters._ + +import org.apache.spark.annotation.Experimental +import org.apache.spark.internal.Logging +import org.apache.spark.sql.{DataFrame, Dataset, SparkSession} +import org.apache.spark.sql.execution.datasources.DataSource +import org.apache.spark.sql.execution.streaming.StreamingRelation +import org.apache.spark.sql.types.StructType + +@Experimental --- End diff -- add docs. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
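One possible shape for the missing class-level scaladoc, mirroring the wording of the DataStreamWriter class doc quoted in the earlier comments; the `SparkSession.readStream` accessor reference is assumed from the PR title, and this is a sketch rather than the text that was eventually committed.

```scala
/**
 * :: Experimental ::
 * Interface used to load a streaming [[DataFrame]] from external storage systems
 * (e.g. file systems, key-value stores, etc). Use [[SparkSession.readStream]] to access this.
 *
 * @since 2.0.0
 */
@Experimental
final class DataStreamReader private[sql](sparkSession: SparkSession) extends Logging {
  // ... methods as in the diff above ...
}
```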
[GitHub] spark pull request #13653: [SPARK-15933][SQL][STREAMING] Refactored DF reade...
GitHub user tdas opened a pull request: https://github.com/apache/spark/pull/13653 [SPARK-15933][SQL][STREAMING] Refactored DF reader-writer to use readStream and writeStream for streaming DFs

## What changes were proposed in this pull request?

Currently, the DataFrameReader/Writer has methods that are needed for streaming and for non-streaming DFs. This is quite awkward because each such method throws a runtime exception for one case or the other. So rather than having half the methods throw runtime exceptions, it's better to have a separate reader/writer API for streams.

## How was this patch tested?

Existing unit tests + two sets of unit tests for DataFrameReader/Writer and DataStreamReader/Writer.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tdas/spark SPARK-15933

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/13653.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #13653

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
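As a rough before/after of the split described in the PR description — a minimal sketch with placeholder paths and formats, run locally, and with the streaming query configured but not started.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("reader-writer-split-sketch")
  .master("local[*]")
  .getOrCreate()

// Batch path: DataFrameReader / DataFrameWriter, unchanged by this PR.
val batchDf = spark.read.format("json").load("/tmp/batch-input")
batchDf.write.format("parquet").save("/tmp/batch-output")

// Streaming path: the separate DataStreamReader / DataStreamWriter entry points,
// so streaming-only and batch-only methods no longer share one class and throw
// runtime exceptions for the unsupported case.
val streamDf = spark.readStream
  .format("json")
  .schema(batchDf.schema)   // file stream sources generally need an explicit schema
  .load("/tmp/stream-input")

streamDf.writeStream
  .format("parquet")
  .outputMode("append")     // starting the query is omitted in this sketch
```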