[jira] [Commented] (SPARK-34806) Helper class for batch Dataset.observe()

2021-07-19 Thread Apache Spark (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-34806?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17383394#comment-17383394
 ] 

Apache Spark commented on SPARK-34806:
--

User 'EnricoMi' has created a pull request for this issue:
https://github.com/apache/spark/pull/33422

> Helper class for batch Dataset.observe()
>
> Key: SPARK-34806
> URL: https://issues.apache.org/jira/browse/SPARK-34806
> Project: Spark
> Issue Type: New Feature
> Components: SQL
> Affects Versions: 3.2.0
> Reporter: Enrico Minack
> Assignee: Enrico Minack
> Priority: Minor
> Fix For: 3.3.0
>
> The {{observe}} method has been added to the {{Dataset}} API in 3.0.0. It 
> allows collecting aggregate metrics over the data of a Dataset while it is 
> being processed during an action.
> These metrics are collected in a separate thread after registering 
> {{QueryExecutionListener}} for batch datasets and {{StreamingQueryListener}} 
> for stream datasets, respectively. While in a streaming context it makes 
> perfect sense to process incremental metrics in an event-based fashion, for 
> simple batch dataset processing, a single result should be retrievable 
> without the need to register listeners or handle threading.
> Introducing an {{Observation}} helper class can hide that complexity for 
> simple use-cases in batch processing.
> Similar to {{AccumulatorV2}} provided by {{SparkContext}} (e.g. 
> {{SparkContext.longAccumulator}}), the {{SparkSession}} can provide a method 
> to create a new {{Observation}} instance and register it with the session.
> Alternatively, an {{Observation}} instance could be instantiated on its own 
> and, on calling {{Observation.on(Dataset)}}, register with 
> {{Dataset.sparkSession}}. This "registration" adds a listener to the 
> session that retrieves the metrics.
> The {{Observation}} class provides methods to retrieve the metrics. This 
> retrieval has to wait for the listener to be called in a separate thread. So 
> all methods will wait for this, optionally with a timeout:
>  - {{Observation.get}} waits without timeout and returns the metric.
>  - {{Observation.option(time, unit)}} waits at most {{time}}, returns the 
> metric as an {{Option}}, or {{None}} when the timeout occurs.
>  - {{Observation.waitCompleted(time, unit)}} waits for the metrics and 
> indicates timeout by returning {{false}}.
> Obviously, an action has to be called on the observed dataset before any of 
> these methods are called, otherwise a timeout will occur.
> With {{Observation.reset}}, another action can be observed. Finally, 
> {{Observation.close}} unregisters the listener from the session.
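
The blocking retrieval described above can be sketched in plain Scala without Spark. This is only an illustration under stated assumptions: {{ObservationSketch}} and {{onFinish}} are hypothetical names, the metric is a plain {{Map}} rather than a Spark {{Row}}, and {{onFinish}} stands in for whatever the registered listener would call once the action completes.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}

// Hypothetical stand-in for the proposed Observation helper (not Spark's class).
final class ObservationSketch {
  @volatile private var latch = new CountDownLatch(1)
  @volatile private var metrics: Option[Map[String, Any]] = None

  // Assumed to be invoked by the listener thread when the action has finished.
  def onFinish(result: Map[String, Any]): Unit = {
    metrics = Some(result)
    latch.countDown()
  }

  // Waits without timeout and returns the metric.
  def get: Map[String, Any] = {
    latch.await()
    metrics.get
  }

  // Waits at most `time`; returns None when the timeout occurs.
  def option(time: Long, unit: TimeUnit): Option[Map[String, Any]] =
    if (latch.await(time, unit)) metrics else None

  // Waits for the metrics and indicates a timeout by returning false.
  def waitCompleted(time: Long, unit: TimeUnit): Boolean =
    latch.await(time, unit)

  // Forgets the previous result so that another action can be observed.
  def reset(): Unit = {
    metrics = None
    latch = new CountDownLatch(1)
  }
}

object ObservationSketchDemo {
  def main(args: Array[String]): Unit = {
    val obs = new ObservationSketch
    // No action has run yet, so the timed variant gives up.
    assert(obs.option(10, TimeUnit.MILLISECONDS).isEmpty)
    // Simulate the listener being called from a separate thread.
    new Thread(() => obs.onFinish(Map("rows" -> 3L))).start()
    val metrics = obs.get
    println(metrics("rows"))
  }
}
```

A latch keeps the sketch short; a real helper would additionally have to register and unregister the listener with the session, which is omitted here.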



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-34806) Helper class for batch Dataset.observe()

2021-06-09 Thread Enrico Minack (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-34806?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17360325#comment-17360325
 ] 

Enrico Minack commented on SPARK-34806:
---

Thanks for the input [~cloud_fan], I have moved the expressions into 
{{Dataset.observe()}} and restricted each observation to be used only once 
with a Dataset.




[jira] [Commented] (SPARK-34806) Helper class for batch Dataset.observe()

2021-05-11 Thread Wenchen Fan (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-34806?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17342403#comment-17342403
 ] 

Wenchen Fan commented on SPARK-34806:
-

A few opinions:
 # Creating Observation without column expressions looks better, as it's weird 
to write column expressions without seeing the dataframe.
 # Observation should be dedicated to one df. `df1.observe(o, ...); 
df2.observe(o, ...);` should fail.
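
The second point could be enforced with a one-shot guard, roughly as follows. This is a sketch in plain Scala, not Spark code: {{SingleUseObservation}} and {{markObserved}} are hypothetical names for the check that {{Dataset.observe(o, ...)}} would perform before binding the observation.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical helper: an observation that may be bound to one Dataset only.
final class SingleUseObservation(val name: String) {
  private val used = new AtomicBoolean(false)

  // Assumed to be called by observe(); the second call fails.
  def markObserved(): Unit =
    if (!used.compareAndSet(false, true))
      throw new IllegalStateException(
        s"Observation '$name' is already bound to a Dataset")
}
```

Using {{compareAndSet}} makes the check atomic, so two threads racing to observe with the same instance cannot both succeed.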




[jira] [Commented] (SPARK-34806) Helper class for batch Dataset.observe()

2021-05-02 Thread Enrico Minack (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-34806?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17338112#comment-17338112
 ] 

Enrico Minack commented on SPARK-34806:
---

[~cloud_fan] [~kabhwan]  [~hvanhovell] [~hyukjin.kwon] Which of the above 
design options would you prefer? Do you have any other design suggestions?




[jira] [Commented] (SPARK-34806) Helper class for batch Dataset.observe()

2021-03-31 Thread Enrico Minack (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-34806?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17312403#comment-17312403
 ] 

Enrico Minack commented on SPARK-34806:
---

The interaction with {{Observation}} can be split into three steps:
 # create an {{Observation}} instance
 # observe a {{Dataset}}
 # retrieve the metrics

Each of these could be designed in different ways:

*1. create an {{Observation}} instance*
{code:scala}
// alternatives without column expressions
val observation = Observation("name")
val observation = spark.observation("name")

// alternatives with column expressions
val observation = Observation("name", count(lit(1)), sum($"id"), mean($"id"))
val observation = spark.observation("name", avg($"id").cast("int").as("avg_val"))
{code}
*2. observe a {{Dataset}}*
{code:scala}
// alternatives, depending on where the column expressions live
val observed = df.observe(observation)
val observed = df.observe(observation, count(lit(1)), sum($"id"), mean($"id"))
val observed = observation.on(ds)
val observed = ds.transform(observation.on)
{code}
*3. retrieve the metrics*
{code:scala}
// ways to retrieve the metrics
val metrics: Row = observation.get
{code}
So we have these design decisions:
- Are column expressions constant to an observation?
-- I would prefer this: it increases the immutability of the observation 
instance.
- Create an {{Observation}} through a session method or by simply instantiating 
class {{Observation}}?
-- I would prefer the session method: the created instance can register right 
away with the session listeners, not only at "2. observe a {{Dataset}}".
- Extend the {{Dataset}} API by overloading {{observe}}?
-- I would prefer {{df.observe(observation)}}, as it mimics the existing 
{{df.observe(name, col, cols)}}. We could still have 
{{Observation.on(Dataset)}} and users can pick their favourite.
- Move the logic that rejects stream datasets from {{Dataset.observe}} to 
{{Observation.on}}, since it is the {{Observation}}, not the {{Dataset}}, that 
rejects this construct.
- Make {{Observation}} thread-safe?
-- I'd argue that the user of {{Observation}} has to make it thread-safe, 
either by calling the actions on the observed {{Dataset}} in the same thread 
that calls {{Observation.get}}, or by synchronizing the threads that do these 
bits. I would not expect it to be a general use case to have these two in 
different threads.
- Make the observation name optional?
-- What is the use of giving it a name? When optional, a random UUID name could 
be generated internally.
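
The optional name from the last point could look like this. It is a minimal sketch in plain Scala: {{NamedObservation}} is a hypothetical class used only to show the default, not the proposed {{Observation}} itself.

```scala
import java.util.UUID

// Hypothetical: when no name is given, a random UUID string is generated.
// The default argument is evaluated on every construction, so each unnamed
// instance gets its own name.
final class NamedObservation(val name: String = UUID.randomUUID().toString)
```

With this, {{new NamedObservation()}} yields a fresh name each time, while {{new NamedObservation("my_metrics")}} keeps the caller's choice.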



[jira] [Commented] (SPARK-34806) Helper class for batch Dataset.observe()

2021-03-20 Thread Apache Spark (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-34806?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17305379#comment-17305379
 ] 

Apache Spark commented on SPARK-34806:
--

User 'EnricoMi' has created a pull request for this issue:
https://github.com/apache/spark/pull/31905
