[ 
https://issues.apache.org/jira/browse/SPARK-34806?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17312403#comment-17312403
 ] 

Enrico Minack commented on SPARK-34806:
---------------------------------------

The interaction with {{Observation}} can be split into three steps:
 # create an {{Observation}} instance
 # observe a {{Dataset}}
 # retrieve the metrics

Each of these could be designed in different ways:

*1. create an {{Observation}} instance*
{code:scala}
 // without column expressions
 val observation = Observation("name")
 val observation = spark.observation("name")

// with column expressions
 val observation = Observation("name", count(lit(1)), sum($"id"), mean($"id"))
 val observation = spark.observation("name", 
avg($"id").cast("int").as("avg_val"))
{code}
*2. observe a {{Dataset}}*
{code:scala}
 val observed = df.observe(observation)
 val observed = df.observe(observation, count(lit(1)), sum($"id"), mean($"id"))
 val observed = observation.on(ds)
 val observed = ds.transform(observation.on)
{code}
*3. retrieve the metrics*
{code:scala}
 // ways to retrieve the metrics
 val metrics: Row = observation.get
{code}
So we have these design decisions:
- Are column expressions constant to an observation?
-- I would prefer this: increases immutability of the observation instance
- Create an {{Observation}} through a session method or by simply instantiating 
class {{Observation}}?
-- I would prefer this: created instance can register right away with the 
session listeners, not on "2. observe a {{Dataset}}"
- Extend the {{Dataset}} API by overloading `observe`?
-- I would prefer: {{df.observe(observation)}} as it mimics existing 
{{df.observe(name, col, cols)}}, we could still have 
{{Observation.on(Dataset)}} and users can pick their favourite
- Move the logic that rejects stream datasets from `Dataset.observe` to 
{{Observation.on}}. As it is not the {{Dataset}} that rejects this construct, 
it is the {{Observation}}.
- Make {{Observation}} thread-safe?
-- I'd argue, the user of {{Observation}} has to make it thread safe by calling 
the actions on the observed {{Dataset}} in the same thread as calling 
{{Observation.get}}, or synchronizing the threads that do these bits. I would 
not expect it a general use case to have these two in different threads.
- Make the observation name optional
-- What is the use to give it a name? When optional, it could generate a random 
UUID name internally.

> Helper class for batch Dataset.observe()
> ----------------------------------------
>
>                 Key: SPARK-34806
>                 URL: https://issues.apache.org/jira/browse/SPARK-34806
>             Project: Spark
>          Issue Type: New Feature
>          Components: SQL
>    Affects Versions: 3.2.0
>            Reporter: Enrico Minack
>            Priority: Minor
>
> The {{observe}} method has been added to the {{Dataset}} API in 3.0.0. It 
> allows to collect aggregate metrics over data of a Dataset while they are 
> being processed during an action.
> These metrics are collected in a separate thread after registering 
> {{QueryExecutionListener}} for batch datasets and {{StreamingQueryListener}} 
> for stream datasets, respectively. While in streaming context it makes 
> perfectly sense to process incremental metrics in an event-based fashion, for 
> simple batch datatset processing, a single result should be retrievable 
> without the need to register listeners or handling threading.
> Introducing an {{Observation}} helper class can hide that complexity for 
> simple use-cases in batch processing.
> Similar to {{AccumulatorV2}} provided by {{SparkContext}} (e.g. 
> {{SparkContext.LongAccumulator}}), the {{SparkSession}} can provide a method 
> to create a new {{Observation}} instance and register it with the session.
> Alternatively, an {{Observation}} instance could be instantiated on its own 
> which on calling {{Observation.on(Dataset)}} registers with 
> {{Dataset.sparkSession}}. This "registration" registers a listener with the 
> session that retrieves the metrics.
> The {{Observation}} class provides methods to retrieve the metrics. This 
> retrieval has to wait for the listener to be called in a separate thread. So 
> all methods will wait for this, optionally with a timeout:
>  - {{Observation.get}} waits without timeout and returns the metric.
>  - {{Observation.option(time, unit)}} waits at most {{time}}, returns the 
> metric as an {{Option}}, or {{None}} when the timeout occurs.
>  - {{Observation.waitCompleted(time, unit)}} waits for the metrics and 
> indicates timeout by returning {{false}}.
> Obviously, an action has to be called on the observed dataset before any of 
> these methods are called, otherwise a timeout will occur.
> With {{Observation.reset}}, another action can be observed. Finally, 
> {{Observation.close}} unregisters the listener from the session.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to