[GitHub] spark pull request #13653: [SPARK-15933][SQL][STREAMING] Refactored DF reade...

2016-06-14 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/13653#discussion_r67086340
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
 ---
@@ -40,6 +40,8 @@ class StreamingAggregationSuite extends StreamTest with 
BeforeAndAfterAll {
 
   import testImplicits._
 
+
--- End diff --

Done in #13673 





[GitHub] spark pull request #13653: [SPARK-15933][SQL][STREAMING] Refactored DF reade...

2016-06-14 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/13653#discussion_r67086347
  
--- Diff: python/pyspark/sql/readwriter.py ---
@@ -905,6 +764,503 @@ def jdbc(self, url, table, mode=None, 
properties=None):
 self._jwrite.mode(mode).jdbc(url, table, jprop)
 
 
+class DataStreamReader(object):
--- End diff --

Done in #13673





[GitHub] spark pull request #13653: [SPARK-15933][SQL][STREAMING] Refactored DF reade...

2016-06-14 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/13653





[GitHub] spark pull request #13653: [SPARK-15933][SQL][STREAMING] Refactored DF reade...

2016-06-14 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/13653#discussion_r67082435
  
--- Diff: python/pyspark/sql/readwriter.py ---
@@ -905,6 +764,503 @@ def jdbc(self, url, table, mode=None, 
properties=None):
 self._jwrite.mode(mode).jdbc(url, table, jprop)
 
 
+class DataStreamReader(object):
--- End diff --

I will fix this in the follow-up PR #13673.





[GitHub] spark pull request #13653: [SPARK-15933][SQL][STREAMING] Refactored DF reade...

2016-06-14 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/13653#discussion_r67080464
  
--- Diff: python/pyspark/sql/readwriter.py ---
@@ -905,6 +764,503 @@ def jdbc(self, url, table, mode=None, 
properties=None):
 self._jwrite.mode(mode).jdbc(url, table, jprop)
 
 
+class DataStreamReader(object):
+"""
+Interface used to load a streaming :class:`DataFrame` from external 
storage systems
+(e.g. file systems, key-value stores, etc). Use 
:func:`spark.readStream`
+to access this.
+
+.. note:: Experimental.
+
+.. versionadded:: 2.0
+"""
+
+def __init__(self, spark):
+self._jreader = spark._ssql_ctx.readStream()
+self._spark = spark
+
+def _df(self, jdf):
+from pyspark.sql.dataframe import DataFrame
+return DataFrame(jdf, self._spark)
+
+@since(2.0)
+def format(self, source):
+"""Specifies the input data source format.
+
+.. note:: Experimental.
+
+:param source: string, name of the data source, e.g. 'json', 
'parquet'.
+
+"""
+self._jreader = self._jreader.format(source)
+return self
+
+@since(2.0)
+def schema(self, schema):
+"""Specifies the input schema.
+
+Some data sources (e.g. JSON) can infer the input schema 
automatically from data.
+By specifying the schema here, the underlying data source can skip 
the schema
+inference step, and thus speed up data loading.
+
+.. note:: Experimental.
+
+:param schema: a StructType object
+"""
+if not isinstance(schema, StructType):
+raise TypeError("schema should be StructType")
+jschema = self._spark._ssql_ctx.parseDataType(schema.json())
+self._jreader = self._jreader.schema(jschema)
+return self
+
+@since(2.0)
+def option(self, key, value):
+"""Adds an input option for the underlying data source.
+
+.. note:: Experimental.
+"""
+self._jreader = self._jreader.option(key, to_str(value))
+return self
+
+@since(2.0)
+def options(self, **options):
+"""Adds input options for the underlying data source.
+
+.. note:: Experimental.
+"""
+for k in options:
+self._jreader = self._jreader.option(k, to_str(options[k]))
+return self
+
+@since(2.0)
+def load(self, path=None, format=None, schema=None, **options):
+"""Loads a data stream from a data source and returns it as a 
:class`DataFrame`.
+
+.. note:: Experimental.
+
+:param path: optional string for file-system backed data sources.
+:param format: optional string for format of the data source. 
Default to 'parquet'.
+:param schema: optional :class:`StructType` for the input schema.
+:param options: all other string options
+
+"""
+if format is not None:
+self.format(format)
+if schema is not None:
+self.schema(schema)
+self.options(**options)
+if path is not None:
+if type(path) != str or len(path.strip()) == 0:
+raise ValueError("If the path is provided for stream, it 
needs to be a " +
+ "non-empty string. List of paths are not 
supported.")
+return self._df(self._jreader.load(path))
+else:
+return self._df(self._jreader.load())
+
+@since(2.0)
+def json(self, path, schema=None, primitivesAsString=None, 
prefersDecimal=None,
+ allowComments=None, allowUnquotedFieldNames=None, 
allowSingleQuotes=None,
+ allowNumericLeadingZero=None, 
allowBackslashEscapingAnyCharacter=None,
+ mode=None, columnNameOfCorruptRecord=None):
+"""
+Loads a JSON file stream (one object per line) and returns a 
:class`DataFrame`.
+
+If the ``schema`` parameter is not specified, this function goes
+through the input once to determine the input schema.
+
+.. note:: Experimental.
+
+:param path: string represents path to the JSON dataset,
+ or RDD of Strings storing JSON objects.
+:param schema: an optional :class:`StructType` for the input 
schema.
+:param primitivesAsString: infers all primitive values as a string 
type. If None is set,
+   it uses the default value, ``false``.
+:param prefersDecimal: infers all floating-point values 
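[Editor's note: the docstring quoted above explains that supplying a schema lets the source skip the inference pass over the input. As a minimal illustration (not part of the thread), a spark-shell style Scala sketch of the same idea; `spark` is the session the shell predefines, and the path is a placeholder.]

```scala
// Spark-shell-style sketch; `spark` is the predefined SparkSession.
import org.apache.spark.sql.types.{StringType, StructType, TimestampType}

// Declaring the schema up front lets the JSON source skip the inference pass
// over the input that the quoted docstring describes.
val events = new StructType()
  .add("user", StringType)
  .add("action", StringType)
  .add("time", TimestampType)

val stream = spark.readStream     // accessor named in the quoted docstring
  .format("json")
  .schema(events)
  .load("/tmp/events")            // placeholder directory of JSON files
```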

[GitHub] spark pull request #13653: [SPARK-15933][SQL][STREAMING] Refactored DF reade...

2016-06-14 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/13653#discussion_r67079701
  
--- Diff: python/pyspark/sql/readwriter.py ---
@@ -905,6 +764,503 @@ def jdbc(self, url, table, mode=None, 
properties=None):
 self._jwrite.mode(mode).jdbc(url, table, jprop)
 
 
+class DataStreamReader(object):
+"""
+Interface used to load a streaming :class:`DataFrame` from external 
storage systems
+(e.g. file systems, key-value stores, etc). Use 
:func:`spark.readStream`
+to access this.
+
+.. note:: Experimental.
+
+.. versionadded:: 2.0
+"""
+
+def __init__(self, spark):
+self._jreader = spark._ssql_ctx.readStream()
+self._spark = spark
+
+def _df(self, jdf):
+from pyspark.sql.dataframe import DataFrame
+return DataFrame(jdf, self._spark)
+
+@since(2.0)
+def format(self, source):
+"""Specifies the input data source format.
+
+.. note:: Experimental.
+
+:param source: string, name of the data source, e.g. 'json', 
'parquet'.
+
+"""
+self._jreader = self._jreader.format(source)
+return self
+
+@since(2.0)
+def schema(self, schema):
+"""Specifies the input schema.
+
+Some data sources (e.g. JSON) can infer the input schema 
automatically from data.
+By specifying the schema here, the underlying data source can skip 
the schema
+inference step, and thus speed up data loading.
+
+.. note:: Experimental.
+
+:param schema: a StructType object
+"""
+if not isinstance(schema, StructType):
+raise TypeError("schema should be StructType")
+jschema = self._spark._ssql_ctx.parseDataType(schema.json())
+self._jreader = self._jreader.schema(jschema)
+return self
+
+@since(2.0)
+def option(self, key, value):
+"""Adds an input option for the underlying data source.
+
+.. note:: Experimental.
+"""
+self._jreader = self._jreader.option(key, to_str(value))
+return self
+
+@since(2.0)
+def options(self, **options):
+"""Adds input options for the underlying data source.
+
+.. note:: Experimental.
+"""
+for k in options:
+self._jreader = self._jreader.option(k, to_str(options[k]))
+return self
+
+@since(2.0)
+def load(self, path=None, format=None, schema=None, **options):
+"""Loads a data stream from a data source and returns it as a 
:class`DataFrame`.
+
+.. note:: Experimental.
+
+:param path: optional string for file-system backed data sources.
+:param format: optional string for format of the data source. 
Default to 'parquet'.
+:param schema: optional :class:`StructType` for the input schema.
+:param options: all other string options
+
+"""
+if format is not None:
+self.format(format)
+if schema is not None:
+self.schema(schema)
+self.options(**options)
+if path is not None:
+if type(path) != str or len(path.strip()) == 0:
+raise ValueError("If the path is provided for stream, it 
needs to be a " +
+ "non-empty string. List of paths are not 
supported.")
+return self._df(self._jreader.load(path))
+else:
+return self._df(self._jreader.load())
+
+@since(2.0)
+def json(self, path, schema=None, primitivesAsString=None, 
prefersDecimal=None,
+ allowComments=None, allowUnquotedFieldNames=None, 
allowSingleQuotes=None,
+ allowNumericLeadingZero=None, 
allowBackslashEscapingAnyCharacter=None,
+ mode=None, columnNameOfCorruptRecord=None):
+"""
+Loads a JSON file stream (one object per line) and returns a 
:class`DataFrame`.
+
+If the ``schema`` parameter is not specified, this function goes
+through the input once to determine the input schema.
+
+.. note:: Experimental.
+
+:param path: string represents path to the JSON dataset,
+ or RDD of Strings storing JSON objects.
+:param schema: an optional :class:`StructType` for the input 
schema.
+:param primitivesAsString: infers all primitive values as a string 
type. If None is set,
+   it uses the default value, ``false``.
+:param prefersDecimal: infers all floating-point valu

[GitHub] spark pull request #13653: [SPARK-15933][SQL][STREAMING] Refactored DF reade...

2016-06-14 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/13653#discussion_r67078670
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
 ---
@@ -40,6 +40,8 @@ class StreamingAggregationSuite extends StreamTest with 
BeforeAndAfterAll {
 
   import testImplicits._
 
+
--- End diff --

nit: redundant spaces.





[GitHub] spark pull request #13653: [SPARK-15933][SQL][STREAMING] Refactored DF reade...

2016-06-14 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/13653#discussion_r67077331
  
--- Diff: python/pyspark/sql/readwriter.py ---
@@ -905,6 +764,503 @@ def jdbc(self, url, table, mode=None, 
properties=None):
 self._jwrite.mode(mode).jdbc(url, table, jprop)
 
 
+class DataStreamReader(object):
--- End diff --

nit: add new classes to `__all__ = ["DataFrameReader", "DataFrameWriter"]`





[GitHub] spark pull request #13653: [SPARK-15933][SQL][STREAMING] Refactored DF reade...

2016-06-14 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/13653#discussion_r67058676
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala 
---
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, 
ForeachWriter}
+import org.apache.spark.sql.execution.datasources.DataSource
+import org.apache.spark.sql.execution.streaming.{ForeachSink, MemoryPlan, 
MemorySink}
+
+/**
+ * :: Experimental ::
+ * Interface used to write a streaming [[Dataset]] to external storage 
systems (e.g. file systems,
+ * key-value stores, etc). Use [[Dataset.write]] to access this.
+ *
+ * @since 2.0.0
+ */
+@Experimental
+final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
+
+  private val df = ds.toDF()
+
+  /**
+   * :: Experimental ::
+   * Specifies how data of a streaming DataFrame/Dataset is written to a 
streaming sink.
+   *   - `OutputMode.Append()`: only the new rows in the streaming 
DataFrame/Dataset will be
+   *written to the sink
+   *   - `OutputMode.Complete()`: all the rows in the streaming 
DataFrame/Dataset will be written
+   *  to the sink every time these is some 
updates
+   *
+   * @since 2.0.0
+   */
+  @Experimental
+  def outputMode(outputMode: OutputMode): DataStreamWriter[T] = {
+this.outputMode = outputMode
+this
+  }
+
+
+  /**
+   * :: Experimental ::
+   * Specifies how data of a streaming DataFrame/Dataset is written to a 
streaming sink.
+   *   - `append`:   only the new rows in the streaming DataFrame/Dataset 
will be written to
+   * the sink
+   *   - `complete`: all the rows in the streaming DataFrame/Dataset will 
be written to the sink
+   * every time these is some updates
+   *
+   * @since 2.0.0
+   */
+  @Experimental
+  def outputMode(outputMode: String): DataStreamWriter[T] = {
+this.outputMode = outputMode.toLowerCase match {
+  case "append" =>
+OutputMode.Append
+  case "complete" =>
+OutputMode.Complete
+  case _ =>
+throw new IllegalArgumentException(s"Unknown output mode 
$outputMode. " +
+  "Accepted output modes are 'append' and 'complete'")
+}
+this
+  }
+
+  /**
+   * :: Experimental ::
+   * Set the trigger for the stream query. The default value is 
`ProcessingTime(0)` and it will run
+   * the query as fast as possible.
+   *
+   * Scala Example:
+   * {{{
+   *   df.write.trigger(ProcessingTime("10 seconds"))
+   *
+   *   import scala.concurrent.duration._
+   *   df.write.trigger(ProcessingTime(10.seconds))
+   * }}}
+   *
+   * Java Example:
+   * {{{
+   *   df.write.trigger(ProcessingTime.create("10 seconds"))
+   *
+   *   import java.util.concurrent.TimeUnit
+   *   df.write.trigger(ProcessingTime.create(10, TimeUnit.SECONDS))
+   * }}}
+   *
+   * @since 2.0.0
+   */
+  @Experimental
+  def trigger(trigger: Trigger): DataStreamWriter[T] = {
+this.trigger = trigger
+this
+  }
+
+
+  /**
+   * :: Experimental ::
+   * Specifies the name of the [[ContinuousQuery]] that can be started 
with `startStream()`.
+   * This name must be unique among all the currently active queries in 
the associated SQLContext.
+   *
+   * @since 2.0.0
+   */
+  @Experimental
+  def queryName(queryName: String): DataStreamWriter[T] = {
+this.extraOptions += ("queryName" -> queryName)
+this
+  }
+
+  /**
+   * :: Experimental ::
+   * Specifies the underlying output dat
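[Editor's note: pulling the doc comments quoted above together, a minimal spark-shell style sketch of how the writer calls are meant to chain. The accessors (`spark.readStream`, `Dataset.write`, `startStream()`) and the trigger example come from the quoted code; the sink options (`path`, `checkpointLocation`), the ProcessingTime import location, and the `awaitTermination()` call are assumptions about the Spark 2.0-era API, not something this diff shows.]

```scala
// Spark-shell-style sketch; `spark` is the predefined SparkSession.
import scala.concurrent.duration._
import org.apache.spark.sql.streaming.ProcessingTime   // package assumed from this PR
import org.apache.spark.sql.types.{LongType, StringType, StructType}

// Placeholder streaming source with an explicit schema.
val schema = new StructType().add("word", StringType).add("count", LongType)
val counts = spark.readStream.format("json").schema(schema).load("/tmp/in")

val query = counts.write                  // accessor named in the quoted scaladoc
  .outputMode("append")                   // only rows added since the last trigger
  .trigger(ProcessingTime(10.seconds))    // micro-batch every 10 seconds, as in the quoted example
  .queryName("wordCounts")                // must be unique among active queries
  .format("parquet")
  .option("path", "/tmp/out")                 // assumed file-sink option
  .option("checkpointLocation", "/tmp/chk")   // assumed checkpoint option
  .startStream()                          // returns a ContinuousQuery handle

query.awaitTermination()                  // assumed ContinuousQuery method
```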

[GitHub] spark pull request #13653: [SPARK-15933][SQL][STREAMING] Refactored DF reade...

2016-06-14 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/13653#discussion_r67042657
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala 
---
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, 
ForeachWriter}
+import org.apache.spark.sql.execution.datasources.DataSource
+import org.apache.spark.sql.execution.streaming.{ForeachSink, MemoryPlan, 
MemorySink}
+
+/**
+ * :: Experimental ::
+ * Interface used to write a streaming [[Dataset]] to external storage 
systems (e.g. file systems,
+ * key-value stores, etc). Use [[Dataset.write]] to access this.
+ *
+ * @since 2.0.0
+ */
+@Experimental
+final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
+
+  private val df = ds.toDF()
+
+  /**
+   * :: Experimental ::
+   * Specifies how data of a streaming DataFrame/Dataset is written to a 
streaming sink.
+   *   - `OutputMode.Append()`: only the new rows in the streaming 
DataFrame/Dataset will be
+   *written to the sink
+   *   - `OutputMode.Complete()`: all the rows in the streaming 
DataFrame/Dataset will be written
+   *  to the sink every time these is some 
updates
+   *
+   * @since 2.0.0
+   */
+  @Experimental
+  def outputMode(outputMode: OutputMode): DataStreamWriter[T] = {
+this.outputMode = outputMode
+this
+  }
+
+
+  /**
+   * :: Experimental ::
+   * Specifies how data of a streaming DataFrame/Dataset is written to a 
streaming sink.
+   *   - `append`:   only the new rows in the streaming DataFrame/Dataset 
will be written to
+   * the sink
+   *   - `complete`: all the rows in the streaming DataFrame/Dataset will 
be written to the sink
+   * every time these is some updates
+   *
+   * @since 2.0.0
+   */
+  @Experimental
+  def outputMode(outputMode: String): DataStreamWriter[T] = {
+this.outputMode = outputMode.toLowerCase match {
+  case "append" =>
+OutputMode.Append
+  case "complete" =>
+OutputMode.Complete
+  case _ =>
+throw new IllegalArgumentException(s"Unknown output mode 
$outputMode. " +
+  "Accepted output modes are 'append' and 'complete'")
+}
+this
+  }
+
+  /**
+   * :: Experimental ::
+   * Set the trigger for the stream query. The default value is 
`ProcessingTime(0)` and it will run
+   * the query as fast as possible.
+   *
+   * Scala Example:
+   * {{{
+   *   df.write.trigger(ProcessingTime("10 seconds"))
+   *
+   *   import scala.concurrent.duration._
+   *   df.write.trigger(ProcessingTime(10.seconds))
+   * }}}
+   *
+   * Java Example:
+   * {{{
+   *   df.write.trigger(ProcessingTime.create("10 seconds"))
+   *
+   *   import java.util.concurrent.TimeUnit
+   *   df.write.trigger(ProcessingTime.create(10, TimeUnit.SECONDS))
+   * }}}
+   *
+   * @since 2.0.0
+   */
+  @Experimental
+  def trigger(trigger: Trigger): DataStreamWriter[T] = {
+this.trigger = trigger
+this
+  }
+
+
+  /**
+   * :: Experimental ::
+   * Specifies the name of the [[ContinuousQuery]] that can be started 
with `startStream()`.
+   * This name must be unique among all the currently active queries in 
the associated SQLContext.
+   *
+   * @since 2.0.0
+   */
+  @Experimental
+  def queryName(queryName: String): DataStreamWriter[T] = {
+this.extraOptions += ("queryName" -> queryName)
+this
+  }
+
+  /**
+   * :: Experimental ::
+   * Specifies the underlying output dat

[GitHub] spark pull request #13653: [SPARK-15933][SQL][STREAMING] Refactored DF reade...

2016-06-14 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/13653#discussion_r67029686
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala 
---
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, 
ForeachWriter}
+import org.apache.spark.sql.execution.datasources.DataSource
+import org.apache.spark.sql.execution.streaming.{ForeachSink, MemoryPlan, 
MemorySink}
+
+/**
+ * :: Experimental ::
+ * Interface used to write a streaming [[Dataset]] to external storage 
systems (e.g. file systems,
+ * key-value stores, etc). Use [[Dataset.write]] to access this.
+ *
+ * @since 2.0.0
+ */
+@Experimental
+final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
+
+  private val df = ds.toDF()
+
+  /**
+   * :: Experimental ::
+   * Specifies how data of a streaming DataFrame/Dataset is written to a 
streaming sink.
+   *   - `OutputMode.Append()`: only the new rows in the streaming 
DataFrame/Dataset will be
+   *written to the sink
+   *   - `OutputMode.Complete()`: all the rows in the streaming 
DataFrame/Dataset will be written
+   *  to the sink every time these is some 
updates
+   *
+   * @since 2.0.0
+   */
+  @Experimental
+  def outputMode(outputMode: OutputMode): DataStreamWriter[T] = {
+this.outputMode = outputMode
+this
+  }
+
+
+  /**
+   * :: Experimental ::
+   * Specifies how data of a streaming DataFrame/Dataset is written to a 
streaming sink.
+   *   - `append`:   only the new rows in the streaming DataFrame/Dataset 
will be written to
+   * the sink
+   *   - `complete`: all the rows in the streaming DataFrame/Dataset will 
be written to the sink
+   * every time these is some updates
+   *
+   * @since 2.0.0
+   */
+  @Experimental
+  def outputMode(outputMode: String): DataStreamWriter[T] = {
+this.outputMode = outputMode.toLowerCase match {
+  case "append" =>
+OutputMode.Append
+  case "complete" =>
+OutputMode.Complete
+  case _ =>
+throw new IllegalArgumentException(s"Unknown output mode 
$outputMode. " +
+  "Accepted output modes are 'append' and 'complete'")
+}
+this
+  }
+
+  /**
+   * :: Experimental ::
+   * Set the trigger for the stream query. The default value is 
`ProcessingTime(0)` and it will run
+   * the query as fast as possible.
+   *
+   * Scala Example:
+   * {{{
+   *   df.write.trigger(ProcessingTime("10 seconds"))
+   *
+   *   import scala.concurrent.duration._
+   *   df.write.trigger(ProcessingTime(10.seconds))
+   * }}}
+   *
+   * Java Example:
+   * {{{
+   *   df.write.trigger(ProcessingTime.create("10 seconds"))
+   *
+   *   import java.util.concurrent.TimeUnit
+   *   df.write.trigger(ProcessingTime.create(10, TimeUnit.SECONDS))
+   * }}}
+   *
+   * @since 2.0.0
+   */
+  @Experimental
+  def trigger(trigger: Trigger): DataStreamWriter[T] = {
+this.trigger = trigger
+this
+  }
+
+
+  /**
+   * :: Experimental ::
+   * Specifies the name of the [[ContinuousQuery]] that can be started 
with `startStream()`.
+   * This name must be unique among all the currently active queries in 
the associated SQLContext.
+   *
+   * @since 2.0.0
+   */
+  @Experimental
+  def queryName(queryName: String): DataStreamWriter[T] = {
+this.extraOptions += ("queryName" -> queryName)
+this
+  }
+
+  /**
+   * :: Experimental ::
+   * Specifies the underlying output

[GitHub] spark pull request #13653: [SPARK-15933][SQL][STREAMING] Refactored DF reade...

2016-06-14 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/13653#discussion_r67029246
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala 
---
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, 
ForeachWriter}
+import org.apache.spark.sql.execution.datasources.DataSource
+import org.apache.spark.sql.execution.streaming.{ForeachSink, MemoryPlan, 
MemorySink}
+
+/**
+ * :: Experimental ::
+ * Interface used to write a streaming [[Dataset]] to external storage 
systems (e.g. file systems,
+ * key-value stores, etc). Use [[Dataset.write]] to access this.
+ *
+ * @since 2.0.0
+ */
+@Experimental
+final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
+
+  private val df = ds.toDF()
+
+  /**
+   * :: Experimental ::
+   * Specifies how data of a streaming DataFrame/Dataset is written to a 
streaming sink.
+   *   - `OutputMode.Append()`: only the new rows in the streaming 
DataFrame/Dataset will be
+   *written to the sink
+   *   - `OutputMode.Complete()`: all the rows in the streaming 
DataFrame/Dataset will be written
+   *  to the sink every time these is some 
updates
+   *
+   * @since 2.0.0
+   */
+  @Experimental
+  def outputMode(outputMode: OutputMode): DataStreamWriter[T] = {
+this.outputMode = outputMode
+this
+  }
+
+
+  /**
+   * :: Experimental ::
+   * Specifies how data of a streaming DataFrame/Dataset is written to a 
streaming sink.
+   *   - `append`:   only the new rows in the streaming DataFrame/Dataset 
will be written to
+   * the sink
+   *   - `complete`: all the rows in the streaming DataFrame/Dataset will 
be written to the sink
+   * every time these is some updates
+   *
+   * @since 2.0.0
+   */
+  @Experimental
+  def outputMode(outputMode: String): DataStreamWriter[T] = {
+this.outputMode = outputMode.toLowerCase match {
+  case "append" =>
+OutputMode.Append
+  case "complete" =>
+OutputMode.Complete
+  case _ =>
+throw new IllegalArgumentException(s"Unknown output mode 
$outputMode. " +
+  "Accepted output modes are 'append' and 'complete'")
+}
+this
+  }
+
+  /**
+   * :: Experimental ::
+   * Set the trigger for the stream query. The default value is 
`ProcessingTime(0)` and it will run
+   * the query as fast as possible.
+   *
+   * Scala Example:
+   * {{{
+   *   df.write.trigger(ProcessingTime("10 seconds"))
+   *
+   *   import scala.concurrent.duration._
+   *   df.write.trigger(ProcessingTime(10.seconds))
+   * }}}
+   *
+   * Java Example:
+   * {{{
+   *   df.write.trigger(ProcessingTime.create("10 seconds"))
+   *
+   *   import java.util.concurrent.TimeUnit
+   *   df.write.trigger(ProcessingTime.create(10, TimeUnit.SECONDS))
+   * }}}
+   *
+   * @since 2.0.0
+   */
+  @Experimental
+  def trigger(trigger: Trigger): DataStreamWriter[T] = {
+this.trigger = trigger
+this
+  }
+
+
+  /**
+   * :: Experimental ::
+   * Specifies the name of the [[ContinuousQuery]] that can be started 
with `startStream()`.
+   * This name must be unique among all the currently active queries in 
the associated SQLContext.
+   *
+   * @since 2.0.0
+   */
+  @Experimental
+  def queryName(queryName: String): DataStreamWriter[T] = {
+this.extraOptions += ("queryName" -> queryName)
+this
+  }
+
+  /**
+   * :: Experimental ::
+   * Specifies the underlying output dat

[GitHub] spark pull request #13653: [SPARK-15933][SQL][STREAMING] Refactored DF reade...

2016-06-14 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/13653#discussion_r67023372
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala 
---
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, 
ForeachWriter}
+import org.apache.spark.sql.execution.datasources.DataSource
+import org.apache.spark.sql.execution.streaming.{ForeachSink, MemoryPlan, 
MemorySink}
+
+/**
+ * :: Experimental ::
+ * Interface used to write a streaming [[Dataset]] to external storage 
systems (e.g. file systems,
+ * key-value stores, etc). Use [[Dataset.write]] to access this.
+ *
+ * @since 2.0.0
+ */
+@Experimental
+final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
+
+  private val df = ds.toDF()
+
+  /**
+   * :: Experimental ::
+   * Specifies how data of a streaming DataFrame/Dataset is written to a 
streaming sink.
+   *   - `OutputMode.Append()`: only the new rows in the streaming 
DataFrame/Dataset will be
+   *written to the sink
+   *   - `OutputMode.Complete()`: all the rows in the streaming 
DataFrame/Dataset will be written
+   *  to the sink every time these is some 
updates
+   *
+   * @since 2.0.0
+   */
+  @Experimental
+  def outputMode(outputMode: OutputMode): DataStreamWriter[T] = {
+this.outputMode = outputMode
+this
+  }
+
+
+  /**
+   * :: Experimental ::
+   * Specifies how data of a streaming DataFrame/Dataset is written to a 
streaming sink.
+   *   - `append`:   only the new rows in the streaming DataFrame/Dataset 
will be written to
+   * the sink
+   *   - `complete`: all the rows in the streaming DataFrame/Dataset will 
be written to the sink
+   * every time these is some updates
+   *
+   * @since 2.0.0
+   */
+  @Experimental
+  def outputMode(outputMode: String): DataStreamWriter[T] = {
+this.outputMode = outputMode.toLowerCase match {
+  case "append" =>
+OutputMode.Append
+  case "complete" =>
+OutputMode.Complete
+  case _ =>
+throw new IllegalArgumentException(s"Unknown output mode 
$outputMode. " +
+  "Accepted output modes are 'append' and 'complete'")
+}
+this
+  }
+
+  /**
+   * :: Experimental ::
+   * Set the trigger for the stream query. The default value is 
`ProcessingTime(0)` and it will run
+   * the query as fast as possible.
+   *
+   * Scala Example:
+   * {{{
+   *   df.write.trigger(ProcessingTime("10 seconds"))
+   *
+   *   import scala.concurrent.duration._
+   *   df.write.trigger(ProcessingTime(10.seconds))
+   * }}}
+   *
+   * Java Example:
+   * {{{
+   *   df.write.trigger(ProcessingTime.create("10 seconds"))
+   *
+   *   import java.util.concurrent.TimeUnit
+   *   df.write.trigger(ProcessingTime.create(10, TimeUnit.SECONDS))
+   * }}}
+   *
+   * @since 2.0.0
+   */
+  @Experimental
+  def trigger(trigger: Trigger): DataStreamWriter[T] = {
+this.trigger = trigger
+this
+  }
+
+
+  /**
+   * :: Experimental ::
+   * Specifies the name of the [[ContinuousQuery]] that can be started 
with `startStream()`.
+   * This name must be unique among all the currently active queries in 
the associated SQLContext.
+   *
+   * @since 2.0.0
+   */
+  @Experimental
+  def queryName(queryName: String): DataStreamWriter[T] = {
+this.extraOptions += ("queryName" -> queryName)
+this
+  }
+
+  /**
+   * :: Experimental ::
+   * Specifies the underlying output

[GitHub] spark pull request #13653: [SPARK-15933][SQL][STREAMING] Refactored DF reade...

2016-06-14 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/13653#discussion_r67023216
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala 
---
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, 
ForeachWriter}
+import org.apache.spark.sql.execution.datasources.DataSource
+import org.apache.spark.sql.execution.streaming.{ForeachSink, MemoryPlan, 
MemorySink}
+
+/**
+ * :: Experimental ::
+ * Interface used to write a streaming [[Dataset]] to external storage 
systems (e.g. file systems,
+ * key-value stores, etc). Use [[Dataset.write]] to access this.
+ *
+ * @since 2.0.0
+ */
+@Experimental
+final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
+
+  private val df = ds.toDF()
+
+  /**
+   * :: Experimental ::
+   * Specifies how data of a streaming DataFrame/Dataset is written to a 
streaming sink.
+   *   - `OutputMode.Append()`: only the new rows in the streaming 
DataFrame/Dataset will be
+   *written to the sink
+   *   - `OutputMode.Complete()`: all the rows in the streaming 
DataFrame/Dataset will be written
+   *  to the sink every time these is some 
updates
+   *
+   * @since 2.0.0
+   */
+  @Experimental
+  def outputMode(outputMode: OutputMode): DataStreamWriter[T] = {
+this.outputMode = outputMode
+this
+  }
+
+
+  /**
+   * :: Experimental ::
+   * Specifies how data of a streaming DataFrame/Dataset is written to a 
streaming sink.
+   *   - `append`:   only the new rows in the streaming DataFrame/Dataset 
will be written to
+   * the sink
+   *   - `complete`: all the rows in the streaming DataFrame/Dataset will 
be written to the sink
+   * every time these is some updates
+   *
+   * @since 2.0.0
+   */
+  @Experimental
+  def outputMode(outputMode: String): DataStreamWriter[T] = {
+this.outputMode = outputMode.toLowerCase match {
+  case "append" =>
+OutputMode.Append
+  case "complete" =>
+OutputMode.Complete
+  case _ =>
+throw new IllegalArgumentException(s"Unknown output mode 
$outputMode. " +
+  "Accepted output modes are 'append' and 'complete'")
+}
+this
+  }
+
+  /**
+   * :: Experimental ::
+   * Set the trigger for the stream query. The default value is 
`ProcessingTime(0)` and it will run
+   * the query as fast as possible.
+   *
+   * Scala Example:
+   * {{{
+   *   df.write.trigger(ProcessingTime("10 seconds"))
+   *
+   *   import scala.concurrent.duration._
+   *   df.write.trigger(ProcessingTime(10.seconds))
+   * }}}
+   *
+   * Java Example:
+   * {{{
+   *   df.write.trigger(ProcessingTime.create("10 seconds"))
+   *
+   *   import java.util.concurrent.TimeUnit
+   *   df.write.trigger(ProcessingTime.create(10, TimeUnit.SECONDS))
+   * }}}
+   *
+   * @since 2.0.0
+   */
+  @Experimental
+  def trigger(trigger: Trigger): DataStreamWriter[T] = {
+this.trigger = trigger
+this
+  }
+
+
+  /**
+   * :: Experimental ::
+   * Specifies the name of the [[ContinuousQuery]] that can be started 
with `startStream()`.
+   * This name must be unique among all the currently active queries in 
the associated SQLContext.
+   *
+   * @since 2.0.0
+   */
+  @Experimental
+  def queryName(queryName: String): DataStreamWriter[T] = {
+this.extraOptions += ("queryName" -> queryName)
+this
+  }
+
+  /**
+   * :: Experimental ::
+   * Specifies the underlying output

[GitHub] spark pull request #13653: [SPARK-15933][SQL][STREAMING] Refactored DF reade...

2016-06-14 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/13653#discussion_r67023075
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala 
---
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, 
ForeachWriter}
+import org.apache.spark.sql.execution.datasources.DataSource
+import org.apache.spark.sql.execution.streaming.{ForeachSink, MemoryPlan, 
MemorySink}
+
+/**
+ * :: Experimental ::
+ * Interface used to write a streaming [[Dataset]] to external storage 
systems (e.g. file systems,
+ * key-value stores, etc). Use [[Dataset.write]] to access this.
+ *
+ * @since 2.0.0
+ */
+@Experimental
+final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
+
+  private val df = ds.toDF()
+
+  /**
+   * :: Experimental ::
+   * Specifies how data of a streaming DataFrame/Dataset is written to a 
streaming sink.
+   *   - `OutputMode.Append()`: only the new rows in the streaming 
DataFrame/Dataset will be
+   *written to the sink
+   *   - `OutputMode.Complete()`: all the rows in the streaming 
DataFrame/Dataset will be written
+   *  to the sink every time these is some 
updates
+   *
+   * @since 2.0.0
+   */
+  @Experimental
+  def outputMode(outputMode: OutputMode): DataStreamWriter[T] = {
+this.outputMode = outputMode
+this
+  }
+
+
+  /**
+   * :: Experimental ::
+   * Specifies how data of a streaming DataFrame/Dataset is written to a 
streaming sink.
+   *   - `append`:   only the new rows in the streaming DataFrame/Dataset 
will be written to
+   * the sink
+   *   - `complete`: all the rows in the streaming DataFrame/Dataset will 
be written to the sink
+   * every time these is some updates
+   *
+   * @since 2.0.0
+   */
+  @Experimental
+  def outputMode(outputMode: String): DataStreamWriter[T] = {
--- End diff --

Again, I'd probably go for consistency unless we think we will need save modes in streaming (I'm not really sure how that would work).





[GitHub] spark pull request #13653: [SPARK-15933][SQL][STREAMING] Refactored DF reade...

2016-06-14 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/13653#discussion_r67022595
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala 
---
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
+import org.apache.spark.sql.execution.datasources.DataSource
+import org.apache.spark.sql.execution.streaming.StreamingRelation
+import org.apache.spark.sql.types.StructType
+
+@Experimental
+final class DataStreamReader private[sql](sparkSession: SparkSession) 
extends Logging {
+  /**
+   * :: Experimental ::
+   * Specifies the input data source format.
+   *
+   * @since 2.0.0
+   */
+  @Experimental
+  def format(source: String): DataStreamReader = {
+this.source = source
+this
+  }
+
+  /**
+   * :: Experimental ::
+   * Specifies the input schema. Some data sources (e.g. JSON) can infer 
the input schema
+   * automatically from data. By specifying the schema here, the 
underlying data source can
+   * skip the schema inference step, and thus speed up data loading.
+   *
+   * @since 2.0.0
+   */
+  @Experimental
+  def schema(schema: StructType): DataStreamReader = {
+this.userSpecifiedSchema = Option(schema)
+this
+  }
+
+  /**
+   * :: Experimental ::
+   * Adds an input option for the underlying data source.
+   *
+   * @since 2.0.0
+   */
+  @Experimental
+  def option(key: String, value: String): DataStreamReader = {
+this.extraOptions += (key -> value)
+this
+  }
+
+  /**
+   * :: Experimental ::
+   * Adds an input option for the underlying data source.
+   *
+   * @since 2.0.0
+   */
+  @Experimental
+  def option(key: String, value: Boolean): DataStreamReader = option(key, 
value.toString)
+
+  /**
+   * :: Experimental ::
+   * Adds an input option for the underlying data source.
+   *
+   * @since 2.0.0
+   */
+  @Experimental
+  def option(key: String, value: Long): DataStreamReader = option(key, 
value.toString)
+
+  /**
+   * :: Experimental ::
+   * Adds an input option for the underlying data source.
+   *
+   * @since 2.0.0
+   */
+  @Experimental
+  def option(key: String, value: Double): DataStreamReader = option(key, 
value.toString)
+
+  /**
+   * :: Experimental ::
+   * (Scala-specific) Adds input options for the underlying data source.
+   *
+   * @since 2.0.0
+   */
+  @Experimental
+  def options(options: scala.collection.Map[String, String]): 
DataStreamReader = {
+this.extraOptions ++= options
+this
+  }
+
+  /**
+   * :: Experimental ::
+   * Adds input options for the underlying data source.
+   *
+   * @since 2.0.0
+   */
+  @Experimental
+  def options(options: java.util.Map[String, String]): DataStreamReader = {
+this.options(options.asScala)
+this
+  }
+
+
+  /**
+   * :: Experimental ::
+   * Loads input data stream in as a [[DataFrame]], for data streams that 
don't require a path
+   * (e.g. external key-value stores).
+   *
+   * @since 2.0.0
+   */
+  @Experimental
+  def load(): DataFrame = {
--- End diff --

Seems best to keep it consistent.
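[Editor's note: the `load()` being discussed is the no-path overload, intended for sources configured purely through options. A fragment sketch, with a hypothetical source name and options; only the reader methods themselves (`format`, `option`, `options`, `load`) come from the quoted code.]

```scala
// Fragment sketch; the format name and option values below are hypothetical.
val stream = spark.readStream
  .format("com.example.kv.KeyValueSourceProvider")   // hypothetical source
  .option("endpoint", "kv.example.com:7000")         // hypothetical option
  .option("useSsl", true)                // Boolean overload from the quoted reader
  .options(Map("table" -> "events"))     // Scala Map variant from the quoted reader
  .load()                                // the no-path overload under discussion
```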



[GitHub] spark pull request #13653: [SPARK-15933][SQL][STREAMING] Refactored DF reade...

2016-06-13 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/13653#discussion_r66888549
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
 ---
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.test
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import org.apache.spark.util.Utils
+
+
+object LastOptions {
+
+  var parameters: Map[String, String] = null
+  var schema: Option[StructType] = null
+  var saveMode: SaveMode = null
+
+  def clear(): Unit = {
+parameters = null
+schema = null
+saveMode = null
+  }
+}
+
+
+/** Dummy provider. */
+class DefaultSource
+  extends RelationProvider
+  with SchemaRelationProvider
+  with CreatableRelationProvider {
+
+  case class FakeRelation(sqlContext: SQLContext) extends BaseRelation {
+override def schema: StructType = StructType(Seq(StructField("a", 
StringType)))
+  }
+
+  override def createRelation(
+  sqlContext: SQLContext,
+  parameters: Map[String, String],
+  schema: StructType
+): BaseRelation = {
+LastOptions.parameters = parameters
+LastOptions.schema = Some(schema)
+FakeRelation(sqlContext)
+  }
+
+  override def createRelation(
+  sqlContext: SQLContext,
+  parameters: Map[String, String]
+): BaseRelation = {
+LastOptions.parameters = parameters
+LastOptions.schema = None
+FakeRelation(sqlContext)
+  }
+
+  override def createRelation(
+  sqlContext: SQLContext,
+  mode: SaveMode,
+  parameters: Map[String, String],
+  data: DataFrame): BaseRelation = {
+LastOptions.parameters = parameters
+LastOptions.schema = None
+LastOptions.saveMode = mode
+FakeRelation(sqlContext)
+  }
+}
+
+
+class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext {
--- End diff --

These tests are a very rudimentary set of tests for the DataFrameReader code path. Note that in Spark 1.6 there were no tests at all. Also, I believe a lot of the DF functionality is tested through other test suites (e.g. partitioning columns are tested through PartitionedParquetSuite).





[GitHub] spark pull request #13653: [SPARK-15933][SQL][STREAMING] Refactored DF reade...

2016-06-13 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/13653#discussion_r66888413
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
 ---
@@ -371,76 +384,12 @@ class DataFrameReaderWriterSuite extends StreamTest 
with BeforeAndAfter {
 
   private def newTextInput = Utils.createTempDir(namePrefix = 
"text").getCanonicalPath
 
-  test("check trigger() can only be called on continuous queries") {
--- End diff --

Most of these tests check whether a method is called on the wrong type of DF, so all of them can be removed.
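[Editor's note: for readers without the diff handy, a sketch of the shape of the checks being removed, assuming ScalaTest syntax inside a suite like the quoted one (so `test`, `intercept`, `ProcessingTime`, and a `spark` session are in scope); the exception type and message are assumptions.]

```scala
// Sketch of one of the removed checks: calling a streaming-only method on a
// batch DataFrame was expected to fail at analysis time.
import org.apache.spark.sql.AnalysisException

test("trigger() can only be called on continuous queries") {
  val batchDf = spark.range(10).toDF()            // a plain, non-streaming DataFrame
  val e = intercept[AnalysisException] {          // exception type is an assumption
    batchDf.write.trigger(ProcessingTime("10 seconds"))
  }
  assert(e.getMessage.toLowerCase.contains("trigger"))
}
```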





[GitHub] spark pull request #13653: [SPARK-15933][SQL][STREAMING] Refactored DF reade...

2016-06-13 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/13653#discussion_r66888223
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala 
---
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, 
ForeachWriter}
+import org.apache.spark.sql.execution.datasources.DataSource
+import org.apache.spark.sql.execution.streaming.{ForeachSink, MemoryPlan, 
MemorySink}
+
+/**
+ * :: Experimental ::
+ * Interface used to write a streaming [[Dataset]] to external storage 
systems (e.g. file systems,
+ * key-value stores, etc). Use [[Dataset.write]] to access this.
+ *
+ * @since 2.0.0
+ */
+@Experimental
+final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
+
+  private val df = ds.toDF()
+
+  /**
+   * :: Experimental ::
+   * Specifies how data of a streaming DataFrame/Dataset is written to a 
streaming sink.
+   *   - `OutputMode.Append()`: only the new rows in the streaming 
DataFrame/Dataset will be
+   *written to the sink
+   *   - `OutputMode.Complete()`: all the rows in the streaming 
DataFrame/Dataset will be written
+   *  to the sink every time these is some 
updates
+   *
+   * @since 2.0.0
+   */
+  @Experimental
+  def outputMode(outputMode: OutputMode): DataStreamWriter[T] = {
+this.outputMode = outputMode
+this
+  }
+
+
+  /**
+   * :: Experimental ::
+   * Specifies how data of a streaming DataFrame/Dataset is written to a 
streaming sink.
+   *   - `append`:   only the new rows in the streaming DataFrame/Dataset 
will be written to
+   * the sink
+   *   - `complete`: all the rows in the streaming DataFrame/Dataset will 
be written to the sink
+   * every time these is some updates
+   *
+   * @since 2.0.0
+   */
+  @Experimental
+  def outputMode(outputMode: String): DataStreamWriter[T] = {
--- End diff --

Should this be shortened to just `mode`?
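[Editor's note: the two overloads in question, shown side by side as a fragment sketch. `streamingDf` is a hypothetical streaming Dataset, and the `OutputMode` values come from the quoted match statement; the rename to `mode(...)` is exactly the open question, so it appears only as a comment.]

```scala
// Fragment sketch; OutputMode is assumed to live alongside DataStreamWriter.
streamingDf.write.outputMode("append")             // string form, parsed case-insensitively
streamingDf.write.outputMode(OutputMode.Append)    // typed form, same effect

streamingDf.write.outputMode("complete")
streamingDf.write.outputMode(OutputMode.Complete)

// The question raised here: should the string overload be named mode(...),
// mirroring DataFrameWriter.mode for batch writes? Left open at this point.
```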





[GitHub] spark pull request #13653: [SPARK-15933][SQL][STREAMING] Refactored DF reade...

2016-06-13 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/13653#discussion_r66888185
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala 
---
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, 
ForeachWriter}
+import org.apache.spark.sql.execution.datasources.DataSource
+import org.apache.spark.sql.execution.streaming.{ForeachSink, MemoryPlan, 
MemorySink}
+
+/**
+ * :: Experimental ::
+ * Interface used to write a streaming [[Dataset]] to external storage 
systems (e.g. file systems,
+ * key-value stores, etc). Use [[Dataset.write]] to access this.
+ *
+ * @since 2.0.0
+ */
+@Experimental
+final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
+
+  private val df = ds.toDF()
+
+  /**
+   * :: Experimental ::
+   * Specifies how data of a streaming DataFrame/Dataset is written to a 
streaming sink.
+   *   - `OutputMode.Append()`: only the new rows in the streaming 
DataFrame/Dataset will be
+   *written to the sink
+   *   - `OutputMode.Complete()`: all the rows in the streaming 
DataFrame/Dataset will be written
+   *  to the sink every time there are some updates
+   *
+   * @since 2.0.0
+   */
+  @Experimental
+  def outputMode(outputMode: OutputMode): DataStreamWriter[T] = {
+this.outputMode = outputMode
+this
+  }
+
+
+  /**
+   * :: Experimental ::
+   * Specifies how data of a streaming DataFrame/Dataset is written to a 
streaming sink.
+   *   - `append`:   only the new rows in the streaming DataFrame/Dataset 
will be written to
+   * the sink
+   *   - `complete`: all the rows in the streaming DataFrame/Dataset will 
be written to the sink
+   * every time there are some updates
+   *
+   * @since 2.0.0
+   */
+  @Experimental
+  def outputMode(outputMode: String): DataStreamWriter[T] = {
+this.outputMode = outputMode.toLowerCase match {
+  case "append" =>
+OutputMode.Append
+  case "complete" =>
+OutputMode.Complete
+  case _ =>
+throw new IllegalArgumentException(s"Unknown output mode 
$outputMode. " +
+  "Accepted output modes are 'append' and 'complete'")
+}
+this
+  }
+
+  /**
+   * :: Experimental ::
+   * Set the trigger for the stream query. The default value is 
`ProcessingTime(0)` and it will run
+   * the query as fast as possible.
+   *
+   * Scala Example:
+   * {{{
+   *   df.write.trigger(ProcessingTime("10 seconds"))
+   *
+   *   import scala.concurrent.duration._
+   *   df.write.trigger(ProcessingTime(10.seconds))
+   * }}}
+   *
+   * Java Example:
+   * {{{
+   *   df.write.trigger(ProcessingTime.create("10 seconds"))
+   *
+   *   import java.util.concurrent.TimeUnit
+   *   df.write.trigger(ProcessingTime.create(10, TimeUnit.SECONDS))
+   * }}}
+   *
+   * @since 2.0.0
+   */
+  @Experimental
+  def trigger(trigger: Trigger): DataStreamWriter[T] = {
+this.trigger = trigger
+this
+  }
+
+
+  /**
+   * :: Experimental ::
+   * Specifies the name of the [[ContinuousQuery]] that can be started 
with `startStream()`.
+   * This name must be unique among all the currently active queries in 
the associated SQLContext.
+   *
+   * @since 2.0.0
+   */
+  @Experimental
+  def queryName(queryName: String): DataStreamWriter[T] = {
+this.extraOptions += ("queryName" -> queryName)
+this
+  }
+
+  /**
+   * :: Experimental ::
+   * Specifies the underlying output dat

[GitHub] spark pull request #13653: [SPARK-15933][SQL][STREAMING] Refactored DF reade...

2016-06-13 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/13653#discussion_r66888051
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala 
---
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, 
ForeachWriter}
+import org.apache.spark.sql.execution.datasources.DataSource
+import org.apache.spark.sql.execution.streaming.{ForeachSink, MemoryPlan, 
MemorySink}
+
+/**
+ * :: Experimental ::
+ * Interface used to write a streaming [[Dataset]] to external storage 
systems (e.g. file systems,
+ * key-value stores, etc). Use [[Dataset.write]] to access this.
+ *
+ * @since 2.0.0
+ */
+@Experimental
+final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
+
+  private val df = ds.toDF()
+
+  /**
+   * :: Experimental ::
+   * Specifies how data of a streaming DataFrame/Dataset is written to a 
streaming sink.
+   *   - `OutputMode.Append()`: only the new rows in the streaming 
DataFrame/Dataset will be
+   *written to the sink
+   *   - `OutputMode.Complete()`: all the rows in the streaming 
DataFrame/Dataset will be written
+   *  to the sink every time there are some updates
+   *
+   * @since 2.0.0
+   */
+  @Experimental
+  def outputMode(outputMode: OutputMode): DataStreamWriter[T] = {
+this.outputMode = outputMode
+this
+  }
+
+
+  /**
+   * :: Experimental ::
+   * Specifies how data of a streaming DataFrame/Dataset is written to a 
streaming sink.
+   *   - `append`:   only the new rows in the streaming DataFrame/Dataset 
will be written to
+   * the sink
+   *   - `complete`: all the rows in the streaming DataFrame/Dataset will 
be written to the sink
+   * every time there are some updates
+   *
+   * @since 2.0.0
+   */
+  @Experimental
+  def outputMode(outputMode: String): DataStreamWriter[T] = {
+this.outputMode = outputMode.toLowerCase match {
+  case "append" =>
+OutputMode.Append
+  case "complete" =>
+OutputMode.Complete
+  case _ =>
+throw new IllegalArgumentException(s"Unknown output mode 
$outputMode. " +
+  "Accepted output modes are 'append' and 'complete'")
+}
+this
+  }
+
+  /**
+   * :: Experimental ::
+   * Set the trigger for the stream query. The default value is 
`ProcessingTime(0)` and it will run
+   * the query as fast as possible.
+   *
+   * Scala Example:
+   * {{{
+   *   df.write.trigger(ProcessingTime("10 seconds"))
+   *
+   *   import scala.concurrent.duration._
+   *   df.write.trigger(ProcessingTime(10.seconds))
+   * }}}
+   *
+   * Java Example:
+   * {{{
+   *   df.write.trigger(ProcessingTime.create("10 seconds"))
+   *
+   *   import java.util.concurrent.TimeUnit
+   *   df.write.trigger(ProcessingTime.create(10, TimeUnit.SECONDS))
+   * }}}
+   *
+   * @since 2.0.0
+   */
+  @Experimental
+  def trigger(trigger: Trigger): DataStreamWriter[T] = {
+this.trigger = trigger
+this
+  }
+
+
+  /**
+   * :: Experimental ::
+   * Specifies the name of the [[ContinuousQuery]] that can be started 
with `startStream()`.
+   * This name must be unique among all the currently active queries in 
the associated SQLContext.
+   *
+   * @since 2.0.0
+   */
+  @Experimental
+  def queryName(queryName: String): DataStreamWriter[T] = {
+this.extraOptions += ("queryName" -> queryName)
+this
+  }
+
+  /**
+   * :: Experimental ::
+   * Specifies the underlying output dat

[GitHub] spark pull request #13653: [SPARK-15933][SQL][STREAMING] Refactored DF reade...

2016-06-13 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/13653#discussion_r66888017
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala 
---
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, 
ForeachWriter}
+import org.apache.spark.sql.execution.datasources.DataSource
+import org.apache.spark.sql.execution.streaming.{ForeachSink, MemoryPlan, 
MemorySink}
+
+/**
+ * :: Experimental ::
+ * Interface used to write a streaming [[Dataset]] to external storage 
systems (e.g. file systems,
+ * key-value stores, etc). Use [[Dataset.write]] to access this.
+ *
+ * @since 2.0.0
+ */
+@Experimental
+final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
+
+  private val df = ds.toDF()
+
+  /**
+   * :: Experimental ::
+   * Specifies how data of a streaming DataFrame/Dataset is written to a 
streaming sink.
+   *   - `OutputMode.Append()`: only the new rows in the streaming 
DataFrame/Dataset will be
+   *written to the sink
+   *   - `OutputMode.Complete()`: all the rows in the streaming 
DataFrame/Dataset will be written
+   *  to the sink every time there are some updates
+   *
+   * @since 2.0.0
+   */
+  @Experimental
+  def outputMode(outputMode: OutputMode): DataStreamWriter[T] = {
+this.outputMode = outputMode
+this
+  }
+
+
+  /**
+   * :: Experimental ::
+   * Specifies how data of a streaming DataFrame/Dataset is written to a 
streaming sink.
+   *   - `append`:   only the new rows in the streaming DataFrame/Dataset 
will be written to
+   * the sink
+   *   - `complete`: all the rows in the streaming DataFrame/Dataset will 
be written to the sink
+   * every time there are some updates
+   *
+   * @since 2.0.0
+   */
+  @Experimental
+  def outputMode(outputMode: String): DataStreamWriter[T] = {
+this.outputMode = outputMode.toLowerCase match {
+  case "append" =>
+OutputMode.Append
+  case "complete" =>
+OutputMode.Complete
+  case _ =>
+throw new IllegalArgumentException(s"Unknown output mode 
$outputMode. " +
+  "Accepted output modes are 'append' and 'complete'")
+}
+this
+  }
+
+  /**
+   * :: Experimental ::
+   * Set the trigger for the stream query. The default value is 
`ProcessingTime(0)` and it will run
+   * the query as fast as possible.
+   *
+   * Scala Example:
+   * {{{
+   *   df.write.trigger(ProcessingTime("10 seconds"))
--- End diff --

update these examples.
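
Presumably `df.write` becomes `df.writeStream` after this refactoring; a hedged Scala sketch of what the refreshed example could look like (the input path, console sink, and `start()` call are illustrative assumptions):

```scala
import scala.concurrent.duration._

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.ProcessingTime

val spark = SparkSession.builder().appName("trigger-example").getOrCreate()
val df = spark.readStream.format("text").load("/tmp/stream-in")

// Trigger specified as an interval string.
df.writeStream
  .trigger(ProcessingTime("10 seconds"))
  .format("console")
  .start()

// Trigger specified as a scala.concurrent.duration.Duration.
df.writeStream
  .trigger(ProcessingTime(10.seconds))
  .format("console")
  .start()
```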





[GitHub] spark pull request #13653: [SPARK-15933][SQL][STREAMING] Refactored DF reade...

2016-06-13 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/13653#discussion_r66887929
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala 
---
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
+import org.apache.spark.sql.execution.datasources.DataSource
+import org.apache.spark.sql.execution.streaming.StreamingRelation
+import org.apache.spark.sql.types.StructType
+
+@Experimental
+final class DataStreamReader private[sql](sparkSession: SparkSession) 
extends Logging {
+  /**
+   * :: Experimental ::
+   * Specifies the input data source format.
+   *
+   * @since 2.0.0
+   */
+  @Experimental
+  def format(source: String): DataStreamReader = {
+this.source = source
+this
+  }
+
+  /**
+   * :: Experimental ::
+   * Specifies the input schema. Some data sources (e.g. JSON) can infer 
the input schema
+   * automatically from data. By specifying the schema here, the 
underlying data source can
+   * skip the schema inference step, and thus speed up data loading.
+   *
+   * @since 2.0.0
+   */
+  @Experimental
+  def schema(schema: StructType): DataStreamReader = {
+this.userSpecifiedSchema = Option(schema)
+this
+  }
+
+  /**
+   * :: Experimental ::
+   * Adds an input option for the underlying data source.
+   *
+   * @since 2.0.0
+   */
+  @Experimental
+  def option(key: String, value: String): DataStreamReader = {
+this.extraOptions += (key -> value)
+this
+  }
+
+  /**
+   * :: Experimental ::
+   * Adds an input option for the underlying data source.
+   *
+   * @since 2.0.0
+   */
+  @Experimental
+  def option(key: String, value: Boolean): DataStreamReader = option(key, 
value.toString)
+
+  /**
+   * :: Experimental ::
+   * Adds an input option for the underlying data source.
+   *
+   * @since 2.0.0
+   */
+  @Experimental
+  def option(key: String, value: Long): DataStreamReader = option(key, 
value.toString)
+
+  /**
+   * :: Experimental ::
+   * Adds an input option for the underlying data source.
+   *
+   * @since 2.0.0
+   */
+  @Experimental
+  def option(key: String, value: Double): DataStreamReader = option(key, 
value.toString)
+
+  /**
+   * :: Experimental ::
+   * (Scala-specific) Adds input options for the underlying data source.
+   *
+   * @since 2.0.0
+   */
+  @Experimental
+  def options(options: scala.collection.Map[String, String]): 
DataStreamReader = {
+this.extraOptions ++= options
+this
+  }
+
+  /**
+   * :: Experimental ::
+   * Adds input options for the underlying data source.
+   *
+   * @since 2.0.0
+   */
+  @Experimental
+  def options(options: java.util.Map[String, String]): DataStreamReader = {
+this.options(options.asScala)
+this
+  }
+
+
+  /**
+   * :: Experimental ::
+   * Loads input data stream in as a [[DataFrame]], for data streams that 
don't require a path
+   * (e.g. external key-value stores).
+   *
+   * @since 2.0.0
+   */
+  @Experimental
+  def load(): DataFrame = {
--- End diff --

@marmbrus @rxin Should this be `load()`? In the general case, 
```
sdf.readStream
  .format("myFormat")
  .load("stringIdentifier")
```
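
For reference, a hedged sketch of the two call patterns in question, with the built-in socket and text sources standing in for the placeholder `myFormat` (host, port, and path are illustrative):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("load-variants").getOrCreate()

// Path-less load(): the source is configured entirely through options,
// much as an external key-value store would be.
val fromOptions = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", "9999")
  .load()

// load(path): a single string identifier passed positionally,
// as in the `load("stringIdentifier")` example above.
val fromPath = spark.readStream
  .format("text")
  .load("/tmp/stream-in")
```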



[GitHub] spark pull request #13653: [SPARK-15933][SQL][STREAMING] Refactored DF reade...

2016-06-13 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/13653#discussion_r66887779
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala 
---
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
+import org.apache.spark.sql.execution.datasources.DataSource
+import org.apache.spark.sql.execution.streaming.StreamingRelation
+import org.apache.spark.sql.types.StructType
+
+@Experimental
--- End diff --

add docs.
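
For context, one possible shape for the requested class-level Scaladoc, modeled on the DataStreamWriter header quoted earlier in this thread; the wording is a sketch, not the text that was actually committed:

```scala
/**
 * :: Experimental ::
 * Interface used to load a streaming [[Dataset]] from external storage systems
 * (e.g. file systems, key-value stores, etc). Use [[SparkSession.readStream]] to
 * access this.
 *
 * @since 2.0.0
 */
@Experimental
final class DataStreamReader private[sql](sparkSession: SparkSession) extends Logging {
  // ... methods as shown in the diff above ...
}
```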





[GitHub] spark pull request #13653: [SPARK-15933][SQL][STREAMING] Refactored DF reade...

2016-06-13 Thread tdas
GitHub user tdas opened a pull request:

https://github.com/apache/spark/pull/13653

[SPARK-15933][SQL][STREAMING] Refactored DF reader-writer to use readStream 
and writeStream for streaming DFs

## What changes were proposed in this pull request?
Currently, DataFrameReader/Writer have methods that are needed for both streaming and
non-streaming DFs. This is quite awkward because each of those methods throws a runtime
exception for one case or the other. So rather than having half the methods throw runtime
exceptions, it's just better to have a separate reader/writer API for streams.
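
A hedged Scala sketch of the resulting split, assuming the `readStream`/`writeStream` entry points this PR adds (formats, paths, and host/port are placeholders):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("reader-writer-split").getOrCreate()

// Batch path: DataFrameReader / DataFrameWriter, unchanged by this PR.
val people = spark.read.json("/tmp/people.json")
people.write.parquet("/tmp/people.parquet")

// Streaming path: the new DataStreamReader / DataStreamWriter.
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", "9999")
  .load()

val query = lines.writeStream
  .outputMode("append")
  .format("console")
  .start()

query.awaitTermination()
```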

## How was this patch tested?
Existing unit tests + two sets of unit tests for DataFrameReader/Writer and 
DataStreamReader/Writer.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tdas/spark SPARK-15933

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/13653.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #13653





