Hi Spark-Devs,
the observable metrics that have been added to the Dataset API in 3.0.0
are a great improvement over the Accumulator APIs that seem to have much
weaker guarantees. I have two questions regarding follow-up contributions:
*1. Add observe to Python ***DataFrame**
As I can see from master branch, there is no equivalent in the Python
API. Is this something planned to happen, or is it missing because there
are not listeners in PySpark which renders this method useless in
Python. I would be happy to contribute here.
*2. Add Observation class to simplify result access
*
The Dataset.observe method requires users to register listeners
<https://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/Dataset.html#observe(name:String,expr:org.apache.spark.sql.Column,exprs:org.apache.spark.sql.Column*):org.apache.spark.sql.Dataset[T]>
with QueryExecutionListener or StreamingQUeryListener to obtain the
result. I think for simple setups, this could be hidden behind a common
helper class here called Observation, which reduces the usage of observe
to three lines of code:
// our Dataset (this does not count as a line of code) val df =Seq((1, "a"), (2, "b"), (4, "c"), (8,
"d")).toDF("id", "value")
// define the observation we want to make val observation =Observation("stats",
count($"id"), sum($"id"))
// add the observation to the Dataset and execute an action on it val cnt =
df.observe(observation).count()
// retrieve the result assert(observation.get ===Row(4, 15))
The Observation class can handle the registration and de-registration of
the listener, as well as properly accessing the result across thread
boundaries.
With *2.*, the observe method can be added to PySpark without
introducing listeners there at all. All the logic is happening in the JVM.
Thanks for your thoughts on this.
Regards,
Enrico