Hi Arun,

Regarding parquet and complete output mode:

A relevant piece of the code to think about:

        if (outputMode != OutputMode.Append) {
          throw new IllegalArgumentException(
            s"Data source $className does not support $outputMode output mode")
        }

https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala#L267-L270

It says that the parquet sink supports Append output mode only. The
message could have been more precise in this case (but since it's an
alpha API I wouldn't recommend changing it anyway).
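
If you do need Complete output mode for the aggregated counts, a sink
other than parquet is required. Below is a minimal sketch using the
memory sink (MemorySink in the list of sinks quoted further down),
assuming a file-based text source. The object name, inputDir and the
query name "counts" are my own illustrative choices, not something
from your session:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQuery

object CompleteModeSketch {
  // Starts an aggregated streaming query whose full result table is kept
  // in the in-memory sink. `inputDir` and the query name "counts" are
  // hypothetical values chosen for this sketch.
  def start(spark: SparkSession, inputDir: String): StreamingQuery = {
    import spark.implicits._

    // File source: one row per line of text, in a column named "value".
    val words = spark.readStream
      .text(inputDir)
      .as[String]
      .flatMap(_.split("\\s+"))
      .filter(_.nonEmpty)

    // A streaming aggregation, similar in spirit to streamingCountsDF.
    val streamingCountsDF = words.groupBy("value").count()

    streamingCountsDF.writeStream
      .format("memory")        // memory sink instead of parquet
      .queryName("counts")     // name of the in-memory table
      .outputMode("complete")  // whole result table on every trigger
      .start()
  }
}
```

Once the query is running, the current counts should be readable with
spark.table("counts").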

Regarding aggregations for parquet (and other Append-output sinks):

Here is a relevant piece of the code:

    outputMode match {
      case InternalOutputModes.Append if aggregates.nonEmpty =>
        throwError(
          s"$outputMode output mode not supported when there are streaming aggregations on " +
            s"streaming DataFrames/DataSets")(plan)

https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala#L56-L60

It says that Append output mode cannot be used when the streaming
pipeline contains aggregations.

Put together, the two checks mean that parquet can be used only in
Append output mode, and only for queries with no streaming aggregations.
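
A minimal sketch of a query that should pass both checks: no
aggregation, Append output mode, parquet sink. It assumes a file-based
text source. The object name and the three directory parameters are
hypothetical (in your session they would be "parq" and "chkpnt" for
the output path and the checkpoint location):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQuery

object ParquetAppendSketch {
  // Writes a non-aggregated stream to parquet in Append mode.
  // All three directory arguments are illustrative placeholders.
  def start(
      spark: SparkSession,
      inputDir: String,
      outputDir: String,
      checkpointDir: String): StreamingQuery = {
    import spark.implicits._

    // Row-wise transformations only: no groupBy, no count.
    val words = spark.readStream
      .text(inputDir)
      .as[String]
      .flatMap(_.split("\\s+"))
      .filter(_.nonEmpty)

    words.writeStream
      .format("parquet")
      .option("path", outputDir)                   // required by the file sink
      .option("checkpointLocation", checkpointDir)
      .outputMode("append")                        // the only mode parquet accepts
      .start()
  }
}
```

Adding a groupBy(...).count() before writeStream should bring back the
AnalysisException from your second attempt.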

Kudos for letting me know about it!

Pozdrawiam,
Jacek Laskowski
----
https://medium.com/@jaceklaskowski/
Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


On Sun, Jul 31, 2016 at 1:19 AM, Arun Patel <arunp.bigd...@gmail.com> wrote:
> Thanks for the response. However, I am not able to use any output mode.  In
> case of Parquet sink, there should not be any aggregations?
>
> scala> val query =
> streamingCountsDF.writeStream.format("parquet").option("path","parq").option("checkpointLocation","chkpnt").outputMode("complete").start()
> java.lang.IllegalArgumentException: Data source parquet does not support
> Complete output mode
>   at
> org.apache.spark.sql.execution.datasources.DataSource.createSink(DataSource.scala:267)
>   at
> org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:291)
>   ... 54 elided
>
> scala> val query =
> streamingCountsDF.writeStream.format("parquet").option("path","parq").option("checkpointLocation","chkpnt").outputMode("append").start()
> org.apache.spark.sql.AnalysisException: Append output mode not supported
> when there are streaming aggregations on streaming DataFrames/DataSets;
>   at
> org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:173)
>   at
> org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForStreaming(UnsupportedOperationChecker.scala:60)
>   at
> org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:236)
>   at
> org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:287)
>   ... 54 elided
>
>
> On Sat, Jul 30, 2016 at 5:59 PM, Tathagata Das <t...@databricks.com> wrote:
>>
>> Correction: the two options are:
>>
>> - writeStream.format("parquet").option("path", "...").start()
>> - writeStream.parquet("...").start()
>>
>> There is no start with a param.
>>
>>
>> On Jul 30, 2016 11:22 AM, "Jacek Laskowski" <ja...@japila.pl> wrote:
>>>
>>> Hi Arun,
>>>
>>> > As per documentation, parquet is the only available file sink.
>>>
>>> The following sinks are currently available in Spark:
>>>
>>> * ConsoleSink for console format.
>>> * FileStreamSink for parquet format.
>>> * ForeachSink used in foreach operator.
>>> * MemorySink for memory format.
>>>
>>> You can create your own streaming format implementing StreamSinkProvider.
>>>
>>> > I am getting an error like 'path' is not specified.
>>> > Any idea how to write this to parquet file?
>>>
>>> There are two ways to specify "path":
>>>
>>> 1. Using option method
>>> 2. start(path: String): StreamingQuery
>>>
>>> Pozdrawiam,
>>> Jacek Laskowski
>>> ----
>>> https://medium.com/@jaceklaskowski/
>>> Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
>>> Follow me at https://twitter.com/jaceklaskowski
>>>
>>>
>>> On Sat, Jul 30, 2016 at 2:50 PM, Arun Patel <arunp.bigd...@gmail.com>
>>> wrote:
>>> > I am trying out Structured streaming parquet sink.  As per
>>> > documentation,
>>> > parquet is the only available file sink.
>>> >
>>> > I am getting an error like 'path' is not specified.
>>> >
>>> > scala> val query =
>>> > streamingCountsDF.writeStream.format("parquet").start()
>>> > java.lang.IllegalArgumentException: 'path' is not specified
>>> >   at
>>> >
>>> > org.apache.spark.sql.execution.datasources.DataSource$$anonfun$9.apply(DataSource.scala:264)
>>> >   at
>>> >
>>> > org.apache.spark.sql.execution.datasources.DataSource$$anonfun$9.apply(DataSource.scala:264)
>>> >   at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
>>> >   at
>>> >
>>> > org.apache.spark.sql.execution.datasources.CaseInsensitiveMap.getOrElse(ddl.scala:117)
>>> >   at
>>> >
>>> > org.apache.spark.sql.execution.datasources.DataSource.createSink(DataSource.scala:263)
>>> >   at
>>> >
>>> > org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:291)
>>> >   ... 60 elided
>>> >
>>> > But, I don't see path or parquet in DataStreamWriter.
>>> >
>>> > scala> val query = streamingCountsDF.writeStream.
>>> > foreach   format   option   options   outputMode   partitionBy
>>> > queryName
>>> > start   trigger
>>> >
>>> > Any idea how to write this to parquet file?
>>> >
>>> > - Arun
>>>
>>> ---------------------------------------------------------------------
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>
>

