Hi,

> If the data is very large then a collect may result in OOM.
That's a general concern anywhere in Spark, including Spark Structured
Streaming. Why would you collect in addBatch? It runs on the driver side,
and like anything on the driver it's a single JVM (and usually not fault
tolerant).

> Do you have any other suggestion/recommendation?

What's wrong with the current solution? I don't think you should change how
you do things currently. You should just avoid collect on large datasets
(which you have to do anywhere in Spark).

Pozdrawiam,
Jacek Laskowski
----
https://about.me/JacekLaskowski
Mastering Spark SQL https://bit.ly/mastering-spark-sql
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Kafka Streams https://bit.ly/mastering-kafka-streams
Follow me at https://twitter.com/jaceklaskowski

On Thu, Jan 4, 2018 at 10:49 PM, M Singh <mans2si...@yahoo.com.invalid> wrote:

> Thanks Tathagata for your answer.
>
> The reason I was asking about controlling data size is that the javadoc
> indicates you can use foreach or collect on the dataframe. If the data is
> very large then a collect may result in OOM.
>
> From your answer it appears that the only way to control the size (in 2.2)
> would be to control the trigger interval. However, in my case, I have to
> dedup the elements in a one-minute interval, which I am using as the
> trigger interval and cannot reduce. Do you have any other
> suggestion/recommendation?
>
> Also, do you have any timeline for the availability of DataSourceV2/Spark
> 2.3?
>
> Thanks again.
>
>
> On Wednesday, January 3, 2018 2:27 PM, Tathagata Das
> <tathagata.das1...@gmail.com> wrote:
>
> 1. It is all the result data in that trigger. Note that it takes a
> DataFrame, which is a purely logical representation of data and has no
> association with partitions, etc., which are physical representations.
>
> 2. If you want to limit the amount of data that is processed in a trigger,
> then you should either control the trigger interval or use the rate limit
> options on sources that support it (e.g. for Kafka, you can use the option
> "maxOffsetsPerTrigger"; see the guide
> <https://spark.apache.org/docs/2.2.0/structured-streaming-kafka-integration.html>).
>
> On a related note, these APIs are subject to change. In fact, in the
> upcoming release 2.3, we are adding a DataSource V2 API for
> batch/microbatch-streaming/continuous-streaming sources and sinks.
>
> On Wed, Jan 3, 2018 at 11:23 PM, M Singh <mans2si...@yahoo.com.invalid>
> wrote:
>
> Hi:
>
> The documentation for Sink.addBatch is as follows:
>
>   /**
>    * Adds a batch of data to this sink. The data for a given `batchId` is
>    * deterministic and if this method is called more than once with the
>    * same batchId (which will happen in the case of failures), then `data`
>    * should only be added once.
>    *
>    * Note 1: You cannot apply any operators on `data` except consuming it
>    * (e.g., `collect/foreach`). Otherwise, you may get a wrong result.
>    *
>    * Note 2: The method is supposed to be executed synchronously, i.e. the
>    * method should only return after data is consumed by sink successfully.
>    */
>   def addBatch(batchId: Long, data: DataFrame): Unit
>
> A few questions about the data in each DataFrame passed as the argument to
> addBatch:
>
> 1. Is it all the data in a partition for each trigger, or is it all the
> data in that trigger?
>
> 2. Is there a way to control the size in each addBatch invocation to make
> sure that we don't run into an OOM exception on the executor while calling
> collect?
>
> Thanks
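For reference, a minimal sketch of the pattern Jacek describes above: a
custom Sink whose addBatch consumes the incoming DataFrame with
foreachPartition (in the spirit of the javadoc's "consuming it (e.g.,
collect/foreach)"), so rows are written out on the executors and the batch
is never materialized in the single driver JVM. The StoreConnection class
and the sink class name are hypothetical placeholders, not Spark API:

    import org.apache.spark.sql.{DataFrame, Row}
    import org.apache.spark.sql.execution.streaming.Sink

    // Hypothetical client for some external store; stands in for a real one.
    class StoreConnection extends Serializable {
      def write(row: Row): Unit = println(row) // placeholder write
      def close(): Unit = ()
    }

    class PerPartitionSink extends Sink {

      // addBatch runs on the driver, so a simple var suffices within one run;
      // it does not survive a driver restart.
      @volatile private var latestBatchId = -1L

      override def addBatch(batchId: Long, data: DataFrame): Unit = {
        // Per the javadoc, the same batchId may be replayed after a failure,
        // and data for a given batchId should only be added once.
        if (batchId <= latestBatchId) return

        // foreachPartition runs on the executors, so rows are written where
        // they live; collect() would instead pull the whole batch into the
        // single driver JVM, which is the OOM risk discussed in this thread.
        data.foreachPartition { (rows: Iterator[Row]) =>
          val conn = new StoreConnection // hypothetical per-partition connection
          try rows.foreach(conn.write)
          finally conn.close()
        }

        latestBatchId = batchId
      }
    }

The batchId guard covers the replay case called out in the javadoc within a
single run; surviving a restart would require tracking the last committed
batchId in durable storage.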
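And a sketch of the two knobs Tathagata mentions for bounding what each
addBatch call sees: the trigger interval and, for the Kafka source,
maxOffsetsPerTrigger. The broker, topic, record cap, and console sink are
placeholder values; the options themselves are from the Spark 2.2 Kafka
integration guide linked above:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.streaming.Trigger

    val spark = SparkSession.builder().appName("rate-limited-stream").getOrCreate()

    // Requires the spark-sql-kafka-0-10 package on the classpath.
    val kafkaStream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092") // placeholder broker
      .option("subscribe", "events")                    // placeholder topic
      // Caps the number of records per micro-batch, and therefore the size
      // of the DataFrame each addBatch call sees.
      .option("maxOffsetsPerTrigger", 100000L)
      .load()

    val query = kafkaStream.writeStream
      .format("console")
      // The trigger interval is the other knob: one micro-batch per minute,
      // matching the one-minute dedup window discussed above.
      .trigger(Trigger.ProcessingTime("1 minute"))
      .start()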