Re: [SPARK-34806] Observable Metrics on Spark Datasets

2021-03-20 Thread Enrico Minack

The PR can be found here: https://github.com/apache/spark/pull/31905


On 19.03.21 at 10:55, Enrico Minack wrote:


I'll sketch out a PR so we can talk code and move the discussion there.



On 18.03.21 at 14:55, Wenchen Fan wrote:
I think a listener-based API makes sense for streaming (since you 
need to keep watching the result), but may not be very reasonable for 
batch queries (you only get the result once). The idea of 
Observation looks good, but we should define what happens if 
`observation.get` is called before the batch query finishes.


Can we have a PR for it so that we can have more detailed discussions?

On Tue, Mar 16, 2021 at 3:59 PM Jungtaek Lim <kabhwan.opensou...@gmail.com> wrote:


Please follow up on the discussion in the original PR.
https://github.com/apache/spark/pull/26127


Dataset.observe() relies on the query listener for the batch query,
which is an "unstable" API - that's why we decided not to add an
example for the batch query. For streaming queries, it relies on the
streaming query listener, which is a stable API. That said, personally
I'd consider fitting the new API to the streaming query first, and
then see whether it fits the batch query as well.

If we find Dataset.observe() to be useful enough for the batch query,
we'd probably be better off discussing how to provide these metrics
through a stable API (so that Scala users could leverage it), and look
at PySpark later. That looks like the first thing to do if we have
consensus on the usefulness of observable metrics on batch queries.


On Tue, Mar 16, 2021 at 4:17 PM Enrico Minack <m...@enrico.minack.dev> wrote:

I am focusing on batch mode, not streaming mode. I would
argue that Dataset.observe() is equally useful for large
batch processing. If you need some motivating use cases,
please let me know.

Anyhow, the documentation of observe states it works for both
batch and streaming. And in batch mode, the helper class
Observation helps reduce code and avoid repetition.

The PySpark implementation of the Observation class can
implement *all* methods by merely calling into their JVM
counterpart, where the locking, listening, registration and
un-registration happens. I think this qualifies as: "all the
logic happens in the JVM". All that is transferred to Python
is a row's data. No listeners needed.

Enrico



On 16.03.21 at 00:13, Jungtaek Lim wrote:

If I remember correctly, the major audience of the "observe"
API is Structured Streaming, micro-batch mode. From the
example, the abstraction in 2 isn't something that works with
Structured Streaming. It could still be done with a callback,
but the question remains how much complexity the abstraction
hides.

I see you're focusing on PySpark - I'm not sure whether
there's an intention not to expose the query listener /
streaming query listener to PySpark, but if there's a valid
reason for that, I'm not sure we'd like to expose them to
PySpark in any way. 2 doesn't make sense to me with PySpark -
how do you ensure all the logic happens in the JVM while still
leveraging these values from PySpark?
(I see there's support for listeners with DStream in
PySpark, so there might be reasons not to add the same for
SQL/SS. Probably a lesson learned?)


On Mon, Mar 15, 2021 at 6:59 PM Enrico Minack <m...@enrico.minack.dev> wrote:

Hi Spark-Devs,

The observable metrics that have been added to the
Dataset API in 3.0.0 are a great improvement over the
Accumulator APIs, which seem to have much weaker
guarantees. I have two questions regarding follow-up
contributions:

*1. Add observe to Python DataFrame*

As far as I can see from the master branch, there is no
equivalent in the Python API. Is this planned, or is it
missing because there are no listeners in PySpark, which
renders this method useless in Python? I would be happy to
contribute here.

*2. Add Observation class to simplify result access*

The Dataset.observe method requires users to register
listeners with QueryExecutionListener or
StreamingQueryListener to obtain the result. I think for
simple setups, this could be hidden behind a common helper
class, here called Observation […]

Re: Observable Metrics on Spark Datasets

2021-03-19 Thread Enrico Minack

I'll sketch out a PR so we can talk code and move the discussion there.



On 18.03.21 at 14:55, Wenchen Fan wrote:
I think a listener-based API makes sense for streaming (since you need 
to keep watching the result), but may not be very reasonable for batch 
queries (you only get the result once). The idea of Observation looks 
good, but we should define what happens if `observation.get` is called 
before the batch query finishes.


Can we have a PR for it so that we can have more detailed discussions?

On Tue, Mar 16, 2021 at 3:59 PM Jungtaek Lim <kabhwan.opensou...@gmail.com> wrote:


Please follow up on the discussion in the original PR.
https://github.com/apache/spark/pull/26127


Dataset.observe() relies on the query listener for the batch query,
which is an "unstable" API - that's why we decided not to add an
example for the batch query. For streaming queries, it relies on the
streaming query listener, which is a stable API. That said, personally
I'd consider fitting the new API to the streaming query first, and
then see whether it fits the batch query as well.

If we find Dataset.observe() to be useful enough for the batch query,
we'd probably be better off discussing how to provide these metrics
through a stable API (so that Scala users could leverage it), and look
at PySpark later. That looks like the first thing to do if we have
consensus on the usefulness of observable metrics on batch queries.


On Tue, Mar 16, 2021 at 4:17 PM Enrico Minack <m...@enrico.minack.dev> wrote:

I am focusing on batch mode, not streaming mode. I would argue
that Dataset.observe() is equally useful for large batch
processing. If you need some motivating use cases, please let
me know.

Anyhow, the documentation of observe states it works for both
batch and streaming. And in batch mode, the helper class
Observation helps reduce code and avoid repetition.

The PySpark implementation of the Observation class can
implement *all* methods by merely calling into their JVM
counterpart, where the locking, listening, registration and
un-registration happens. I think this qualifies as: "all the
logic happens in the JVM". All that is transferred to Python
is a row's data. No listeners needed.

Enrico



On 16.03.21 at 00:13, Jungtaek Lim wrote:

If I remember correctly, the major audience of the "observe"
API is Structured Streaming, micro-batch mode. From the
example, the abstraction in 2 isn't something that works with
Structured Streaming. It could still be done with a callback,
but the question remains how much complexity the abstraction
hides.

I see you're focusing on PySpark - I'm not sure whether
there's an intention not to expose the query listener /
streaming query listener to PySpark, but if there's a valid
reason for that, I'm not sure we'd like to expose them to
PySpark in any way. 2 doesn't make sense to me with PySpark -
how do you ensure all the logic happens in the JVM while still
leveraging these values from PySpark?
(I see there's support for listeners with DStream in PySpark,
so there might be reasons not to add the same for SQL/SS.
Probably a lesson learned?)


On Mon, Mar 15, 2021 at 6:59 PM Enrico Minack <m...@enrico.minack.dev> wrote:

Hi Spark-Devs,

The observable metrics that have been added to the
Dataset API in 3.0.0 are a great improvement over the
Accumulator APIs, which seem to have much weaker
guarantees. I have two questions regarding follow-up
contributions:

*1. Add observe to Python DataFrame*

As far as I can see from the master branch, there is no
equivalent in the Python API. Is this planned, or is it
missing because there are no listeners in PySpark, which
renders this method useless in Python? I would be happy to
contribute here.

*2. Add Observation class to simplify result access*

The Dataset.observe method requires users to register
listeners with QueryExecutionListener or
StreamingQueryListener to obtain the result. I think for
simple setups, this could be hidden behind a common helper
class, here called Observation, which reduces the usage of
observe to three lines of code:

// our Dataset (this does not count as a line of code)
val df = Seq((1, "a"), (2, "b"), (4, "c"), (8, "d")).toDF("id", "value")
[…]

Re: Observable Metrics on Spark Datasets

2021-03-18 Thread Wenchen Fan
I think a listener-based API makes sense for streaming (since you need to
keep watching the result), but may not be very reasonable for batch queries
(you only get the result once). The idea of Observation looks good, but we
should define what happens if `observation.get` is called before the batch
query finishes.
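
For illustration only, one possible semantics is to let `get` block until the
metrics row has been delivered by the listener - a rough sketch with purely
illustrative names, not a concrete proposal:

import org.apache.spark.sql.Row

// Sketch: get() blocks until the metrics row arrives (illustrative names only).
class BlockingObservation {
  private var row: Option[Row] = None

  // called by the (hypothetical) listener once the query has finished
  def onMetrics(metrics: Row): Unit = synchronized {
    row = Some(metrics)
    notifyAll()
  }

  // blocks the calling thread until the batch query has finished
  def get: Row = synchronized {
    while (row.isEmpty) wait()
    row.get
  }
}

An alternative would be a timeout or an Option-returning variant instead of
blocking indefinitely.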

Can we have a PR for it so that we can have more detailed discussions?

On Tue, Mar 16, 2021 at 3:59 PM Jungtaek Lim 
wrote:

> Please follow up on the discussion in the original PR.
> https://github.com/apache/spark/pull/26127
>
> Dataset.observe() relies on the query listener for the batch query, which is
> an "unstable" API - that's why we decided not to add an example for the batch
> query. For streaming queries, it relies on the streaming query listener, which
> is a stable API. That said, personally I'd consider fitting the new API to the
> streaming query first, and then see whether it fits the batch query as well.
>
> If we find Dataset.observe() to be useful enough for the batch query, we'd
> probably be better off discussing how to provide these metrics through a
> stable API (so that Scala users could leverage it), and look at PySpark later.
> That looks like the first thing to do if we have consensus on the usefulness
> of observable metrics on batch queries.
>
>
> On Tue, Mar 16, 2021 at 4:17 PM Enrico Minack 
> wrote:
>
>> I am focusing on batch mode, not streaming mode. I would argue that
>> Dataset.observe() is equally useful for large batch processing. If you
>> need some motivating use cases, please let me know.
>>
>> Anyhow, the documentation of observe states it works for both batch and
>> streaming. And in batch mode, the helper class Observation helps reduce
>> code and avoid repetition.
>>
>> The PySpark implementation of the Observation class can implement *all*
>> methods by merely calling into their JVM counterpart, where the locking,
>> listening, registration and un-registration happens. I think this qualifies
>> as: "all the logic happens in the JVM". All that is transferred to Python
>> is a row's data. No listeners needed.
>>
>> Enrico
>>
>>
>>
>> On 16.03.21 at 00:13, Jungtaek Lim wrote:
>>
>> If I remember correctly, the major audience of the "observe" API is
>> Structured Streaming, micro-batch mode. From the example, the abstraction
>> in 2 isn't something that works with Structured Streaming. It could still be
>> done with a callback, but the question remains how much complexity the
>> abstraction hides.
>>
>> I see you're focusing on PySpark - I'm not sure whether there's an intention
>> not to expose the query listener / streaming query listener to PySpark, but
>> if there's a valid reason for that, I'm not sure we'd like to expose them to
>> PySpark in any way. 2 doesn't make sense to me with PySpark - how do you
>> ensure all the logic happens in the JVM while still leveraging these values
>> from PySpark?
>> (I see there's support for listeners with DStream in PySpark, so there
>> might be reasons not to add the same for SQL/SS. Probably a lesson learned?)
>>
>>
>> On Mon, Mar 15, 2021 at 6:59 PM Enrico Minack 
>> wrote:
>>
>>> Hi Spark-Devs,
>>>
>>> The observable metrics that have been added to the Dataset API in 3.0.0
>>> are a great improvement over the Accumulator APIs, which seem to have much
>>> weaker guarantees. I have two questions regarding follow-up contributions:
>>>
>>> *1. Add observe to Python DataFrame*
>>>
>>> As far as I can see from the master branch, there is no equivalent in the
>>> Python API. Is this planned, or is it missing because there are no listeners
>>> in PySpark, which renders this method useless in Python? I would be happy to
>>> contribute here.
>>>
>>>
>>> *2. Add Observation class to simplify result access*
>>>
>>> The Dataset.observe method requires users to register listeners with
>>> QueryExecutionListener or StreamingQueryListener to obtain the result. I
>>> think for simple setups, this could be hidden behind a common helper class,
>>> here called Observation, which reduces the usage of observe to three lines
>>> of code:
>>>
>>> // our Dataset (this does not count as a line of code)
>>> val df = Seq((1, "a"), (2, "b"), (4, "c"), (8, "d")).toDF("id", "value")
>>> // define the observation we want to make
>>> val observation = Observation("stats", count($"id"), sum($"id"))
>>> // add the observation to the Dataset and execute an action on it
>>> val cnt = df.observe(observation).count()
>>> // retrieve the result
>>> assert(observation.get === Row(4, 15))
>>>
>>> The Observation class can handle the registration and de-registration
>>> of the listener, as well as properly accessing the result across thread
>>> boundaries.
>>>
>>> With *2.*, the observe method can be added to PySpark without
>>> introducing listeners there at all. All the logic is happening in the JVM. […]

Re: Observable Metrics on Spark Datasets

2021-03-16 Thread Jungtaek Lim
Please follow up on the discussion in the original PR.
https://github.com/apache/spark/pull/26127

Dataset.observe() relies on the query listener for the batch query, which is
an "unstable" API - that's why we decided not to add an example for the batch
query. For streaming queries, it relies on the streaming query listener, which
is a stable API. That said, personally I'd consider fitting the new API to the
streaming query first, and then see whether it fits the batch query as well.
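
For reference, the stable streaming route for reading these metrics looks
roughly like this - a sketch assuming Spark 3.x, a SparkSession named spark,
and metrics observed under the name "stats"; the details are illustrative:

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

// Observed metrics are reported per micro-batch through the streaming query listener.
spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = {}
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    // observedMetrics is keyed by the name passed to Dataset.observe(...)
    val metrics = event.progress.observedMetrics.get("stats")
    if (metrics != null) println(s"observed: $metrics")
  }
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
})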

If we find Dataset.observe() to be useful enough for the batch query, we'd
probably be better off discussing how to provide these metrics through a
stable API (so that Scala users could leverage it), and look at PySpark later.
That looks like the first thing to do if we have consensus on the usefulness
of observable metrics on batch queries.


On Tue, Mar 16, 2021 at 4:17 PM Enrico Minack 
wrote:

> I am focusing on batch mode, not streaming mode. I would argue that
> Dataset.observe() is equally useful for large batch processing. If you
> need some motivating use cases, please let me know.
>
> Anyhow, the documentation of observe states it works for both batch and
> streaming. And in batch mode, the helper class Observation helps reduce
> code and avoid repetition.
>
> The PySpark implementation of the Observation class can implement *all*
> methods by merely calling into their JVM counterpart, where the locking,
> listening, registration and un-registration happens. I think this qualifies
> as: "all the logic happens in the JVM". All that is transferred to Python
> is a row's data. No listeners needed.
>
> Enrico
>
>
>
> On 16.03.21 at 00:13, Jungtaek Lim wrote:
>
> If I remember correctly, the major audience of the "observe" API is
> Structured Streaming, micro-batch mode. From the example, the abstraction
> in 2 isn't something that works with Structured Streaming. It could still be
> done with a callback, but the question remains how much complexity the
> abstraction hides.
>
> I see you're focusing on PySpark - I'm not sure whether there's an intention
> not to expose the query listener / streaming query listener to PySpark, but
> if there's a valid reason for that, I'm not sure we'd like to expose them to
> PySpark in any way. 2 doesn't make sense to me with PySpark - how do you
> ensure all the logic happens in the JVM while still leveraging these values
> from PySpark?
> (I see there's support for listeners with DStream in PySpark, so there
> might be reasons not to add the same for SQL/SS. Probably a lesson learned?)
>
>
> On Mon, Mar 15, 2021 at 6:59 PM Enrico Minack 
> wrote:
>
>> Hi Spark-Devs,
>>
>> The observable metrics that have been added to the Dataset API in 3.0.0
>> are a great improvement over the Accumulator APIs, which seem to have much
>> weaker guarantees. I have two questions regarding follow-up contributions:
>>
>> *1. Add observe to Python DataFrame*
>>
>> As far as I can see from the master branch, there is no equivalent in the
>> Python API. Is this planned, or is it missing because there are no listeners
>> in PySpark, which renders this method useless in Python? I would be happy to
>> contribute here.
>>
>>
>> *2. Add Observation class to simplify result access*
>>
>> The Dataset.observe method requires users to register listeners with
>> QueryExecutionListener or StreamingQueryListener to obtain the result. I
>> think for simple setups, this could be hidden behind a common helper class,
>> here called Observation, which reduces the usage of observe to three lines
>> of code:
>>
>> // our Dataset (this does not count as a line of code)
>> val df = Seq((1, "a"), (2, "b"), (4, "c"), (8, "d")).toDF("id", "value")
>> // define the observation we want to make
>> val observation = Observation("stats", count($"id"), sum($"id"))
>> // add the observation to the Dataset and execute an action on it
>> val cnt = df.observe(observation).count()
>> // retrieve the result
>> assert(observation.get === Row(4, 15))
>>
>> The Observation class can handle the registration and de-registration of
>> the listener, as well as properly accessing the result across thread
>> boundaries.
>>
>> With *2.*, the observe method can be added to PySpark without
>> introducing listeners there at all. All the logic is happening in the JVM.
>>
>> Thanks for your thoughts on this.
>>
>> Regards,
>> Enrico
>>
>


Re: Observable Metrics on Spark Datasets

2021-03-16 Thread Enrico Minack
I am focusing on batch mode, not streaming mode. I would argue that 
Dataset.observe() is equally useful for large batch processing. If you 
need some motivating use cases, please let me know.


Anyhow, the documentation of observe states it works for both batch and
streaming. And in batch mode, the helper class Observation helps
reduce code and avoid repetition.


The PySpark implementation of the Observation class can implement *all* 
methods by merely calling into their JVM counterpart, where the locking, 
listening, registration and un-registration happens. I think this 
qualifies as: "all the logic happens in the JVM". All that is 
transferred to Python is a row's data. No listeners needed.


Enrico



On 16.03.21 at 00:13, Jungtaek Lim wrote:
If I remember correctly, the major audience of the "observe" API is
Structured Streaming, micro-batch mode. From the example, the
abstraction in 2 isn't something that works with Structured Streaming.
It could still be done with a callback, but the question remains how
much complexity the abstraction hides.

I see you're focusing on PySpark - I'm not sure whether there's an
intention not to expose the query listener / streaming query listener to
PySpark, but if there's a valid reason for that, I'm not sure we'd like
to expose them to PySpark in any way. 2 doesn't make sense to me
with PySpark - how do you ensure all the logic happens in the JVM
while still leveraging these values from PySpark?
(I see there's support for listeners with DStream in PySpark, so there
might be reasons not to add the same for SQL/SS. Probably a lesson
learned?)



On Mon, Mar 15, 2021 at 6:59 PM Enrico Minack <m...@enrico.minack.dev> wrote:


Hi Spark-Devs,

The observable metrics that have been added to the Dataset API in
3.0.0 are a great improvement over the Accumulator APIs, which seem
to have much weaker guarantees. I have two questions regarding
follow-up contributions:

*1. Add observe to Python DataFrame*

As far as I can see from the master branch, there is no equivalent in the
Python API. Is this planned, or is it missing because there are no
listeners in PySpark, which renders this method useless in Python?
I would be happy to contribute here.

*2. Add Observation class to simplify result access*

The Dataset.observe method requires users to register listeners
with QueryExecutionListener or StreamingQueryListener to obtain
the result. I think for simple setups, this could be hidden behind
a common helper class, here called Observation, which reduces the
usage of observe to three lines of code:

// our Dataset (this does not count as a line of code)
val df = Seq((1, "a"), (2, "b"), (4, "c"), (8, "d")).toDF("id", "value")

// define the observation we want to make
val observation = Observation("stats", count($"id"), sum($"id"))

// add the observation to the Dataset and execute an action on it
val cnt = df.observe(observation).count()

// retrieve the result
assert(observation.get === Row(4, 15))

The Observation class can handle the registration and
de-registration of the listener, as well as properly accessing the
result across thread boundaries.

With *2.*, the observe method can be added to PySpark without
introducing listeners there at all. All the logic is happening in
the JVM.

Thanks for your thoughts on this.

Regards,
Enrico



Re: Observable Metrics on Spark Datasets

2021-03-15 Thread Jungtaek Lim
If I remember correctly, the major audience of the "observe" API is
Structured Streaming, micro-batch mode. From the example, the abstraction
in 2 isn't something that works with Structured Streaming. It could still
be done with a callback, but the question remains how much complexity the
abstraction hides.

I see you're focusing on PySpark - I'm not sure whether there's an intention
not to expose the query listener / streaming query listener to PySpark, but
if there's a valid reason for that, I'm not sure we'd like to expose them to
PySpark in any way. 2 doesn't make sense to me with PySpark - how do you
ensure all the logic happens in the JVM while still leveraging these values
from PySpark?
(I see there's support for listeners with DStream in PySpark, so there
might be reasons not to add the same for SQL/SS. Probably a lesson learned?)


On Mon, Mar 15, 2021 at 6:59 PM Enrico Minack 
wrote:

> Hi Spark-Devs,
>
> The observable metrics that have been added to the Dataset API in 3.0.0
> are a great improvement over the Accumulator APIs, which seem to have much
> weaker guarantees. I have two questions regarding follow-up contributions:
>
> *1. Add observe to Python DataFrame*
>
> As far as I can see from the master branch, there is no equivalent in the
> Python API. Is this planned, or is it missing because there are no listeners
> in PySpark, which renders this method useless in Python? I would be happy to
> contribute here.
>
>
> *2. Add Observation class to simplify result access*
>
> The Dataset.observe method requires users to register listeners with
> QueryExecutionListener or StreamingQueryListener to obtain the result.
> I think for simple setups, this could be hidden behind a common helper
> class, here called Observation, which reduces the usage of observe to
> three lines of code:
>
> // our Dataset (this does not count as a line of code)
> val df = Seq((1, "a"), (2, "b"), (4, "c"), (8, "d")).toDF("id", "value")
> // define the observation we want to make
> val observation = Observation("stats", count($"id"), sum($"id"))
> // add the observation to the Dataset and execute an action on it
> val cnt = df.observe(observation).count()
> // retrieve the result
> assert(observation.get === Row(4, 15))
>
> The Observation class can handle the registration and de-registration of
> the listener, as well as properly accessing the result across thread
> boundaries.
>
> With *2.*, the observe method can be added to PySpark without introducing
> listeners there at all. All the logic is happening in the JVM.
>
> Thanks for your thoughts on this.
>
> Regards,
> Enrico
>


Observable Metrics on Spark Datasets

2021-03-15 Thread Enrico Minack

Hi Spark-Devs,

The observable metrics that have been added to the Dataset API in 3.0.0
are a great improvement over the Accumulator APIs, which seem to have much
weaker guarantees. I have two questions regarding follow-up contributions:


*1. Add observe to Python DataFrame*

As far as I can see from the master branch, there is no equivalent in the
Python API. Is this planned, or is it missing because there are no
listeners in PySpark, which renders this method useless in Python?
I would be happy to contribute here.


*2. Add Observation class to simplify result access*

The Dataset.observe method requires users to register listeners
with QueryExecutionListener or StreamingQueryListener to obtain the
result. I think for simple setups, this could be hidden behind a common
helper class, here called Observation, which reduces the usage of observe
to three lines of code:


// our Dataset (this does not count as a line of code)
val df = Seq((1, "a"), (2, "b"), (4, "c"), (8, "d")).toDF("id", "value")

// define the observation we want to make
val observation = Observation("stats", count($"id"), sum($"id"))

// add the observation to the Dataset and execute an action on it
val cnt = df.observe(observation).count()

// retrieve the result
assert(observation.get === Row(4, 15))

The Observation class can handle the registration and de-registration of 
the listener, as well as properly accessing the result across thread 
boundaries.
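
To make the idea concrete, here is a minimal sketch of what such a helper
could look like for the batch case, built on QueryExecutionListener. The
names and the exact wire-up (observation.observe(df) rather than
df.observe(observation) as in the example above) are illustrative only, not
the proposed API:

import org.apache.spark.sql.{Column, Dataset, Row, SparkSession}
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener

// Sketch only: registers a listener, attaches the metrics to the Dataset,
// and hands the observed row back across the listener thread boundary.
class Observation(name: String, exprs: Column*) {
  private var session: Option[SparkSession] = None
  private var row: Option[Row] = None

  private val listener = new QueryExecutionListener {
    override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit =
      qe.observedMetrics.get(name).foreach { metrics =>
        Observation.this.synchronized {
          row = Some(metrics)
          Observation.this.notifyAll()
        }
      }
    override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = ()
  }

  // register the listener with the Dataset's session and attach the metrics
  def observe[T](ds: Dataset[T]): Dataset[T] = {
    session = Some(ds.sparkSession)
    ds.sparkSession.listenerManager.register(listener)
    ds.observe(name, exprs.head, exprs.tail: _*)
  }

  // block until the metrics row has been reported, then clean up
  def get: Row = synchronized {
    while (row.isEmpty) wait()
    session.foreach(_.listenerManager.unregister(listener))
    row.get
  }
}

With a sketch like this, the example above becomes
observation.observe(df).count() followed by observation.get.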


With *2.*, the observe method can be added to PySpark without 
introducing listeners there at all. All the logic is happening in the JVM.


Thanks for your thoughts on this.

Regards,
Enrico