Hi,

> If the data is very large then a collect may result in OOM.

That's true anywhere in Spark, including Spark Structured Streaming. Why
would you collect in addBatch? It runs on the driver and, like anything on
the driver, in a single JVM (which is usually not fault tolerant).

> Do you have any other suggestion/recommendation?

What's wrong with the current solution? I don't think you should change how
you do things currently. You should just avoid collect on large datasets
(which you have to do anywhere in Spark).
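
For illustration only, here is a minimal sketch of one way to bound the
batch size and avoid collect altogether. It is not your code and not
necessarily how your sink should look. It swaps the custom Sink for the
public ForeachWriter API, which processes rows on the executors, and it uses
the Kafka "maxOffsetsPerTrigger" option Tathagata mentioned. The broker
address, topic name, offset cap, and object name are all hypothetical, and
the spark-sql-kafka-0-10 package is assumed to be on the classpath.

```scala
import org.apache.spark.sql.{ForeachWriter, Row, SparkSession}
import org.apache.spark.sql.streaming.Trigger

object BoundedBatchSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("bounded-batch-sketch")
      .getOrCreate()

    // Hypothetical Kafka source: broker address and topic are placeholders.
    val input = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "events")
      // Caps how many offsets are read per trigger (arbitrary value).
      .option("maxOffsetsPerTrigger", "100000")
      .load()
      .selectExpr("CAST(value AS STRING) AS value")

    // Rows are handled one at a time on the executors, so nothing is
    // pulled back to the driver as collect would do.
    val writer = new ForeachWriter[Row] {
      // Returning true tells Spark to process this partition.
      override def open(partitionId: Long, version: Long): Boolean = true

      // Placeholder side effect: replace with the write to your real store.
      override def process(value: Row): Unit =
        println(value.getAs[String]("value"))

      // Nothing to release in this sketch.
      override def close(errorOrNull: Throwable): Unit = ()
    }

    val query = input.writeStream
      .foreach(writer)
      // Keeps the one-minute trigger interval from the original question.
      .trigger(Trigger.ProcessingTime("1 minute"))
      .start()

    query.awaitTermination()
  }
}
```

The trigger interval stays at one minute here; only the per-trigger volume
and the place where rows are consumed change. Your deduplication step would
go between the source and writeStream.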

Regards,
Jacek Laskowski
----
https://about.me/JacekLaskowski
Mastering Spark SQL https://bit.ly/mastering-spark-sql
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Kafka Streams https://bit.ly/mastering-kafka-streams
Follow me at https://twitter.com/jaceklaskowski

On Thu, Jan 4, 2018 at 10:49 PM, M Singh <mans2si...@yahoo.com.invalid>
wrote:

> Thanks Tathagata for your answer.
>
> The reason I was asking about controlling data size is that the javadoc
> indicates you can use foreach or collect on the dataframe. If the data is
> very large then a collect may result in OOM.
>
> From your answer it appears that the only way to control the size (in 2.2)
> would be to control the trigger interval. However, in my case, I have to
> dedup the elements in a one-minute interval, for which I am using the
> trigger interval, so I cannot reduce it. Do you have any other
> suggestion/recommendation?
>
> Also, do you have any timeline for the availability of DataSourceV2/Spark
> 2.3 ?
>
> Thanks again.
>
>
> On Wednesday, January 3, 2018 2:27 PM, Tathagata Das <
> tathagata.das1...@gmail.com> wrote:
>
>
> 1. It is all the result data in that trigger. Note that it takes a
> DataFrame which is a purely logical representation of data and has no
> association with partitions, etc. which are physical representations.
>
> 2. If you want to limit the amount of data that is processed in a trigger,
> then you should either control the trigger interval or use the rate limit
> options on sources that support it (e.g. for kafka, you can use the option
> "maxOffsetsPerTrigger", see the guide
> <https://spark.apache.org/docs/2.2.0/structured-streaming-kafka-integration.html>
> ).
>
> On a related note, these APIs are subject to change. In fact, in the
> upcoming 2.3 release, we are adding a DataSource V2 API for
> batch/microbatch-streaming/continuous-streaming sources and sinks.
>
> On Wed, Jan 3, 2018 at 11:23 PM, M Singh <mans2si...@yahoo.com.invalid>
> wrote:
>
> Hi:
>
> The documentation for Sink.addBatch is as follows:
>
>   /**
>    * Adds a batch of data to this sink. The data for a given `batchId` is deterministic and if
>    * this method is called more than once with the same batchId (which will happen in the case of
>    * failures), then `data` should only be added once.
>    *
>    * Note 1: You cannot apply any operators on `data` except consuming it (e.g., `collect/foreach`).
>    * Otherwise, you may get a wrong result.
>    *
>    * Note 2: The method is supposed to be executed synchronously, i.e. the method should only return
>    * after data is consumed by sink successfully.
>    */
>   def addBatch(batchId: Long, data: DataFrame): Unit
>
> A few questions about the data in each DataFrame passed as the argument to
> addBatch:
> 1. Is it all the data in a partition for each trigger or is it all the
> data in that trigger?
> 2. Is there a way to control the size in each addBatch invocation to make
> sure that we don't run into an OOM exception on the executor while calling
> collect?
>
> Thanks