Re: Apache Spark - Question about Structured Streaming Sink addBatch dataframe size

2018-01-05 Thread M Singh
Hi Jacek:

The javadoc mentions that we can only consume data from the DataFrame in the 
addBatch method. So, if I would like to save the data to a new sink, then I 
believe I will need to collect the data and then save it. This is the reason I 
am asking about how to control the size of the data in each invocation of the 
addBatch method. Let me know if I am interpreting the javadoc incorrectly. 
Here it is:
  /**
   * Adds a batch of data to this sink. The data for a given `batchId` is deterministic and if
   * this method is called more than once with the same batchId (which will happen in the case of
   * failures), then `data` should only be added once.
   *
   * Note 1: You cannot apply any operators on `data` except consuming it (e.g., `collect/foreach`).
   * Otherwise, you may get a wrong result.
   *
   * Note 2: The method is supposed to be executed synchronously, i.e. the method should only return
   * after data is consumed by sink successfully.
   */
  def addBatch(batchId: Long, data: DataFrame): Unit
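
For illustration, here is a minimal sketch of the kind of sink I have in mind;
it only consumes `data` (foreach on the executors rather than collect on the
driver). The write is just a println stand-in for a real external client, and
Sink itself lives in org.apache.spark.sql.execution.streaming, which is not a
stable public API:

import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.execution.streaming.Sink

class MyExternalSink extends Sink {
  // Remember the last batch written so a replayed batchId is not added twice.
  @volatile private var latestBatchId = -1L

  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    if (batchId <= latestBatchId) return   // duplicate delivery after a failure
    // Consume the rows on the executors instead of collecting them to the driver.
    data.foreach { row: Row =>
      println(row)                         // stand-in for the real external write
    }
    latestBatchId = batchId
  }
}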


Thanks
Mans

On Thursday, January 4, 2018 2:19 PM, Jacek Laskowski wrote:

Hi,
> If the data is very large then a collect may result in OOM.

That's a general concern in any part of Spark, incl. Spark Structured
Streaming. Why would you collect in addBatch? It runs on the driver side and,
like anything on the driver, in a single JVM (and usually not fault tolerant).

> Do you have any other suggestion/recommendation ?

What's wrong with the current solution? I don't think you should change how you
do things currently. You should just avoid collect on large datasets (something
you have to do anywhere in Spark).
Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
Mastering Spark SQL https://bit.ly/mastering-spark-sql
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Kafka Streams https://bit.ly/mastering-kafka-streams
Follow me at https://twitter.com/jaceklaskowski

On Thu, Jan 4, 2018 at 10:49 PM, M Singh  wrote:

Thanks Tathagata for your answer.
The reason I was asking about controlling data size is that the javadoc
indicates you can use foreach or collect on the DataFrame. If the data is very
large then a collect may result in OOM.

From your answer it appears that the only way to control the size (in 2.2)
would be to control the trigger interval. However, in my case, I have to dedup
the elements in a one-minute interval, for which I am using the trigger
interval, and I cannot reduce it. Do you have any other suggestion/recommendation?
Also, do you have any timeline for the availability of DataSourceV2/Spark 2.3 ?
Thanks again. 

On Wednesday, January 3, 2018 2:27 PM, Tathagata Das wrote:

1. It is all the result data in that trigger. Note that it takes a DataFrame
which is a purely logical representation of data and has no association with 
partitions, etc. which are physical representations.
2. If you want to limit the amount of data that is processed in a trigger, then 
you should either control the trigger interval or use the rate limit options on 
sources that support it (e.g. for kafka, you can use the option 
"maxOffsetsPerTrigger", see the guide).
Related note, these APIs are subject to change. In fact in the upcoming release
2.3, we are adding a DataSource V2 API for
batch/microbatch-streaming/continuous-streaming sources and sinks.
On Wed, Jan 3, 2018 at 11:23 PM, M Singh  wrote:

Hi:
The documentation for Sink.addBatch is as follows:
  /**
   * Adds a batch of data to this sink. The data for a given `batchId` is deterministic and if
   * this method is called more than once with the same batchId (which will happen in the case of
   * failures), then `data` should only be added once.
   *
   * Note 1: You cannot apply any operators on `data` except consuming it (e.g., `collect/foreach`).
   * Otherwise, you may get a wrong result.
   *
   * Note 2: The method is supposed to be executed synchronously, i.e. the method should only return
   * after data is consumed by sink successfully.
   */
  def addBatch(batchId: Long, data: DataFrame): Unit
A few questions about the data in each DataFrame passed as the argument to
addBatch -
1. Is it all the data in a partition for each trigger or is it all the data in
that trigger?
2. Is there a way to control the size in each addBatch invocation to make sure
that we don't run into an OOM exception on the executor while calling collect?
Thanks

Re: Apache Spark - Question about Structured Streaming Sink addBatch dataframe size

2018-01-04 Thread Jacek Laskowski
Hi,

> If the data is very large then a collect may result in OOM.

That's a general concern in any part of Spark, incl. Spark Structured
Streaming. Why would you collect in addBatch? It runs on the driver side
and, like anything on the driver, in a single JVM (and usually not fault
tolerant).

> Do you have any other suggestion/recommendation ?

What's wrong with the current solution? I don't think you should change how
you do things currently. You should just avoid collect on large datasets
(something you have to do anywhere in Spark).
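
If the goal is simply to push rows to an external system, the built-in foreach
sink keeps everything on the executors. A rough sketch (df is your streaming
DataFrame, and the println is a stand-in for a real client call):

import org.apache.spark.sql.{ForeachWriter, Row}

val query = df.writeStream
  .foreach(new ForeachWriter[Row] {
    // Called once per partition per trigger; open a connection here if needed.
    def open(partitionId: Long, version: Long): Boolean = true
    // Called for every row; push it to the external system.
    def process(row: Row): Unit = println(row)   // stand-in for a real write
    // errorOrNull is non-null if the partition failed.
    def close(errorOrNull: Throwable): Unit = ()
  })
  .start()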

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
Mastering Spark SQL https://bit.ly/mastering-spark-sql
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Kafka Streams https://bit.ly/mastering-kafka-streams
Follow me at https://twitter.com/jaceklaskowski

On Thu, Jan 4, 2018 at 10:49 PM, M Singh 
wrote:

> Thanks Tathagata for your answer.
>
> The reason I was asking about controlling data size is that the javadoc
> indicates you can use foreach or collect on the DataFrame. If the data is
> very large then a collect may result in OOM.
>
> From your answer it appears that the only way to control the size (in 2.2)
> would be to control the trigger interval. However, in my case, I have to dedup
> the elements in a one-minute interval, for which I am using the trigger
> interval and cannot reduce it. Do you have any other suggestion/recommendation?
>
> Also, do you have any timeline for the availability of DataSourceV2/Spark
> 2.3 ?
>
> Thanks again.
>
>
> On Wednesday, January 3, 2018 2:27 PM, Tathagata Das <
> tathagata.das1...@gmail.com> wrote:
>
>
> 1. It is all the result data in that trigger. Note that it takes a
> DataFrame which is a purely logical representation of data and has no
> association with partitions, etc. which are physical representations.
>
> 2. If you want to limit the amount of data that is processed in a trigger,
> then you should either control the trigger interval or use the rate limit
> options on sources that support it (e.g. for kafka, you can use the option
> "maxOffsetsPerTrigger", see the guide).
>
> Related note, these APIs are subject to change. In fact in the upcoming
> release 2.3, we are adding a DataSource V2 API for
> batch/microbatch-streaming/continuous-streaming sources and sinks.
>
> On Wed, Jan 3, 2018 at 11:23 PM, M Singh 
> wrote:
>
> Hi:
>
> The documentation for Sink.addBatch is as follows:
>
>   /**
>* Adds a batch of data to this sink. The data for a given `batchId` is
> deterministic and if
>* this method is called more than once with the same batchId (which
> will happen in the case of
>* failures), then `data` should only be added once.
>*
>* Note 1: You cannot apply any operators on `data` except consuming it
> (e.g., `collect/foreach`).
>* Otherwise, you may get a wrong result.
>*
>* Note 2: The method is supposed to be executed synchronously, i.e.
> the method should only return
>* after data is consumed by sink successfully.
>*/
>   def addBatch(batchId: Long, data: DataFrame): Unit
>
> A few questions about the data in each DataFrame passed as the argument to
> addBatch -
> 1. Is it all the data in a partition for each trigger or is it all the
> data in that trigger ?
> 2. Is there a way to control the size in each addBatch invocation to make
> sure that we don't run into OOM exception on the executor while calling
> collect ?
>
> Thanks
>


Re: Apache Spark - Question about Structured Streaming Sink addBatch dataframe size

2018-01-04 Thread M Singh
Thanks Tathagata for your answer.
The reason I was asking about controlling data size is that the javadoc
indicates you can use foreach or collect on the DataFrame. If the data is very
large then a collect may result in OOM.

From your answer it appears that the only way to control the size (in 2.2)
would be to control the trigger interval. However, in my case, I have to dedup
the elements in a one-minute interval, for which I am using the trigger
interval, and I cannot reduce it. Do you have any other suggestion/recommendation?
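
For context, the shape of the query is roughly this (column names and the sink
are placeholders):

import org.apache.spark.sql.streaming.Trigger

// events: the input streaming DataFrame (assumed), with eventTime and id columns.
val deduped = events
  .withWatermark("eventTime", "1 minute")
  .dropDuplicates("id", "eventTime")

val query = deduped.writeStream
  .trigger(Trigger.ProcessingTime("1 minute"))   // tied to the dedup window, so it cannot shrink
  .format("console")                             // stand-in for the real sink
  .start()
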
Also, do you have any timeline for the availability of DataSourceV2/Spark 2.3 ?
Thanks again. 

On Wednesday, January 3, 2018 2:27 PM, Tathagata Das wrote:

1. It is all the result data in that trigger. Note that it takes a DataFrame
which is a purely logical representation of data and has no association with 
partitions, etc. which are physical representations.
2. If you want to limit the amount of data that is processed in a trigger, then 
you should either control the trigger interval or use the rate limit options on 
sources that support it (e.g. for kafka, you can use the option 
"maxOffsetsPerTrigger", see the guide).
Related note, these APIs are subject to change. In fact in the upcoming release 
2.3, we are adding a DataSource V2 API for 
batch/microbatch-streaming/continuous-streaming sources and sinks.
On Wed, Jan 3, 2018 at 11:23 PM, M Singh  wrote:

Hi:
The documentation for Sink.addBatch is as follows:
  /**
   * Adds a batch of data to this sink. The data for a given `batchId` is deterministic and if
   * this method is called more than once with the same batchId (which will happen in the case of
   * failures), then `data` should only be added once.
   *
   * Note 1: You cannot apply any operators on `data` except consuming it (e.g., `collect/foreach`).
   * Otherwise, you may get a wrong result.
   *
   * Note 2: The method is supposed to be executed synchronously, i.e. the method should only return
   * after data is consumed by sink successfully.
   */
  def addBatch(batchId: Long, data: DataFrame): Unit
A few questions about the data in each DataFrame passed as the argument to
addBatch -
1. Is it all the data in a partition for each trigger or is it all the data in
that trigger?
2. Is there a way to control the size in each addBatch invocation to make sure
that we don't run into an OOM exception on the executor while calling collect?
Thanks

Re: Apache Spark - Question about Structured Streaming Sink addBatch dataframe size

2018-01-03 Thread Tathagata Das
1. It is all the result data in that trigger. Note that it takes a
DataFrame which is a purely logical representation of data and has no
association with partitions, etc. which are physical representations.

2. If you want to limit the amount of data that is processed in a trigger,
then you should either control the trigger interval or use the rate limit
options on sources that support it (e.g. for kafka, you can use the option
"maxOffsetsPerTrigger", see the guide).
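
For example, a rough sketch (servers, topic, and the limit are placeholders)
combining the Kafka rate limit with a trigger interval:

import org.apache.spark.sql.streaming.Trigger

val stream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092")   // placeholder
  .option("subscribe", "events")                     // placeholder topic
  .option("maxOffsetsPerTrigger", "100000")          // cap on records read per trigger
  .load()

val query = stream.writeStream
  .trigger(Trigger.ProcessingTime("1 minute"))
  .format("console")                                 // stand-in sink
  .start()
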

Related note, these APIs are subject to change. In fact in the upcoming
release 2.3, we are adding a DataSource V2 API for
batch/microbatch-streaming/continuous-streaming sources and sinks.

On Wed, Jan 3, 2018 at 11:23 PM, M Singh 
wrote:

> Hi:
>
> The documentation for Sink.addBatch is as follows:
>
>   /**
>* Adds a batch of data to this sink. The data for a given `batchId` is
> deterministic and if
>* this method is called more than once with the same batchId (which
> will happen in the case of
>* failures), then `data` should only be added once.
>*
>* Note 1: You cannot apply any operators on `data` except consuming it
> (e.g., `collect/foreach`).
>* Otherwise, you may get a wrong result.
>*
>* Note 2: The method is supposed to be executed synchronously, i.e.
> the method should only return
>* after data is consumed by sink successfully.
>*/
>   def addBatch(batchId: Long, data: DataFrame): Unit
>
> A few questions about the data in each DataFrame passed as the argument to
> addBatch -
> 1. Is it all the data in a partition for each trigger or is it all the
> data in that trigger ?
> 2. Is there a way to control the size in each addBatch invocation to make
> sure that we don't run into OOM exception on the executor while calling
> collect ?
>
> Thanks
>


Apache Spark - Question about Structured Streaming Sink addBatch dataframe size

2018-01-03 Thread M Singh
Hi:
The documentation for Sink.addBatch is as follows:
  /**
   * Adds a batch of data to this sink. The data for a given `batchId` is deterministic and if
   * this method is called more than once with the same batchId (which will happen in the case of
   * failures), then `data` should only be added once.
   *
   * Note 1: You cannot apply any operators on `data` except consuming it (e.g., `collect/foreach`).
   * Otherwise, you may get a wrong result.
   *
   * Note 2: The method is supposed to be executed synchronously, i.e. the method should only return
   * after data is consumed by sink successfully.
   */
  def addBatch(batchId: Long, data: DataFrame): Unit
A few questions about the data in each DataFrame passed as the argument to
addBatch -
1. Is it all the data in a partition for each trigger or is it all the data in
that trigger?
2. Is there a way to control the size in each addBatch invocation to make sure
that we don't run into an OOM exception on the executor while calling collect?
Thanks

Re: Question about 'Structured Streaming'

2017-08-08 Thread Michael Armbrust
>
> 1) Parsing data/Schema creation: The Bro IDS logs have an 8-line header
> that contains the 'schema' for the data, each log http/dns/etc will have
> different columns with different data types. So would I create a specific
> CSV reader inherited from the general one?  Also I'm assuming this would
> need to be in Scala/Java? (I suck at both of those :)
>

This is a good question. What I have seen others do is actually run
different streams for the different log types.  This way you can customize
the schema to the specific log type.

Even without using Scala/Java you could also use the text data source
(assuming the logs are new line delimited) and then write the parser for
each line in python.  There will be a performance penalty here though.
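
For example, a rough sketch in Scala (path and column positions are made up)
that reads one log type with the text source and splits the tab-separated
fields on the JVM; the same shape works from pyspark with a Python parse
instead:

import spark.implicits._
import org.apache.spark.sql.functions.split

// One stream per log type, so the schema can match that type.
val rawLines = spark.readStream
  .text("/bro/current/http.*.log")            // placeholder path/glob
  .filter(!$"value".startsWith("#"))          // drop the 8-line Bro header

val httpLog = rawLines
  .withColumn("fields", split($"value", "\t"))
  .selectExpr(
    "fields[0] as ts",                        // placeholder column mapping
    "fields[1] as uid",
    "fields[2] as id_orig_h")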


> 2) Dynamic Tailing: Does the CSV/TSV data sources support dynamic tailing
> and handle log rotations?
>

The file based sources work by tracking which files have been processed and
then scanning (optionally using glob patterns) for new files.  There are two
assumptions here: files are immutable when they arrive and files always
have a unique name. If files are deleted, we ignore that, so you are okay
to rotate them out.

The full pipeline that I have seen often involves the logs getting uploaded
to something like S3.  This is nice because you get atomic visibility of
files that have already been rotated.  So I wouldn't really call this
dynamically tailing, but we do support looking for new files at some
location.


Re: Question about 'Structured Streaming'

2017-08-08 Thread Brian Wylie
I can see your point that you don't really want an external process being
used for the streaming data source. Okay, so on the CSV/TSV front, I have
two follow-up questions:

1) Parsing data/Schema creation: The Bro IDS logs have an 8-line header that
contains the 'schema' for the data, each log http/dns/etc will have
different columns with different data types. So would I create a specific
CSV reader inherited from the general one?  Also I'm assuming this would
need to be in Scala/Java? (I suck at both of those :)

2) Dynamic Tailing: Does the CSV/TSV data sources support dynamic tailing
and handle log rotations?

Thanks and BTW your Spark Summit talks are really well done and
informative. You're an excellent speaker.

-Brian

On Tue, Aug 8, 2017 at 2:09 PM, Michael Armbrust 
wrote:

> Cool stuff! A pattern I have seen is to use our CSV/TSV or JSON support to
> read bro logs, rather than a python library.  This is likely to have much
> better performance since we can do all of the parsing on the JVM without
> having to flow it through an external python process.
>
> On Tue, Aug 8, 2017 at 9:35 AM, Brian Wylie 
> wrote:
>
>> Hi All,
>>
>> I've read the new information about Structured Streaming in Spark, looks
>> super great.
>>
>> Resources that I've looked at
>> - https://spark.apache.org/docs/latest/streaming-programming-guide.html
>> - https://databricks.com/blog/2016/07/28/structured-streaming-in-apache-spark.html
>> - https://spark.apache.org/docs/latest/streaming-custom-receivers.html
>> - http://cdn2.hubspot.net/hubfs/438089/notebooks/spark2.0/Structured%20Streaming%20using%20Python%20DataFrames%20API.html
>>
>> + YouTube videos from Spark Summit 2016/2017
>>
>> So finally getting to my question:
>>
>> I have Python code that yields a Python generator... this is a great
>> streaming approach within Python. I've used it for network packet
>> processing and a bunch of other stuff. I'd love to simply hook up this
>> generator (that yields python dictionaries) along with a schema definition
>> to create an  'unbounded DataFrame' as discussed in
>> https://databricks.com/blog/2016/07/28/structured-streaming-in-apache-spark.html
>>
>> Possible approaches:
>> - Make a custom receiver in Python: https://spark.apache.org/docs/latest/streaming-custom-receivers.html
>> - Use Kafka (this is definitely possible and good but overkill for my use
>> case)
>> - Send data out a socket and use socketTextStream to pull back in (seems
>> a bit silly to me)
>> - Other???
>>
>> Since Python Generators so naturally fit into streaming pipelines I'd
>> think that this would be straightforward to 'couple' a python generator
>> into a Spark structured streaming pipeline..
>>
>> I've put together a small notebook just to give a concrete example
>> (streaming Bro IDS network data) https://github.com/Kitware/BroThon/blob/master/notebooks/Bro_IDS_to_Spark.ipynb
>>
>> Any thoughts/suggestions/pointers are greatly appreciated.
>>
>> -Brian
>>
>>
>


Re: Question about 'Structured Streaming'

2017-08-08 Thread Michael Armbrust
Cool stuff! A pattern I have seen is to use our CSV/TSV or JSON support to
read bro logs, rather than a python library.  This is likely to have much
better performance since we can do all of the parsing on the JVM without
having to flow it through an external python process.
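
For example, a rough sketch (schema, options, and path are illustrative) of
pointing the CSV reader at tab-separated Bro logs:

import org.apache.spark.sql.types._

// Schema must be given up front for a streaming read; columns here are illustrative.
val connSchema = new StructType()
  .add("ts", DoubleType)
  .add("uid", StringType)
  .add("id_orig_h", StringType)

val conn = spark.readStream
  .schema(connSchema)
  .option("sep", "\t")              // Bro logs are tab separated
  .option("comment", "#")           // skip the #-prefixed header lines
  .csv("/bro/current/conn.*.log")   // placeholder path/glob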

On Tue, Aug 8, 2017 at 9:35 AM, Brian Wylie  wrote:

> Hi All,
>
> I've read the new information about Structured Streaming in Spark, looks
> super great.
>
> Resources that I've looked at
> - https://spark.apache.org/docs/latest/streaming-programming-guide.html
> - https://databricks.com/blog/2016/07/28/structured-streaming-in-apache-spark.html
> - https://spark.apache.org/docs/latest/streaming-custom-receivers.html
> - http://cdn2.hubspot.net/hubfs/438089/notebooks/spark2.0/Structured%20Streaming%20using%20Python%20DataFrames%20API.html
>
> + YouTube videos from Spark Summit 2016/2017
>
> So finally getting to my question:
>
> I have Python code that yields a Python generator... this is a great
> streaming approach within Python. I've used it for network packet
> processing and a bunch of other stuff. I'd love to simply hook up this
> generator (that yields python dictionaries) along with a schema definition
> to create an  'unbounded DataFrame' as discussed in
> https://databricks.com/blog/2016/07/28/structured-streaming-in-apache-spark.html
>
> Possible approaches:
> - Make a custom receiver in Python: https://spark.apache.org/docs/latest/streaming-custom-receivers.html
> - Use Kafka (this is definitely possible and good but overkill for my use
> case)
> - Send data out a socket and use socketTextStream to pull back in (seems a
> bit silly to me)
> - Other???
>
> Since Python Generators so naturally fit into streaming pipelines I'd
> think that this would be straightforward to 'couple' a python generator
> into a Spark structured streaming pipeline..
>
> I've put together a small notebook just to give a concrete example
> (streaming Bro IDS network data) https://github.com/Kitware/BroThon/blob/master/notebooks/Bro_IDS_to_Spark.ipynb
>
> Any thoughts/suggestions/pointers are greatly appreciated.
>
> -Brian
>
>


Fwd: Python question about 'Structured Streaming'

2017-08-08 Thread Brian Wylie
Hi All,

I've read the new information about Structured Streaming in Spark, looks
super great.

Resources that I've looked at
- https://spark.apache.org/docs/latest/streaming-programming-guide.html
- https://databricks.com/blog/2016/07/28/structured-streaming-in-apache-spark.html
- https://spark.apache.org/docs/latest/streaming-custom-receivers.html
- http://cdn2.hubspot.net/hubfs/438089/notebooks/spark2.0/Structured%20Streaming%20using%20Python%20DataFrames%20API.html

+ YouTube videos from Spark Summit 2016/2017

So finally getting to my question:

I have Python code that yields a Python generator... this is a great
streaming approach within Python. I've used it for network packet
processing and a bunch of other stuff. I'd love to simply hook up this
generator (that yields python dictionaries) along with a schema definition
to create an 'unbounded DataFrame' as discussed in
https://databricks.com/blog/2016/07/28/structured-streaming-in-apache-spark.html

Possible approaches:
- Make a custom receiver in Python: https://spark.apache.org/docs/latest/streaming-custom-receivers.html
- Use Kafka (this is definitely possible and good but overkill for my use
case)
- Send data out a socket and use socketTextStream to pull back in (seems a
bit silly to me)
- Other???

Since Python Generators so naturally fit into streaming pipelines I'd think
that this would be straightforward to 'couple' a python generator into a
Spark structured streaming pipeline..

I've put together a small notebook just to give a concrete example
(streaming Bro IDS network data)
https://github.com/Kitware/BroThon/blob/master/notebooks/Bro_IDS_to_Spark.ipynb

Any thoughts/suggestions/pointers are greatly appreciated.

-Brian


Question about 'Structured Streaming'

2017-08-08 Thread Brian Wylie
Hi All,

I've read the new information about Structured Streaming in Spark, looks
super great.

Resources that I've looked at
- https://spark.apache.org/docs/latest/streaming-programming-guide.html
-
https://databricks.com/blog/2016/07/28/structured-streaming-in-apache-spark.html
- https://spark.apache.org/docs/latest/streaming-custom-receivers.html
-
http://cdn2.hubspot.net/hubfs/438089/notebooks/spark2.0/Structured%20Streaming%20using%20Python%20DataFrames%20API.html

+ YouTube videos from Spark Summit 2016/2017

So finally getting to my question:

I have Python code that yields a Python generator... this is a great
streaming approach within Python. I've used it for network packet
processing and a bunch of other stuff. I'd love to simply hook up this
generator (that yields python dictionaries) along with a schema definition
to create an  'unbounded DataFrame' as discussed in
https://databricks.com/blog/2016/07/28/structured-streaming-in-apache-spark.html

Possible approaches:
- Make a custom receiver in Python:
https://spark.apache.org/docs/latest/streaming-custom-receivers.html
- Use Kafka (this is definitely possible and good but overkill for my use
case)
- Send data out a socket and use socketTextStream to pull back in (seems a
bit silly to me; see the rough sketch after this list)
- Other???
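
To make the socket option concrete, a rough Scala sketch of the
structured-streaming analogue (the socket source plus from_json, rather than
the DStream socketTextStream; host, port, and fields are made up):

import spark.implicits._
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._

// Assumes the generator writes one JSON dict per line to localhost:9999.
// (The socket source is really only meant for testing.)
val dictSchema = new StructType()
  .add("uid", StringType)
  .add("id_orig_h", StringType)    // illustrative Bro fields

val parsed = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()
  .select(from_json($"value", dictSchema).as("record"))
  .select("record.*")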

Since Python Generators so naturally fit into streaming pipelines I'd think
that this would be straightforward to 'couple' a python generator into a
Spark structured streaming pipeline..

I've put together a small notebook just to give a concrete example
(streaming Bro IDS network data)
https://github.com/Kitware/BroThon/blob/master/notebooks/Bro_IDS_to_Spark.ipynb

Any thoughts/suggestions/pointers are greatly appreciated.

-Brian