1. It is all the result data for that trigger. Note that addBatch takes a
DataFrame, which is a purely logical representation of the data and has no
association with partitions or other physical representations. (A rough
sketch of a custom sink is included below, after point 2.)

2. If you want to limit the amount of data that is processed in a trigger,
you should either control the trigger interval or use the rate-limit
options on sources that support them (e.g. for Kafka, you can use the
option "maxOffsetsPerTrigger"; see the guide at
https://spark.apache.org/docs/2.2.0/structured-streaming-kafka-integration.html).
A sketch showing both knobs is included just below.
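
For point 2, here is a rough sketch showing both knobs together. The broker
address, topic name, rate limit, and interval are placeholders;
"maxOffsetsPerTrigger" is the Kafka source option documented in the guide
linked above.

  import org.apache.spark.sql.SparkSession
  import org.apache.spark.sql.streaming.Trigger

  val spark = SparkSession.builder().appName("rate-limited-stream").getOrCreate()

  // Rate-limit the Kafka source: each trigger reads at most 10000 offsets.
  val kafkaDf = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker1:9092")  // placeholder broker
    .option("subscribe", "my-topic")                     // placeholder topic
    .option("maxOffsetsPerTrigger", "10000")
    .load()

  // Control how often a trigger fires with a processing-time interval.
  val query = kafkaDf.writeStream
    .format("console")
    .trigger(Trigger.ProcessingTime("30 seconds"))
    .start()

  query.awaitTermination()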
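
And for point 1, a minimal sketch of a custom sink that just consumes the
whole trigger's result. The class name and println logic are made up for
illustration, and Sink lives in the internal package
org.apache.spark.sql.execution.streaming, so it falls under the "subject to
change" caveat below.

  import org.apache.spark.sql.DataFrame
  import org.apache.spark.sql.execution.streaming.Sink

  // Illustrative sink: prints every row of the trigger's result data.
  class ConsolePrintSink extends Sink {
    // Remember the last batch consumed so that a batchId re-delivered
    // after a failure is not added twice.
    @volatile private var lastCommittedBatchId: Long = -1L

    override def addBatch(batchId: Long, data: DataFrame): Unit = {
      if (batchId > lastCommittedBatchId) {
        // Only consume `data` (collect/foreach); applying other operators
        // on it may give a wrong result, per the scaladoc quoted below.
        data.collect().foreach(row => println(s"batch $batchId: $row"))
        lastCommittedBatchId = batchId
      }
    }
  }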

On a related note, these APIs are subject to change. In fact, in the
upcoming 2.3 release we are adding a DataSource V2 API for batch,
micro-batch streaming, and continuous streaming sources and sinks.

On Wed, Jan 3, 2018 at 11:23 PM, M Singh <mans2si...@yahoo.com.invalid>
wrote:

> Hi:
>
> The documentation for Sink.addBatch is as follows:
>
>   /**
>    * Adds a batch of data to this sink. The data for a given `batchId` is deterministic and if
>    * this method is called more than once with the same batchId (which will happen in the case of
>    * failures), then `data` should only be added once.
>    *
>    * Note 1: You cannot apply any operators on `data` except consuming it (e.g., `collect/foreach`).
>    * Otherwise, you may get a wrong result.
>    *
>    * Note 2: The method is supposed to be executed synchronously, i.e. the method should only return
>    * after data is consumed by sink successfully.
>    */
>   def addBatch(batchId: Long, data: DataFrame): Unit
>
> A few questions about the data in each DataFrame passed as the argument
> to addBatch -
> 1. Is it all the data in a partition for each trigger, or is it all the
> data in that trigger?
> 2. Is there a way to control the size in each addBatch invocation, to
> make sure that we don't run into an OOM exception on the executor while
> calling collect?
>
> Thanks
>
