[ https://issues.apache.org/jira/browse/SPARK-34806?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17312403#comment-17312403 ]
Enrico Minack commented on SPARK-34806:
---------------------------------------

The interaction with {{Observation}} can be split into three steps:
# create an {{Observation}} instance
# observe a {{Dataset}}
# retrieve the metrics

Each of these could be designed in different ways:

*1. create an {{Observation}} instance*
{code:scala}
// without column expressions
val observation = Observation("name")
val observation = spark.observation("name")

// with column expressions
val observation = Observation("name", count(lit(1)), sum($"id"), mean($"id"))
val observation = spark.observation("name", avg($"id").cast("int").as("avg_val"))
{code}

*2. observe a {{Dataset}}*
{code:scala}
val observed = df.observe(observation)
val observed = df.observe(observation, count(lit(1)), sum($"id"), mean($"id"))
val observed = observation.on(ds)
val observed = ds.transform(observation.on)
{code}

*3. retrieve the metrics*
{code:scala}
// ways to retrieve the metrics
val metrics: Row = observation.get
{code}

So we have these design decisions:
- Are the column expressions fixed for an observation?
-- I would prefer this: it makes the observation instance more immutable.
- Create an {{Observation}} through a session method or by simply instantiating class {{Observation}}?
-- I would prefer the session method: the created instance can register with the session listeners right away, rather than in "2. observe a {{Dataset}}".
- Extend the {{Dataset}} API by overloading {{observe}}?
-- I would prefer {{df.observe(observation)}}, as it mimics the existing {{df.observe(name, col, cols)}}. We could still have {{Observation.on(Dataset)}}, and users can pick their favourite.
- Move the logic that rejects stream datasets from {{Dataset.observe}} to {{Observation.on}}, since it is the {{Observation}}, not the {{Dataset}}, that rejects this construct.
- Make {{Observation}} thread-safe?
-- I'd argue that the user of {{Observation}} has to make it thread-safe, either by calling the actions on the observed {{Dataset}} in the same thread that calls {{Observation.get}}, or by synchronizing the threads that do these things. I would not expect it to be a common use case to have these two in different threads.
- Make the observation name optional?
-- What is the use of giving it a name? When optional, a random UUID name could be generated internally.

> Helper class for batch Dataset.observe()
> ----------------------------------------
>
> Key: SPARK-34806
> URL: https://issues.apache.org/jira/browse/SPARK-34806
> Project: Spark
> Issue Type: New Feature
> Components: SQL
> Affects Versions: 3.2.0
> Reporter: Enrico Minack
> Priority: Minor
>
> The {{observe}} method has been added to the {{Dataset}} API in 3.0.0. It
> allows collecting aggregate metrics over the data of a Dataset while they are
> being processed during an action.
> These metrics are collected in a separate thread after registering
> {{QueryExecutionListener}} for batch datasets and {{StreamingQueryListener}}
> for stream datasets, respectively. While in a streaming context it makes
> perfect sense to process incremental metrics in an event-based fashion, for
> simple batch dataset processing, a single result should be retrievable
> without the need to register listeners or handle threading.
> Introducing an {{Observation}} helper class can hide that complexity for
> simple use-cases in batch processing.
> Similar to {{AccumulatorV2}} provided by {{SparkContext}} (e.g.
> {{SparkContext.longAccumulator}}), the {{SparkSession}} can provide a method
> to create a new {{Observation}} instance and register it with the session.
> Alternatively, an {{Observation}} instance could be instantiated on its own
> which on calling {{Observation.on(Dataset)}} registers with
> {{Dataset.sparkSession}}. This "registration" registers a listener with the
> session that retrieves the metrics.
> The {{Observation}} class provides methods to retrieve the metrics. This
> retrieval has to wait for the listener to be called in a separate thread. So
> all methods will wait for this, optionally with a timeout:
> - {{Observation.get}} waits without timeout and returns the metric.
> - {{Observation.option(time, unit)}} waits at most {{time}}, returns the
> metric as an {{Option}}, or {{None}} when the timeout occurs.
> - {{Observation.waitCompleted(time, unit)}} waits for the metrics and
> indicates timeout by returning {{false}}.
> Obviously, an action has to be called on the observed dataset before any of
> these methods are called, otherwise a timeout will occur.
> With {{Observation.reset}}, another action can be observed. Finally,
> {{Observation.close}} unregisters the listener from the session.
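
The waiting behaviour of {{get}}, {{option}}, {{waitCompleted}} and {{reset}} described in the issue could look roughly like the following. This is only a sketch in plain Scala and not the proposed implementation: {{ObservationSketch}}, the type parameter {{T}} (standing in for Spark's {{Row}} so that no Spark dependency is needed) and the {{onFinish}} callback (standing in for whatever the session listener would call) are all hypothetical names.

```scala
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock

// Hypothetical stand-in for the proposed Observation helper.
// T replaces Spark's Row so the sketch runs without Spark on the classpath.
final class ObservationSketch[T] {
  private val lock = new ReentrantLock()
  private val completed = lock.newCondition()
  private var metrics: Option[T] = None

  // Assumed entry point for the session listener, called from its own thread.
  def onFinish(value: T): Unit = {
    lock.lock()
    try {
      metrics = Some(value)
      completed.signalAll()
    } finally {
      lock.unlock()
    }
  }

  // Waits without timeout and returns the metrics.
  def get: T = {
    lock.lock()
    try {
      while (metrics.isEmpty) completed.await()
      metrics.get
    } finally {
      lock.unlock()
    }
  }

  // Waits at most `time`; returns None when the timeout occurs.
  def option(time: Long, unit: TimeUnit): Option[T] = {
    lock.lock()
    try {
      var remaining = unit.toNanos(time)
      // Loop to guard against spurious wake-ups.
      while (metrics.isEmpty && remaining > 0) {
        remaining = completed.awaitNanos(remaining)
      }
      metrics
    } finally {
      lock.unlock()
    }
  }

  // Indicates a timeout by returning false.
  def waitCompleted(time: Long, unit: TimeUnit): Boolean =
    option(time, unit).isDefined

  // Forgets the current metrics so that another action can be observed.
  def reset(): Unit = {
    lock.lock()
    try {
      metrics = None
    } finally {
      lock.unlock()
    }
  }
}

object ObservationSketchDemo {
  def main(args: Array[String]): Unit = {
    val observation = new ObservationSketch[Long]
    // No action has delivered metrics yet, so the timed wait reports a timeout.
    println(observation.waitCompleted(10, TimeUnit.MILLISECONDS))
    // Simulate the listener thread delivering the metrics.
    val listener = new Thread(() => observation.onFinish(42L))
    listener.start()
    println(observation.get)
  }
}
```

Registering and unregistering the listener ({{Observation.close}}) is left out on purpose, as that part depends on the design decisions listed in the comment.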